工作中遇到Kafka跨机房传输到远程机房的场景,之前的方案是使用Flume消费后转发到目标kafka,当topic增多并且数据量变大后,维护性较差且Flume较耗费资源。
一、原理
MirrorMaker 为Kafka 内置的跨集群/机房数据复制工具,二进制包解压后bin目录下有kafka-mirror-maker.sh,Mirror Maker启动后,包含了一组消费者,这些消费者属于同一个group,并从多个topic上读取数据,所有的topic均使用该group.id,每个MirrorMaker 进程仅有一个生产者,该生产者将数据发送给目标集群的多个topic;
Kafka MirrorMaker的官方文档一直没有更新,因此新版Kafka为MirrorMaker增加的一些参数、特性等在文档上往往找不到,需要看Kafka MirrorMaker的源码,Kafka MirrorMaker启动脚步如下,发现其主类位于kafka.tools.MirrorMaker,尤其是一些参数的解析逻辑和主要的执行流程,会比较有助于我们理解和运维Kafka MirrorMaker;
代码示例
| 1
 | exec $(dirname $0)/kafka-run-class.sh kafka.tools.MirrorMaker "$@"
 | 
  
MirrorMaker 为每个消费者分配一个线程,消费者从源集群的topic和分区上读取数据,然后通过公共生产者将数据发送到目标集群上,官方建议尽量让 MirrorMaker 运行在目标数据中心里,因为长距离的跨机房网络相对而言更加不可靠,如果发生了网络分区,数据中心之间断开了连接,无法连接到集群的消费者要比一个无法连接到集群的生产者要安全得多。
如果消费者无法连接到集群,最多也就是无法消费数据,数据仍然会在 Kafka 集群里保留很长的一段时间,不会有丢失的风险。相反,在发生网络分区时如果 MirrorMaker 已经读取了数据,但无法将数据生产到目标集群上,就会造成数据丢失。所以说远程读取比远程生成更加安全。

建议:
- 建议启动多个kafak-mirror-maker.sh 进程来完成数据同步,这样就算有进程挂掉,topic的同组消费者可以进行reblance;
- 建议将kafka-mirror-maker.sh进程启动在目标集群,原因上文有提及;
- kafak-mirror-maker.sh启动默认不会后台运行,调用kafka-run-class.sh的启动内存256M,需要修改一下启动参数(内存大小、日志);
- 建议对source 集群的whitelist中的topic的消费情况,加实时的积压量监控;
- 建议producer.properties配置中开启auto.create.topics.enable=true;
二、使用和配置
| 12
 3
 4
 
 | https://kafka.apache.org/downloads
 
 https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
 
 | 
- 消费端配置(consumer.properties)
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 
 | zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
 group.id=groupyzg-02
 
 
 
 auto.offset.reset=largest
 
 
 
 partition.assignment.strategy=roundrobin
 
 | 
  
source kafka版本是1.0,配置bootstrap-server指定kafka集群地址,配置方式如下:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 
 | bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
 group.id=groupyzg-02
 
 
 
 
 
 auto.offset.reset=latest
 
 
 
 
 
 heartbeat.interval.ms=30000
 
 
 
 
 
 session.timeout.ms=100000
 
 
 
 
 
 partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
 
 
 
 
 
 max.poll.records=20000
 
 
 
 
 
 receive.buffer.bytes=4194304
 
 
 
 
 
 max.partition.fetch.bytes=10485760
 
 | 
  
- 生产者配置(producer.properties)
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 
 | bootstrap.servers = 192.168.xxx:9092,192.168.xxx:9092
 buffer.memory = 268435456
 
 batch.size = 104857
 
 acks=0
 
 linger.ms=10
 
 max.request.size = 10485760
 
 send.buffer.bytes = 10485760
 
 compression.type=snappy
 
 | 
  
启动命令kafka-mirror-maker.sh中添加端口约束和启动内存配置:
| 12
 3
 4
 5
 
 | export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"
 export JMX_PORT="8888"
 
 exec $(dirname $0)/kafka-run-class.sh kafka.tools.MirrorMaker"$@"
 
 | 
  
日志监控:若想输出日志数据,则使用一下命令启动,日志数据会保存在kafka/logs/_mirror_maker.out 中;
| 1
 | sh bin/kafka-run-class.sh -daemon -name mirror_maker -loggc kafka.tools.MirrorMaker --consumer.config config/consumer.properties --num.streams 2 --producer.config config/producer.properties  --whitelist='testnet'
 | 
  
0.10版本的积压量监控:
| 1
 | ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker--zookeeper xxxx:21810,xxx:21810,xxx:21810--topic testnet -group testnet-group
 | 
  
1.0版本的积压量监控:
| 1
 | ./kafka-consumer-groups.sh --bootstrap-server xxx:9092--describe --group testnet-group
 | 
| 12
 3
 4
 5
 6
 7
 8
 
 | ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
 
 
 ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group xxx
 
 
 ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
 
 | 
进程数监控:建议增加mirror-maker的进程数监控,及时发现并启动挂点进程;
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 
 | #!/bin/bash
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 sj=`date "+%F %T"`
 
 
 
 last_sj=`date "+%F %T" -d '-5 min'`
 
 
 
 runlog=~/kafka_2.11-1.0.0/alarm/run.log
 
 
 
 noticetel="138XXXXXXXX"
 
 province=~/kafka_2.11-1.0.0/alarm/province.cfg
 
 tmplog=~/kafka_2.11-1.0.0/alarm/tmp.log
 
 
 
 
 
 smsnotice(){
 
 info=$@
 
 IFS=","
 
 for i in $noticetel;do
 
 curl -kd xx
 
 
 
 done
 
 }
 
 
 
 
 
 
 
 province_all=`cat ${province}|wc -l`
 
 mount=`ps -ef|grep -i mirror_maker-gc |wc -l`
 
 
 
 ps -ef|grep -i mirror_maker-gc >${tmplog}
 
 
 
 echo "the mount of mirror-maker is `expr $mount - 1`!"> $runlog
 
 echo "the mount of province config is $province_all ! ">> $runlog
 
 if [ `expr $mount - 1` -ge $province_all ] ;then
 
 echo "`hostname -i` ----${sj} ---- the mirrormaker is ok!" >> $runlog
 
 else
 
 message="`hostname -i` ----${sj} ----the mount mirror-maker processor `expr $mount - 1` is less than the mount of province_config $province_all, "
 
 echo ${message} >> $runlog
 
 while read line
 
 do
 
 province_name=`echo ${line}|awk -F '|' '{print $1}'`
 
 province_code=`echo ${line}|awk -F '|' '{print $2}'`
 
 mount_two=`cat ${tmplog}|grep -i ${province_code} |wc -l`
 
 
 
 if [ $mount_two -ge 1 ] ;then
 
 echo "`hostname -i` ----${sj} ---- the province of ${province_name} is ok!" >> $runlog
 
 else
 
 message_two="${message} the province of [ ${province_name} ] mirror-maker processor is down, please check for it!"
 
 echo ${message_two} >> $runlog
 
 smsnotice ${message_two}
 
 fi
 
 done<${province}
 
 fi
 
 |