Kafka的学习和使用
本文是基于CentOS 7.3系统环境,进行Kafka的学习和使用
Kafka
0 Kafka基本概念
(1) 什么是Kafka
Kafka是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域>
(2) 消息队列
- 点对点模式的消息队列
对一个消息而言,只会有一个消费者可以消费,消费者主动拉取消息,消息收到后,会将消息删除 - 基于发布/订阅模式的消息队列
发布到topic的消息会被所有订阅者消;费,消费者消费完消息后不会删除 - 消息队列主动推送
适用于消费者处理能力高,生产者生产消息较少的场景,整个系统的消息处理速度由消息队列的推送速度决定,多个消费者之间处理消息的能力不同,如果消息队列按照一定的速度推送消息,可能会造成某些消费者处理消息处理不过来,某些消费者处理速度很快一直处于空闲 - 消费者主动拉取
适用于生产者生产消息的速度高于消费者,整个系统的消息处理速度由消费者的拉取速度决定,如果消息队列中消息较少,消费者会一直维护着一个长轮询去访问消息队列是否有消息
(3) Kafka的作用
- 解耦和异步
将强依赖的两个数据上下游系统,通过消息队列进行解耦,可以实现上下游的异步通信 - 削峰
缓解数据上下游两个系统的流速差
1 安装及常用命令
(1)安装
集群规划
hadoop11 | hadoop12 | hadoop13 |
---|
zk | zk | zk | kafka | kafka | kafka |
http://kafka.apache.org/downloads.html

安装步骤
0.准备工作
安装JDK,搭建zookeeper集群环境
1.解压kafka安装包
[root@hadoop11 modules]
2.修改解压后的文件名 (配置环境变量)
[root@hadoop11 installs]
3.在/opt/installs/kafka0.11目录下创建logs文件夹
[root@hadoop11 installs]
[root@hadoop11 kafka0.11]
4.修改配置文件
[root@hadoop11 kafka0.11]
[root@hadoop11 config]
broker.id=11
delete.topic.enable=true
log.dirs=/opt/installs/kafka0.11/logs
zookeeper.connect=hadoop11:2181,hadoop12:2181,hadoop13:2181
5.分发安装包
[root@hadoop11 installs]
[root@hadoop11 installs]
6.分别在hadoop12和hadoop13上修改配置文件/opt/installs/kafka/config/server.properties中的broker.id
broker.id=12
broker.id=13
注:broker.id不得重复
7.启动集群
在三台节点分别执行命令启动kafka
[root@hadoop11 kafka0.11]
[root@hadoop12 kafka0.11]
[root@hadoop13 kafka0.11]
8.验证集群是否启动成功
[root@hadoop11 kafka]
1571 Kafka
1622 Jps
1215 QuorumPeerMain
8.关闭集群
[root@hadoop11 kafka0.11]$ bin/kafka-server-stop.sh stop
[root@hadoop12 kafka0.11]$ bin/kafka-server-stop.sh stop
[root@hadoop13 kafka0.11]$ bin/kafka-server-stop.sh stop
kafka的伪分布式只需要解压启动 就好了 参考官方文档 超级简单
(2)常用命令
-
创建topic
注意:一般在系统设计的时候,先把topic规划好,业务含义相同的数据,放在一个topic中。
[root@hadoop11 kafka0.11]
--zookeeper 连接的zookeeper
--create 表示要创建一个topic
--topic 指定topic的名字(不同业务的数据,放在不同的topic中)
--partitions 指定分区数
--replication-factor 指定副本数
-
查看所有topic [root@hadoop11 kafka0.11]
-
查看某个topic详情 [root@hadoop11 kafka0.11]
-
删除topic [root@hadoop11 kafka0.11]
-
接收消息 [root@hadoop11 kafka0.11]
-
发送消息 [root@hadoop11 kafka0.11]
2 Kafka基础架构
- 为了方便横向扩展,并提高吞吐量,一个topic分为多个partition
- 配合分区的设计,提出消费者组的概念,组内每个消费者并行消费
- 为提高可用性,为每个partition增加若干副本,类似于NameNode HA

- Producer
消息生产者,连接broker-list,就是向kafka broker发消息的客户端 - Consumer
消息消费者,连接bootstrp-server,向kafka broker取消息的客户端 - Consumer Group (CG)
消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者 - Cluster
kafka集群,连接zookeeper,包含多个broker - Broker
一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic - Topic
可以理解为一个队列,生产者和消费者面向的都是一个topic - 分区为了并发,备份为了容灾
- Partition
为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列 - Replica
副本,为保证集群中的某个节点发生故障时, 该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower - leader
每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader - follower
每个分区多个副本中的“从”,实时与leader保持数据的同步。leader发生故障时,某个follower会成为新的follower - zookeeper
存储topic信息、分区信息、副本信息、ISR信息、消费者的偏移量(0.9版本之前,之后存在kafka集群中d __consumer_offset主题中,为了提高效率,减少与zookeeper的通信时间)
在一个kafka中创建一个topic,一个topic可以设置多个partition(理论无限),但分区副本个数不能超过broker的个数。
同一个consumer group中下不允许多个consumer消费同一个partition,当partition大于CG下consumer个数时,一个consumer可消费多个partition,小于时,则闲着。
字节面试题:
? 如何让所有的消费者消费同一个partition?
3 Kafka工作流程
- kafka中消息是以topic进行分类的,生产者生成消息,消费者消费消息,都是面向topic的
- topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一组log文件,该log文件中存储的就是producer生产的数据。
- Producer生产的数据会被不断追加到该log文件末端,且每条数据都有自己的offset。
- 消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。

4 Kafka文件存储机制

-
由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment -
每个segment对应两个文件——“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,first这个topic有三个分区,则其对应的文件夹为first-0,first-1,first-2 -
index和log文件以当前segment的上一个文件的最后一条信息的offset+1或者当前的segmen的第一条信息的offset命名 例如:什么意思呢?举例00000000000000000000.index 说明第一条消息offset是0,也就是最开始的位置;00000000000000170410.index 这里偏移量是170410,说明是170410这个位置开始。 -
index文件存储大量的索引信息,log文件存储大量的数据,索引文件中的元数据指向对应数据文件中message的物理偏移地址

5 Kafka生产者
5.1 分区策略
(1) 分区的原因
- 方便在集群中扩展
每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了 - 可以提高并发
因为可以以Partition为单位读写了
(2) 分区的原则
我们需要将producer发送的数据封装成一个ProducerRecord对象。
- 指明 partition 的情况下,直接将指明的值直接作为 partiton 值
- 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值
- 既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法
- 自定义分区策略。
① 自定义类 implements Partitioner
② props.put("partitioner.class", "自定义类");
5.2 数据可靠性保证
为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。

(1) 副本数据同步策略
策略 | 优点 | 缺点 |
---|
半数以上完成同步,就发送ack | 延迟低 | 选举新的leader时,容忍n台节点的故障,需要2n+1个副本 | 全部完成同步,就发送ack | 选举新的leader时,容忍n台节点的故障,需要n+1个副本 | 延迟高 |
Kafka选择了第二种方案,原因如下:
- 同样为了容忍n台节点的故障,第一种方案需要2n+1个副本,而第二种方案只需要n+1个副本,而Kafka的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
- 虽然第二种方案的网络延迟会比较高,但网络延迟对Kafka的影响较小。
(2) ISR
AR(All Replica):分区所有副本
ISR(In-Sync Replica):同步副本,在规定时间内,能够实现数据同步。
OSR(Out-Sync Replica):同步副本,在规定时间内,不能够实现数据同步。从ISR中被踢出的。
? 采用第二种方案之后,设想以下情景:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢?
? Leader维护了一个动态的in-sync replica set (ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给producer发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader。
(3) ack应答机制
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。 所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。
acks参数配置
- 0
producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据; - 1
producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据; - -1(all)
producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复 。
(4)消费数据一致性保障
- LEO
指的是每个副本最大的offset - HW
指的是消费者能见到的最大的offset,ISR队列中最小的LEO 
(5)故障处理细节
- follower故障
follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。 - leader故障
leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。 注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
5.3 Exactly Once语义
? 将服务器的ACK级别设置为-1,可以保证Producer到Server之间不会丢失数据,即At Least Once语义。相对的,将服务器ACK级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once语义。
? At Least Once可以保证数据不丢失,但是不能保证数据不重复;相对的,At most Once可以保证数据不重复,但是不能保证数据不丢失。但是,对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即Exactly Once语义。在0.11版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。
? 0.11版本的Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。 幂等性结合At Least Once语义,就构成了Kafka的Exactly Once语义。即:
At Least Once + 幂等性 = Exactly Once
? 要启用幂等性,只需要将Producer的参数中enable.idempotence设置为true即可。Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。而Broker端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。
但是PID重启就会变化,同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once。
6 Kafka消费者
6.1 消费方式
consumer采用pull(拉)模式从broker中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。
6.2 分区分配策略
一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。
Kafka有两种分配策略,一是roundrobin,一是range。
1)range (区间)
默认

2)roundrobin (轮训)

6.3 offset的维护
? 由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
Offset:kafka会保存每个topic数据消费的记录offset,以便记录consumer消费到哪个数据了

Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为**__consumer_offsets**。

7、Kafka Java API
1.Producer API
① 添加依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
② 相关API
KafkaProducer:需要创建一个生产者对象,用来发送数据
ProducerConfig:获取所需的一系列配置参数
ProducerRecord:每条数据都要封装成一个ProducerRecord对象
③ 异步发送 不带回调函数的Producer
public class CustomProducer {
public static void main(String[] args) throws Exception {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop11:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String,String> producer = new KafkaProducer<>(configs);
for (int i=0;i<10;i++){
producer.send(new ProducerRecord<>("topica","hello"+i));
}
producer.close();
}
}
④ 异步发送 带回调函数的Producer
? 回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是RecordMetadata和Exception,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
public class CustomProducer_CallBack {
public static void main(String[] args) throws Exception {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop11:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String,String> producer = new KafkaProducer<>(configs);
for (int i=0;i<10;i++){
producer.send(new ProducerRecord<>("topica", "hello" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception == null){
System.out.println("发送成功:"+metadata.partition());
System.out.println("发送成功:"+metadata.topic());
System.out.println("发送成功:"+metadata.offset());
}
}
});
}
producer.close();
}
}
2.Consumer API
① 相关API
KafkaConsumer:需要创建一个消费者对象,用来消费数据
ConsumerConfig:获取所需的一系列配置参数
ConsuemrRecord:每条数据都要封装成一个ConsumerRecord对象
② Consumer接收数据
public class CustomConsumer {
public static void main(String[] args) {
Map<String,Object> map = new HashMap<>();
map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop11:9092");
map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
map.put(ConsumerConfig.GROUP_ID_CONFIG,"g1");
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer(map);
kafkaConsumer.subscribe(Arrays.asList("topica"));
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
8、Zookeeper在Kafka中的使用
kafka对zookeeper是强依赖 ,最新版的kafka2.8版本,已经不需要依赖zk了
? Kafka集群中有一个broker会被选举为Controller(启动broker向zk注册,先到先得),负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。
? Controller的管理工作都是依赖于Zookeeper的。
zookeeper 存储了一些关于 consumer 和 broker 的信息,那么就从这两方面说明 zookeeper 的作用。
1. broker
zookeeper 记录了所有 broker 的存活状态,broker 会向 zookeeper 发送心跳请求来上报自己的状态。
zookeeper 维护了一个正在运行并且属于集群的 broker 列表。
kafka 集群中有多个 broker,其中有一个会被选举为控制器。
控制器负责管理整个集群所有分区和副本的状态,例如某个分区的 leader 故障了,控制器会选举新的 leader。
从多个 broker 中选出控制器,这个工作就是 zookeeper 负责的。
kafka 允许一些 client 有不同的生产和消费的限额。
这些限额配置信息是保存在 zookeeper 里面的。
所有 topic 的访问控制信息也是由 zookeeper 维护的。
ISR(in-sync replica) 是 partition 的一组同步集合,就是所有 follower 里面同步最积极的那部分。
一条消息只有被 ISR 中的成员都接收到,才被视为“已同步”状态。
只有处于 ISR 集合中的副本才有资格被选举为 leader。
zookeeper 记录着 ISR 的信息,而且是实时更新的,只要发现其中有成员不正常,马上移除。
zookeeper 保存了所有 node 和 topic 的注册信息,可以方便的找到每个 broker 持有哪些 topic。
node 和 topic 在 zookeeper 中是以临时节点的形式存在的,只要与 zookeeper 的 session 一关闭,他们的信息就没有了。
zookeeper 保存了 topic 相关配置,例如 topic 列表、每个 topic 的 partition 数量、副本的位置等等。
2. consumer
kafka 老版本中,consumer 的消费偏移量是默认存储在 zookeeper 中的。
新版本中,这个工作由 kafka 自己做了,kafka 专门做了一个 offset manager。
和 broker 一样,consumer 也需要注册。
consumer 会自动注册,注册的方式也是创建一个临时节点,consumer down 了之后就会自动销毁。
kafka 的每个 partition 只能被消费组中的一个 consumer 消费,kafka 必须知道所有 partition 与 consumer 的关系。
以下是简单版本:
- 注册所有的broker(kafka节点)
临时节点
- 从broker中选出一个承担KafkaController的职责
1:防止单点故障,也是临时节点
2:负责在需要的时候对ISR列表中选出leader
- 保存Topic的元数据信息(描述信息)
zookeeper保存topic的分区和leader信息,并协助KafkaController在需要的时候(启动或者broker故障)选出新的分区的leader
kafkaController会监听 zookeeper中的borkers下的ids节点的子节点变化
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3VxgFZEg-1627175150820)(kafka.assets/image-20200903225410142.png)]
9、Flume整合Kafka
< 均为伪分布式 >
(1)kafka当flume的sink
①. 在flume的job目录下新建kafka-sink.conf文件
#=============这个文件是配置kafk sink==============
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
#######################下面的三行是配置的kafka-sink######################
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers =hadoop10:9092
a1.sinks.k1.kafka.topic =topica
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
②. 启动kafka的consumer消费者
[root@hadoop11 kafka]
③.启动flume
[root@hadoop11 apache-flume-1.9.0-bin]
④.生产数据
[root@hadoop11 data]
⑤.查看kafka的consumer消费者的消费情况
[root@hadoop11 kafka]
hello
(2)kafka当flume的source
kafka-source的配置
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
#============在flume配置文件中配置kafka source=========
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop10:9092
a1.sources.r1.kafka.topics = topica
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
###########需要特别注意的,kafka-source默认的batchSize:1000,
#所以channel的transactionCapacity必须大于等于1000
#channel的capacity必须大于channel的transactionCapacity
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
执行命令,启动flume agent
bin/flume-ng agent --conf conf --name a1 --conf-file jobs/kafka-source.conf -Dflume.root.logger=INFO,console
10、kafka执行流程
1. producer发送数据流程
# 流程说明
1. Producer.send的主线程。
① 数据先经过拦截器。
② 然后进行网络传输前的序列化
③ 计算消息所属的分区
④ 按照将消息存入对应的本地分区。(本地缓存,为了批量发送)
2. sender线程
① 当某个分区内的消息数量达到一定值:`batch.size`之后,才会发送数据。(默认值: 16384 (16kb))
config.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
② 如果某个分区内消息数量未达到batch.size,sender等待linger.ms之后也会批量发送。(默认值:0ms)
config.put(ProducerConfig.LINGER_MS_CONFIG,200);
# 流程细节解释
1. 主线程一条条向本地分区中存入数据。
2. Sender线程批量将本地分区的数据,发送到kafka的topic中的对应分区。--提高效率
3. 拦截器:可以对producer发送的数据,做一些通用功能的处理。
4. 序列化:为了保证数据在网络中传输和kafka的broker之间同步,需要数据执行序列化。
5. Partitioner:数据在写入到本地的内存队列(缓冲区)之前,会先计算分区再存放数据。
# 图示

2. Consumer消费数据流程
offset相关
# offset

Consumer从kafka的磁盘中消费数据,所以不用担心数据丢失问题。
但是,Consumer作为一个消费者,是有可能出现宕机等问题的,也就意味着会出现重启后,继续消费的问题,那么就必须要消费者偏移量,消费到哪条数据了。
结论:offset是用来记录Consumer的消费位置的,由Consumer自己负责维护(提交),保存在kafka的broker的内置topic中
auto.offset.reset=latest
# 自动提交offset
1. 默认情况下Consumer的offset自动提交。
enable.auto.commit=true
auto.commit.interval.ms=5000
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
# 手动提交offset
通过代码的方式手动明确offset提交的方式。
`config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");`
提交方式 | 优点 | 缺点 |
---|
同步提交 | 有失败重试机制,可以确保每次offset提交成功。 | 会影响消费者的消费速度。 | 异步提交 | 异步提交offset,不会阻挡继续消费,消费速度快。 | 可能会导致最新的offset没有提交成功,重启consumer之后,消费已经消费过的数据 |
while (true){
ConsumerRecords<String, String> crs = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> cr : crs) {
System.out.println("cr = " + cr);
}
kafkaConsumer.commitAsync();
}
while (true){
ConsumerRecords<String, String> crs = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> cr : crs) {
System.out.println("cr = " + cr);
}
kafkaConsumer.commitSync();
}
3. 拦截器
对Producer发送到Kafka的数据,进行前置处理和后置处理。
接口:org.apache.kafka.clients.producer.ProducerInterceptor
方法说明:
public class TimeInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return null;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
String topic = metadata.topic();
int partition = metadata.partition();
long offset = metadata.offset();
System.out.println(topic+" : "+partition+" : "+offset);
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
执行时机

编码
需求如下:
-
对Producer发出的消息,添加一个时间戳:124313782314+message -
对kafka接收到的消息反回ack的时候,统计个数,以统计生产者发送数据的成功数据和失败数,并且打印失败的消息的offset。
定义拦截器
public class TimeInterceptor implements ProducerInterceptor<String, String> {
private Long successNum = 0L;
private Long errorNum = 0L;
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
ProducerRecord<String, String> records = new ProducerRecord<String, String>(record.topic(),record.partition(),record.timestamp(),record.key(),System.currentTimeMillis()+record.value(),record.headers());
return records;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
String topic = metadata.topic();
int partition = metadata.partition();
long offset = metadata.offset();
System.out.println(topic+" : "+partition+" : "+offset);
if (exception == null){
successNum++;
}else{
errorNum++;
}
}
@Override
public void close() {
System.out.println("successNum = " + successNum);
System.out.println("errorNum = " + errorNum);
System.out.println("-----------close---------");
}
@Override
public void configure(Map<String, ?> configs) {
System.out.println("拦截器初始化:configs = " + configs);
}
}
使用拦截器
config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "demo3.TimeInterceptor");
|