如何将Flume与kafka进行整合
|Word count:552|Reading time:2min|Post View:
自从Flume1.6开始,新增了对Kafka的支持,极大地提升了Flume的采集能力。避免后端因热点问题导致kafka的channel爆满而无法采集数据。
本篇介绍使用Flume当前最新版本1.8与Kafka的结合使用。
基本环境
- Kafka (192.168.156.101:9092)
- Zookeeper(192.168.156.101:2181)
- JDK1.8
安装Flume
1 2
| wget http://apache-flume-1.8.0-bin.tar.gz tar -zxvf apache-flume-1.8.0-bin.tar.gz
|
进入apache-flume-1.8.0-bin目录,在conf路径中新增配置文件flume.properties
(名称随意)。
1 2
| cd apache-flume-1.8.0-bin touch conf/flume.properties
|
新增如下配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| ## 此处定义 agent 的source(数据源)、sink(数据流向)、channel(管道) agent1.sources=source1 agent1.sinks=sink1 agent1.channels=channel1
## 此处定义Agent 数据源的类型 agent1.sources.source1.type=http agent1.sources.source1.bind=0.0.0.0 agent1.sources.source1.port=9000 agent1.channels.channel1.type=memory agent1.channels.channel1.capacity=10000 agent1.channels.channel1.transactionCapacity=100 agent1.channels.channel1.keep-alive=30
## 此处定义kafka的sink topic broker agent1.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink agent1.sinks.sink1.topic=kafkaTest agent1.sinks.sink1.kafka.bootstrap.servers = 192.168.156.101:9092 agent1.sinks.sink1.requiredAcks=1 agent1.sinks.sink1.kafka.producer.acks = 1 agent1.sinks.sink1.kafka.flumeBatchSize = 20 agent1.sinks.sink1.kafka.producer.linger.ms = 1 agent1.sinks.sink1.kafka.producer.compression.type = snappy ## 此处定义source的channel 和 sink的channel agent1.sources.source1.channels=channel1 agent1.sinks.sink1.channel=channel1
|
启动flume
在apache-flume-1.8.0-bin
中执行如下命令启动flume。
1
| nohup bin/flume-ng agent -f conf/flume.properties -n agent1 -c /home/cdhuser/apache-flume-1.8.0-bin/conf >/dev/null &
|
注意此处的-f
、-n
、 -c
参数:
- -f 表示配置文件的路径
- -n agent的名称,与配置文件中一直
- -c 配置文件所在的路径
此时,便已经成功启动了flume,source为HTTP,端口为9000,sink为Kafka,channel默认在内存,当然也可以将channel配置为Kafka Channel。
使用Rest Client
给9000端口发送数据,然后在kafka消费者端进行查看。
启动kafka消费端
1 2 3
| cd /opt/kafka
bin/kafka-console-consumer.sh --zookeeper 192.168.156.101:2181 --topic kafkaTest --from-beginning
|
然后发送如下测试数据
1 2 3 4 5 6 7 8 9
| [ { "headers" : { "datatype" : "test", "timestamp" : 1456989430522 }, "body" : "123123$45645$20160223111222$10.10.170.75$01$1$2$PC" } ]
|
此时在kafka消费者那一侧就可以发现如下信息:
1
| 123123$45645$20160223111222$10.10.170.75$01$1$2$PC
|
写的比较乱,有空在整理。