工作中遇到Kafka跨机房传输到远程机房的场景,之前的方案是使用Flume消费后转发到目标kafka,当topic增多并且数据量变大后,维护性较差且Flume较耗费资源。
一、原理
MirrorMaker 为Kafka 内置的跨集群/机房数据复制工具,二进制包解压后bin目录下有kafka-mirror-maker.sh,Mirror Maker启动后,包含了一组消费者,这些消费者属于同一个group,并从多个topic上读取数据,所有的topic均使用该group.id,每个MirrorMaker 进程仅有一个生产者,该生产者将数据发送给目标集群的多个topic;
Kafka MirrorMaker的官方文档一直没有更新,因此新版Kafka为MirrorMaker增加的一些参数、特性等在文档上往往找不到,需要看Kafka MirrorMaker的源码,Kafka MirrorMaker启动脚步如下,发现其主类位于kafka.tools.MirrorMaker,尤其是一些参数的解析逻辑和主要的执行流程,会比较有助于我们理解和运维Kafka MirrorMaker;
代码示例
1
| exec $(dirname $0)/kafka-run-class.sh kafka.tools.MirrorMaker "$@"
|
MirrorMaker 为每个消费者分配一个线程,消费者从源集群的topic和分区上读取数据,然后通过公共生产者将数据发送到目标集群上,官方建议尽量让 MirrorMaker 运行在目标数据中心里,因为长距离的跨机房网络相对而言更加不可靠,如果发生了网络分区,数据中心之间断开了连接,无法连接到集群的消费者要比一个无法连接到集群的生产者要安全得多。
如果消费者无法连接到集群,最多也就是无法消费数据,数据仍然会在 Kafka 集群里保留很长的一段时间,不会有丢失的风险。相反,在发生网络分区时如果 MirrorMaker 已经读取了数据,但无法将数据生产到目标集群上,就会造成数据丢失。所以说远程读取比远程生成更加安全。

建议:
- 建议启动多个kafak-mirror-maker.sh 进程来完成数据同步,这样就算有进程挂掉,topic的同组消费者可以进行reblance;
- 建议将kafka-mirror-maker.sh进程启动在目标集群,原因上文有提及;
- kafak-mirror-maker.sh启动默认不会后台运行,调用kafka-run-class.sh的启动内存256M,需要修改一下启动参数(内存大小、日志);
- 建议对source 集群的whitelist中的topic的消费情况,加实时的积压量监控;
- 建议producer.properties配置中开启auto.create.topics.enable=true;
二、使用和配置
1 2 3 4
| https://kafka.apache.org/downloads
https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
|
- 消费端配置(consumer.properties)
1 2 3 4 5 6 7 8 9 10 11
| zookeeper.connect=zk1:2181,zk2:2181,zk3:2181 group.id=groupyzg-02
auto.offset.reset=largest
partition.assignment.strategy=roundrobin
|
source kafka版本是1.0,配置bootstrap-server指定kafka集群地址,配置方式如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092 group.id=groupyzg-02
auto.offset.reset=latest
heartbeat.interval.ms=30000
session.timeout.ms=100000
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
max.poll.records=20000
receive.buffer.bytes=4194304
max.partition.fetch.bytes=10485760
|
- 生产者配置(producer.properties)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| bootstrap.servers = 192.168.xxx:9092,192.168.xxx:9092 buffer.memory = 268435456 batch.size = 104857 acks=0 linger.ms=10 max.request.size = 10485760 send.buffer.bytes = 10485760 compression.type=snappy
|
启动命令kafka-mirror-maker.sh中添加端口约束和启动内存配置:
1 2 3 4 5
| export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G" export JMX_PORT="8888" exec $(dirname $0)/kafka-run-class.sh kafka.tools.MirrorMaker"$@"
|
日志监控:若想输出日志数据,则使用一下命令启动,日志数据会保存在kafka/logs/_mirror_maker.out 中;
1
| sh bin/kafka-run-class.sh -daemon -name mirror_maker -loggc kafka.tools.MirrorMaker --consumer.config config/consumer.properties --num.streams 2 --producer.config config/producer.properties --whitelist='testnet'
|
0.10版本的积压量监控:
1
| ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker--zookeeper xxxx:21810,xxx:21810,xxx:21810--topic testnet -group testnet-group
|
1.0版本的积压量监控:
1
| ./kafka-consumer-groups.sh --bootstrap-server xxx:9092--describe --group testnet-group
|
1 2 3 4 5 6 7 8
| ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group xxx
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
|
进程数监控:建议增加mirror-maker的进程数监控,及时发现并启动挂点进程;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
| #!/bin/bash
sj=`date "+%F %T"`
last_sj=`date "+%F %T" -d '-5 min'`
runlog=~/kafka_2.11-1.0.0/alarm/run.log
noticetel="138XXXXXXXX" province=~/kafka_2.11-1.0.0/alarm/province.cfg tmplog=~/kafka_2.11-1.0.0/alarm/tmp.log
smsnotice(){ info=$@ IFS="," for i in $noticetel;do curl -kd xx
done }
province_all=`cat ${province}|wc -l` mount=`ps -ef|grep -i mirror_maker-gc |wc -l` ps -ef|grep -i mirror_maker-gc >${tmplog} echo "the mount of mirror-maker is `expr $mount - 1`!"> $runlog echo "the mount of province config is $province_all ! ">> $runlog if [ `expr $mount - 1` -ge $province_all ] ;then echo "`hostname -i` ----${sj} ---- the mirrormaker is ok!" >> $runlog else message="`hostname -i` ----${sj} ----the mount mirror-maker processor `expr $mount - 1` is less than the mount of province_config $province_all, " echo ${message} >> $runlog while read line do province_name=`echo ${line}|awk -F '|' '{print $1}'` province_code=`echo ${line}|awk -F '|' '{print $2}'` mount_two=`cat ${tmplog}|grep -i ${province_code} |wc -l` if [ $mount_two -ge 1 ] ;then echo "`hostname -i` ----${sj} ---- the province of ${province_name} is ok!" >> $runlog else message_two="${message} the province of [ ${province_name} ] mirror-maker processor is down, please check for it!" echo ${message_two} >> $runlog smsnotice ${message_two} fi done<${province} fi
|