`

flume伪分布模式实践

 
阅读更多

1)启动flume master:bin/flume master ,这时你可以到master的web界面http://localhost:35871/查看flume集群信息,web界面如下:

从截图中可以看出,当前flume集群中没有node节点。

2)启动flume node节点:flume node_nowatch,你可以到http://localhost:35862/查看该节点的信息。


附注:启动flume节点有两种方式:a)flume node:以守护进程的方式运行node,无法通过console和用户进行交互;b)flume node_nowatch:使用户能够通过console和flume node进程进行交互

3)在成功启动flume node节点之后,刷新flume master的web界面:

可以看到flume 集群中已经新添加了一个节点。

4)通过master的web界面配置node节点:

从configure node中选择所要配置的节点,然后配置节点的source和sink,点击提交查询:

提交成功后,返回到主页,可以看到相关栏目已经更新:

5)测试配置是否成功,到运行flume node_nowatch的窗口输出,配置成功:

  1. thisistest
  2. hadoop07[INFOMonOct3107:34:21CST2011]thisistest


6)修改配置,将hadoop07的source修改为:tail("/etc/services"),提交查询,等到十几秒之后运行flume node_nowatch的窗口输出了/etc/services当前所有的内容。再次修改source:tail("/var/log/messages"),结果node节点报错,原因是运行flume的用户无法读取文件,将文件权限改为777后正常。将sink修改为:dfs("file:///tmp/var_log_messages.copy"),dfs sink的输出为序列文件;将sink修改为:text("/tmp/var_log_messages.copy"),flume节点出现错误信息:

  1. 2011-10-3107:55:16,721[Checkconfig]ERRORagent.LogicalNode:Forcingdrivertoexituncleanly
  2. 2011-10-3107:55:16,723[logicalNodehadoop07-30]WARNtext.TailSource:nextunexpectedlyinterrupted:null
  3. java.lang.InterruptedException
  4. atjava.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:877)
  5. atcom.cloudera.flume.handlers.text.TailSource.next(TailSource.java:271)
  6. atcom.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:105)
  7. 2011-10-3107:55:16,723[logicalNodehadoop07-30]ERRORconnector.DirectDriver:Closingdownduetoexceptionduringappendcalls
  8. java.lang.InterruptedException
  9. atjava.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:877)
  10. atcom.cloudera.flume.handlers.text.TailSource.next(TailSource.java:271)
  11. atcom.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:105)
  12. 2011-10-3107:55:16,723[logicalNodehadoop07-30]INFOconnector.DirectDriver:ConnectorlogicalNodehadoop07-30exitedwitherror:null
  13. java.lang.InterruptedException
  14. atjava.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:877)
  15. atcom.cloudera.flume.handlers.text.TailSource.next(TailSource.java:271)
  16. atcom.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:105)
  17. 2011-10-3107:55:16,725[Checkconfig]INFOagent.LogicalNode:Nodeconfigsuccessfullysettocom.cloudera.flume.conf.FlumeConfigData@1a5af9f
  18. 2011-10-3107:55:16,825[TailThread-3]ERRORtext.TailSource:Tailthreadnterrupted:sleepinterrupted
  19. java.lang.InterruptedException:sleepinterrupted
  20. atjava.lang.Thread.sleep(NativeMethod)
  21. atcom.cloudera.util.Clock$DefaultClock.doSleep(Clock.java:62)
  22. atcom.cloudera.util.Clock.sleep(Clock.java:88)
  23. atcom.cloudera.flume.handlers.text.TailSource$TailThread.run(TailSource.java:197)
  24. 2011-10-3107:55:16,826[TailThread-3]INFOtext.TailSource:TailThreadhasexited
  25. 2011-10-3107:55:16,826[logicalNodehadoop07-30]INFOhdfs.DFSEventSink:Closingfile:///tmp/var_log_messages.copy
  26. 2011-10-3107:55:16,828[logicalNodehadoop07-30]ERRORconnector.DirectDriver:ExitingdriverlogicalNodehadoop07-30inerrorstateTailSource|DFSEventSinkbecausenull
但是数据已经正确的写入到文件:/tmp/var_log_messages.copy,但文件格式不是原始的文件格式。将sink修改为:text( "/tmp/var_log_messages1.copy", "raw" ),警告:
  1. conf.FlumeBuilder:Deprecatedsyntax:Expectedaformatspecbutinsteadhada(String)raw
查看数据文件:/tmp/var_log_messages1.copy,文件已经和原始格式一致。


7)启动collector,注意:flume node_nowatch 会绑定35862端口,flume node_nowatch -n collector会绑定35863端口,所以查看节点信息时的端口依据于最先启动的程序。

flume node_nowatch -n collector
8)使用Configure multiple nodes进行配置,配置代码如下:
  1. hadoop07:console|agentSink("localhost",35853);
  2. collector:collectorSource(35853)|console;

配置失败,错误信息如下:

  1. com.cloudera.flume.conf.FlumeSpecException:Parsererror:unexpected'collector'atposition0line2:'hadoop07:console|agentSink("localhost",35853);collector:collectorSource(35853)|console;'
使用Configure a single node进行配置,最终配置信息如下:


测试配置,成功。

9)最终的master web界面:




分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics