Kafka使用学习笔记

kafka是由Linkedin开源的一个分布式流式系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,通常我们使用过程中以消息系统来使用,kafka有很多优势,比如高可用,高吞吐等,在这里简单记录一下对一些特性的理解和学习

高吞吐、低延迟

磁盘顺序写入

kafka的消息是不断追加到文件中的,这个特性使kafka可以充分利用磁盘的顺序读写性能,顺序读写不需要硬盘磁头的寻道时间,只需很少的扇区旋转时间,所以速度远快于随机读写
具体是kafka使用基于日志结构(log-structured)的数据格式,即每个分区日志只能在尾部追加写入(append),不允许随机跳到某个位置开始写入。

zero-copy

kafka中使用zero-copy的方式是使用DMA机制,即直接内存访问技术,对于DMA和MMAP的区别我们再次暂不讨论
传统模式下数据流:

  1. 读取内容,将内容加载到内核的状态下的Buffer中
  2. CPU控制将内核模式数据复制到用户模式下
  3. 将用户模式的数据,复制到内核模式的Socket Buffer中
  4. 将内核模式下的Socket Buffer的数据复制到网卡设备中传输

DMA实现zero-copy:

  1. 将文件内容从磁盘通过DMA引擎复制到内核缓冲区
  2. 将数据位置和长度信息的描述符复制到socket缓存区
  3. 然后直接将数据传输到网络接口,最后发送

总结:kafka是调用linux系统给出的sendfile系统调用来使用零拷贝。Java中对应FileChannel.transferTo接口。

Page Cache

Broker收到数据后,写磁盘时只是将数据写入Page Cache,并不保证数据一定完全写入磁盘。对于机器宕机时,Page Cache内的数据未写入磁盘从而造成数据丢失,Kafka通过Replication机制去解决。如果为了保证这种情况下数据不丢失而强制将Page Cache中的数据Flush到磁盘,反而会降低性能。也正因如此,Kafka虽然提供了flush.messages和flush.ms两个参数将Page Cache中的数据强制Flush到磁盘,但是Kafka并不建议使用。如果数据消费速度与生产速度相当,甚至不需要通过物理磁盘交换数据,而是直接通过Page Cache交换数据。同时,Follower从Leader Fetch数据时,也可通过Page Cache完成。

分区

在集群中broker可以水平扩展,一个topic对应多个parition,一个partion对应多个segment,每次操作针对一小段,同时也可以水平扩展,分布式和并行的方式提供了高吞吐能力

批量发送

kafka允许进行批量发送消息,producter发送消息的时候,可以将消息缓存在本地,等到了固定条件发送到kafka,等消息条数到固定条数,一段时间发送一次

数据压缩

Kafka还支持对消息集合进行压缩,Producer可以通过GZIP或Snappy格式对消息集合进行压缩,减少传输的数据量,减轻对网IO的压力,批量发送和数据压缩一起使用,单条做数据压缩的话,效果不明显

高可用

kafka的高可用主要有多副本机制保证,多副本机制是指一个parition拥有多个副本(replication),多个副本会分配到不同的broker上,kafka会为同一个topic的多个partition选出一个leader,之后所有该partition的请求,实际操作的都是leader,
然后再同步到其他的follower。当一个broker宕机之后,所有leader在该broker上的partition都会重新选举,选出一个leader,并将该broker节点踢出ISR(in-sync Replica)队列
多副本同步策略中,producer可以指定request.required.acks来决定数据一致性和低延时的取舍,acks有三个值0,1,2,

  • 0代表只发送,不关注broker是否处理成功
  • 1代表等待leader节点返回成功,其他replica节点通过fetcher同步,在这里发生leader切换的时候,可能会丢数据
  • -1代表所有的 ISR机器同步成功才返回,采用强一致性,可用性最高,但会带来延迟问题

kafka集群MHA

controller选主

controller是kafka集群中,在所有broker中选出来一个leader,选举过程是所有broker都去zk创建一个临时节点(/controller),只有一个会注册成功,即成为controller节点,竞选失败的broker会在改节点下注册watch,
controller节点用来管理kafka集群状态,parition的分配,parition的leader选举等,相较于之前每个parition都通过zk选主的方式,不会注册大量watch,不会产生惊群效应,对zk的压力不会太大,且不会脑裂

parition选主

如上所述,parition的选主有controller节点来控制,具体做法是zk中会维护一个parition的所有可用分区信息ISR(in-sync-replicas),然后controller根据配置的选举算法选出一个leader,controller与broker之间的通信直接通过RPC的方式
具体算法有:

1
2
3
4
5
 NoOpLeaderSelector
offlinePartitionLeader
reassignedPartitionLeader
preferredReplicaPartitionLeader
ControlledShutdownLeader

语义

Kafka实现了三种语义,下面我们将从生产者和消费者的角度分别分析对各个语义的支持以及实现

At most once

Producer

1
2
acks=0  retries=0
acks可选项:[0,1,all]。

0表示Producer发送完消息后不会等待任何Broker的确认
1表示Producer会等待Broker集群中的leader的确认写入消息
all表示Producer需要等待Broker集群中leader和其所有follower的确认写入消息

Consumer

1
2
enable.auto.commit=true
auto.commit.interval.ms 配置为一个很小的数值,定时提交任务

At least once

Producer

1
默认语义,具体配置为  Kafka中默认acks=1并且retries=2147483647。

Consumer

1
enable.auto.commit=false,手动调用consumer.commitSync()来提交offset

Exactly once

Producer

1
2
- enable.idempotence=true。enable.idempotence 
- acks=all

该配置项表示是否使用幂等性。当enable.idempotence配置为true时,acks必须配置为all。并且建议max.in.flight.requests.per.connection的值小于5。
acks=all 表示等leader和follower节点都写入消息才返回确认

首先kafka本身支持At least once消息送达语义,要在生产端支持有且仅有一次的语义,就需要发送消息的时候实现幂等,要实现消息发送的幂等关键是要实现Broker端消息的去重。具体是用PID和Sequence Numbler来实现

PID是指:每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户是不可见的。
Sequence Numbler:对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number,Broker端在缓存中保存了这Sequence Numbler,对于接收的每条消息,如果其序号比Broker缓存中序号大于1则接受它(req seqnum = seqnum + 1),否则将其丢弃。这样就可以实现了消息重复提交了。

Consumer

事务,kafka consumer对于Exactly once语义的实现可以通过事务来做,即消费消息和提交消费者offset放在一个事务中,保证原子性,但是前提是配置事务隔离级别为

1
isolation.level=read_committed

read_committed:可以同时读取事务执行过程中的部分写入数据和已经完整提交的事务写入数据;
read_uncommitted:完全不等待事务提交,按照offsets order去读取消息,也就是兼容0.11.x版本前Kafka的语义(kafka事务特性在0.11版本之后引入);

一些常用命令

启动服务

1
. /kafka -server -start.sh.. / config / server.properties 1 > /dev/null 2>&1 &

创建topic

1
./kafka -topics.sh --create --zookeeper localhost: 2181 --config max.message.bytes = 12800000 --config flush.messages =1  --replication 1 -factor 1 --partitions 1 --topic test

查看topic详情

1
./kafka -topics.sh --describe--zookeeper 168.61.13.109:2181 --topic test

写入消息

1
./kafka -console -producer.sh --broker -list localhost: 9092 --topic test

消费消息

1
./kafka - console -consumer.sh --bootstrap -server localhost: 9092 --topic test --from -beginning

变更topic信息

1
./kafka -topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --partitions 3