`

flume 几个比较有用的source、sink和decorator

 
阅读更多

Source catalog:

1.Console 控制台输出,可以带输出内容格式的参数

比如console(“raw”), console(“json”)

2.text("filename"[, format])

一次读取文件内容,每一个换行做为一个事件。

3.tail("filename"[,startFromEnd=false]{,delim="regex", delimMode="exclude|prev|next"})

读取文件尾部内容跟linuxtail类似。

Filename:读取的文件名,

StartFromEnd 每次重读时,是否重头开始读。默认false每次重头开始重读,

Delim 是分割参数,

DelimMode分隔符数据属于哪一部分的指定。

4.tailDir("dirname"[,fileregex=".*"[, startFromEnd=false[,recurseDepth=0]]]{,delim="regex",delimMode="exclude|prev|next"})

对一个目录下面有改动文件的事件发送

Dirname:目录名

Fileregex:文件名正则表达式匹配,需要符合java中正则表达式规则

StartFromeEnd tail中的参数一样。

RecurseDepth :指定目录下是否递归对其子目录中文件的监控。指定递归层数。

其他剩余参数跟tail中一致。

Sinks catalog

CollectorTier Event Sinks

1. collectorSink("fsdir","fsfileprefix"[, rollmillis[, format]])

收集发送过来的事件。

Fsdir:目录,

Fsfileprefix:文件前缀,

Rollmillis:对于hdfs来就是文件的打开是关闭这段时间

Format是输出文件格式

AgentTier Event Sinks

1agentSink[("machine"[,port])]

Defaults to agentE2ESink

2agentE2ESink[("machine"[,port])]

先讲event内容写入文件防止在缓冲丢失和点对点确认的数据传输

MachinecollectorSourceip

Port:collectorSource的端口

3agentDFOSink[("machine"[,port])]

当连接断开时,会把event写到本地,并且会不断重试发送event给对方,把之前未发送成功的event再次发送。

4agentBESink[("machine"[,port])]

BestEffort Agent sink,最有效率的agent sink,顾名思义,就是只管发送,不管有没有发送成功。就可能存在丢失event的情况存在。

1、 agentE2EChain("m1[:_p1_]"[, "m2[:_p2_]"[,…]])

有多个collector可供选择发送。如果第一丢失连接了,就会向第二发送event,以此类推。并且会不定期回来查看原来没反应的collector是否已经恢复了,如果恢复了,就会跟原来的节点交互了。发送机制跟agentE2ESink一样

2、 agentDFOChain("m1[:_p1_]"[, "m2[:_p2_]"[,…]])

3、 agentBEChain("m1[:_p1_]"[, "m2[:_p2_]"[,…]])

上面两个跟6类似

autoE2EChainautoDFOChainautoBEChain,自动的会想master要可以连接的collector

装饰器catalog

1、writeAhead sink发送前先写入文件后,在本地缓冲机制,可让接收和发送分别在不同的线程中进行。

2、ackedWriteAhead[(maxmillis)]

3、diskFailover[(maxmillis)]

如果event发送失败,就在本地磁盘缓存,然后每隔maxmillis时间去重试。

4、ackInjector注入ack确认操作。

5、ackChecker:计算发送组的checksum值,插入到ackInjector

6、lazyOpen:只在条用append的时候,才会做真正的openclosed操作

7、 format("pattern"):改变输出格式,具体格式有pattern参数指定。对于需要做checksumevents来说由于改变了数据格式将导致checksum的值也不会改变。从而导致消息的丢失。这个装饰器一般用在对消息可靠性要求不是很高,或者reports中。

8、 batch(n,maxlatency)缓存nevents然后统一发送。如果等待时间操作maxlatency,即使未有nevents也会发送。

9、 unbatch:对于上面被batch操作过的events,同步unbatch来拆分开。

10、 gzip:将events事件序列化后压缩

11、 gunzip:对gzip压缩过的events,解压

12、 intervalSampler(n)每隔nevents发送一次,算是一个优化手段,特别是通过网络传输时。

13、 probSampler(p):通过概率p来做决定是否将events接着往下发送

14、 reservoirSampler(k):对接收到events根据发送时的顺序重新来组织。

15、 delay(ms)对要发送的events内容延迟ms

16、 choke[(choke-id)]:限制发送速度

分享到:
评论

相关推荐

    flume sink直写mysql

    flume 自定义sink组件 实现sink直接写入mysql数据库

    flume自定义source,sink,intercepor的demo,已经配置好maven插件

    一个简单的工程,已经设置各种配置,直接只用maven打包好就可以在flume工程包新建plugins.d/custom/lib 目录,并拷贝到下面,并将工程里面的conf文件拷贝到flume的conf目录下启动命令 nohup flume-ng agent -n ...

    flume-ng-hdfs-sink-1.7.0.jar

    flume-ng-hdfs-sink-1.7.0.jar,这个包里包含了flume和HDFS集成的所有类

    flume定制化sink

    flume定制化sink,用于参考,使用了多线程及读取配置文件的技术

    flume-ftp-source 相关jar包

    由于flume官方并未提供ftp,source的支持; 因此想使用ftp文件服务器的资源作为数据的来源就需要自定义ftpsource,根据github:https://github.com/keedio/flume-ftp-source,提示下载相关jar,再此作为记录。

    flume-ng-hbase-sink-1.7.0.jar

    Flume和Hbase集成的sink包,修改这个包里的源码可以成功客制化Flume往Hbase中写数据的格式。

    Flume配置双HA hdfsSink.docx

    通过修改flume源码实现flume向两个HA hadoop集群分发数据。

    flume-ng-elasticsearch6-sink.zip

    flume1.9采集数据入存入elasticsearch6.2.4,flume1.9本身只支持低版本的elasticsearch,基于apache-flume-1.9.0-src的flume-ng-sinks/flume-ng-elasticsearch-sink源码修改,支持es6.2.4,打的包,直接替换flume/...

    Flume的channel和sink.docx

    本文讲述了flume中channel和sink简单描述和linux配置 包括:Memory channel、File channel及其它测试阶段的Channel; 及channel通过sink的输出配置Logger Sink、File Roll Sink、HDFS Sink、Avro Sink(多级流动、...

    flume-taildir-source-1.9.0.jar

    flume断点续传覆盖jar,使用组件flume-taildir-source-1.9.0覆盖flume/bin目录下的jar即可

    flume-ng-sql-source-1.5.2.jar

    flume-ng-sql-source-1.5.2.jar从数据库中增量读取数据到hdfs中的jar包

    flume抽取数据库数据的source

    flume抽取数据库数据的源码,可以自动检测数据库的sql语句是否更新

    flume-ng-sql-source-1.5.1

    flume-ng-sql-source-1.5.1 flume连接数据库 很好用的工具

    flume-ng-kafka-sink:将数据发布到 Apache Kafka 的 Apache Flume Sink 实现

    这是一个 Sink 实现,可以将数据发布到主题。 目标是将 Flume 与 Kafka 集成,以便基于拉式的处理系统(如可以处理来自各种 Flume 源(如 Syslog)的数据。 这现在是官方 Flume 发行版(从 v1.6 开始)的一部分,并...

    flume-ng-kafka-sink

    水槽-ng-kafka-sink 注意flume-ng-kafka-sink 已合并到即将发布的flume 1.6 中。 这个 repo不会在维护中。 该项目用于 与进行通信。Kafka Sink的配置 agent_log.sinks.kafka.type = ...

    flume配置双HA hdfsSink源码修改实现

    通过修改flume源码实现flume向两个HA hadoop集群分发数据。

    flume-hive-sink-1.8.0.jar

    flume1.8.0 采集日志入库到hive,后台日志报错,是心跳bug,此包为修复jar包

    Flume配置文件kafkaSource

    Flume配置文件kafkaSource 包含Intercepter,包含正则表达式。

    flume-ng-sql-source-1.5.3.jar

    flume-ng-sql-source-1.5.3.jar,flume采集mysql数据jar包,将此文件拖入FLUME_HOME/lib目录下,如果是CM下CDH版本的flume,则放到/opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib下,同样需要的包还有mysql-...

Global site tag (gtag.js) - Google Analytics