Kafka

笔记内容取自尚硅谷Kafka3.0教程,以及《深入理解Kafka核心设计与实践原理》

内容还会不断充实~

概述

定义

传统定义:

Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域

最新定义:

Kafka是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

消息队列

应用场景

  • 异步处理
  • 削峰

优势

  • 解耦

    允许你独立的扩展或修改两边的处理过程,只要确保他们遵守同样的接口约束

  • 可恢复性

    系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理

  • 缓冲

    有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的的处理速度不一致的情况

  • 灵活性&峰值处理能力

    • 灵活性:动态上下线,在流量高峰时可以动态增加服务器
    • 峰值处理:削峰,延迟处理

    使系统能够应对突发的高峰流量

  • 异步通信

    允许用户把一个消息放入队列,但不立即处理它,在需要的时候再去处理

模式

  1. 点对点模式

    一对一,消费者主动拉取数据,收到消息后,队列中的消息清除。队列可以有多个消费者,但对于一个消息而言,只能被一个消费者消费。
    在这里插入图片描述

  2. 发布/订阅模式

    • 一对多,消费者消费消息后,队列不会清除消息。消息生产者将消息发布到topic中,同时有多个消费者消费该消息。和点对点模式不同,发布到topic的消息会被所有订阅者消费

    • 发布/订阅模式中,又分为两种:

      • 消费者主动拉取消息

        Kafka就是属于这种类型

        优势:速度取决于消费者,可以根据消费能力以适当的速率消费消息

        弊端:需要轮询,查看队列中是否有消息,浪费资源

      • 队列推送消息

        类似于公众号推送

        弊端:

        推送消息的速度取决于队列,各个消费者处理消息的速度可能不一致,造成消费者崩掉(推送速度 >消费者处理速度)或者资源浪费(推送速度 < 消费者处理速度)

Kafka基础架构

架构图

在这里插入图片描述

zk在这里的作用:

  • 存储kafka集群信息
  • 存储消费者消费到的位置信息
    • 即:消费到第几条了
    • 消费者本地内存也会存储该条数信息,平时就是读取并维护本地的信息;但当机器挂掉重启后,会先去zk获取该信息,然后再在本地内存继续维护)
    • 0.9之后将位置信息存储到kafka里一个系统创建的topic中
      • 为何改存到kafka?
      • 因为消费者本身就需要维护与kafka的连接,去获取消息,如果将位置信息放在zk,则还需要请求zk获取信息,速度不如kafka(注:Kafka消息存到磁盘,默认存七天

名词解释

  • Broker:可以理解为起了Kafka进程的服务器
  • Topic:主题,可以理解为一个队列,用来将消息分类,便于发送和消费
  • Partition:分区,用来提高Kafka集群的并发处理能力,每一个partition是一个有序的队列(个人理解Partition就是对Topic的又一次细分,分布式的多个Broker是为了避免单个机器的性能造成的阻塞,多个partition是为了避免同一内存区域的IO阻塞)
  • Replication:副本。为保证集群中某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower
  • Leader:每个分区多个副本的"主",生产者发送数据的对象,消费者消费数据的对象都是leader
  • Follower:
    • Leader的副本,每个分区多个副本的"从",实时从leader同步数据,保持和leader数据的同步。用来备份数据(不直接对生产者和消费者提供读写),避免Leader所在机器挂掉后数据丢失。(所以,同一topic同一partition的Follower和Leader一定不在同一台机器)
    • leader发生故障时,某个Follower会成为新的leader
  • Consumer Group:消费者组,可以理解为一个大的消费者,目的是提高消费能力
    • 和其他普通消费者一样,需要订阅一个topic
    • 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
    • 一个消费者组里面的不同消费者,只能消费kafka中这个topic的不同partition的数据。(个人理解:建立partition就是为了提高消息的读写速度,对于同一个topic,消息的写入根据partition区分开了,消费者消费的时候如果不分开,会降低消息消费速度且造成重复消费,那partition的意义就不大了)

基本操作

安装

命令行操作

topic

查看该机器上所有topic:

kafka-topics.sh --list --zookeeper ip:zk端口

创建topic:

kafka-topics.sh --create --topic topic名称 --zookeeper ip:zk端口 --partitions 分区数 --replication-factor 副本数

#注:副本数不能大于当前可用的Broker数,分区数可以大于当前可用的Broker数
#副本数 包括 leader 和 follower

删除topic:

kafka-topics.sh --delete --topic first --zookeeper ip:zk端口

#注:执行效果:
#Topic first is marked for deletion.   标记为删除
#Note: This will have no impact if delete.topic.enable is not set to true.  只有当delete.topic.enable设为true时才会真正删除

查看topic详情:

kafka-topics.sh --describe --zookeeper ip:zk端口 --topic topic名称

消息

生产者发送消息

kafka-console-producer.sh --topic first --broker-list kafkaIP:kafka端口

消费者消费消息

kafka-console-consumer.sh --topic first --bootstrap-server kafkaIP:kafka端口 #从当前开始消费
#或者:
kafka-console-consumer.sh --topic first --bootstrap-server kafkaIP:kafka端口 --from-begining #从头开始消费

架构深入

工作流程

在这里插入图片描述

说明:

  • 偏移量不是全局唯一的,是分区唯一的
  • kafka只保证消息的分区内有序,不保证全局有序
    • 有序是指消费消息的顺序和生产消息的顺序一致
  • kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的
  • topic是逻辑上的概念,而partition是物理上的概念
    • 每个partition对应一个文件夹(文件夹名 = topic - partition),文件夹内有.log文件,该log文件存储的就是producer生产的数据。producer生产的数据会不断追加到该log文件末端,且每条数据都有自己的offset。

    • 消费者都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。

    • 个人更倾向于《深入理解Kafka核心设计与实践原理》上的叙述:

      分区同主题一样是一个逻辑的概念而没有物理上的存在

      主题、分区、副本和Log(日志)的关系如下图,主题和分区都是提供给上层用户的抽象,而在副本层面或更加确切的说是Log层面才有实际物理上的存在
      在这里插入图片描述

生产者

消息发送流程

在这里插入图片描述

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。

RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。 RecordAccumulator的大小可通过生产者客户端参数buffer.memory配置,默认32MB。如果生产者发送消息的速度超过发送到服务器端的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数默认60秒。

主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即Deque<ProducerBatch>。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。

注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一致多个ProducerRecord

  • ProducerBatch是一个消息批次,包含ProducerRecord,使字节使用更加紧凑
  • 将较小的ProducerRecord拼凑成一个较大的ProducerBatch,可以减少网络请求的次数以提升整体的吞吐量

如果生产者要向很多分区发送消息,则可将buffer.memory参数适当调大以增加整体吞吐量(buffer.memory大,RecordAccumulator则大,因RecordAccumulator中为每个分区都维护了一个双端队列,所以,RecordAccumulator大,每个分区分到的空间就大,可缓存的消息就多)。

在Kafka生产者客户端中,使用java.io.ByteBuffer实现消息内存的创建和释放,不过频繁的创建和释放比较耗费资源,故RecordAccumulator内部有一个BufferPool,主要用来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由batch.size参数指定,默认16KB。

ProducerBatch的大小和batch.size的关系:

当一条消息(ProducerRecord)进入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列尾部获取一个ProducerBatch(如果没有则新建),查看ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数设定的大小,如果不超过,则以batch.size参数的大小来创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool的管理来进行复用;如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。

向RecordAccumulator中追加消息源码:

public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock,
                                     boolean abortOnNewBatch,
                                     long nowMs) throws InterruptedException {
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        if (headers == null) headers = Record.EMPTY_HEADERS;
        try {
            // check if we have an in-progress batch
            // 获取 or 创建队列
            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");
                // 尝试向ProducerBatch中添加数据
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                if (appendResult != null)
                    // 向已有的 ProducerBatch 中追加成功,直接返回即可
                    return appendResult;
            }

            // we don't have an in-progress record batch try to allocate a new batch
            if (abortOnNewBatch) {
                // Return a result that will cause another call to append.
                return new RecordAppendResult(null, false, false, true);
            }

            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
            // 计算批次大小设置值,和真实消息(序列化和压缩后)大小 取较大值
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
            // BufferPool分配内存(按照上一步计算得到的size来分配,如果size超过了16k,则这块区域不会被BufferPool复用)(BufferPool只对特定大小的ByteBuffer进行管理,这个特定大小由batch.size指定,默认16k)
            buffer = free.allocate(size, maxTimeToBlock);

            // Update the current time in case the buffer allocation blocked above.
            nowMs = time.milliseconds();
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");

                //再向 ProducerBatch 中追加试试
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    return appendResult;
                }

                //不再舔了,我自己新建一个ProducerBatch
                // 封装ByteBuffer
                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                // 再封装
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
                FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                        callback, nowMs));

                // 将ProducerBatch添加到队列末尾
                dq.addLast(batch);
                incomplete.add(batch);

                // Don't deallocate this buffer in the finally block as it's being used in the record batch
                buffer = null;
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
            }
        } finally {
            if (buffer != null)
                free.deallocate(buffer);
            appendsInProgress.decrementAndGet();
        }
    }

Sender从RecordAccumulator中获取缓存的消息之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式转变成<Node, List<ProducerBatch>>的形式,其中Node表示Kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的broker节点发送消息,而并不关心消息是属于哪一个分区;而对于KafkaProducer的应用逻辑而言,我们只关注向哪个topic中发送哪些消息,所以这里需要做一个应用逻辑层到网络I/O层面的转换。

在转换成<Node, List<ProducerBatch>>的形式之后,Sender还会进一步封装成<Node, Request>的形式,这样就可以将Request请求发往各个Node了。这里的Request是指Kafka的各种协议请求,对于消息发送而言就是具体的ProduceRequest。

请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为Map<NodeId, Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId是一个String类型,表示节点的id编号)。通过max.in.flight.requests.per.connection参数可限制每个连接(也就是客户端与每个Node之间的连接)最多缓存的请求数,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多请求了,除非有缓存的请求已经收到了响应(Response)。

如果响应成功,则会清理InFlightRequests中的请求,以及RecordAAccumulater中对应分区中的数据;

如果响应失败,则会进行重试,重试次数可通过retries参数进行设置,默认为int类型的最大值。

我们发送消息通常只指定了topic,那么生产者客户端如何知道要发往哪个broker节点呢?这就需要元数据

元数据是指kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR,ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。

元数据的更新(二者满足其一即可触发更新):

  • 当客户端中没有要使用的元数据时
  • 超过metedata.max.age.ms时间没有更新元数据(默认5分钟)

当需要更新元数据时,会先挑选出latestLoadedNode(即InFlightRequests中还未确认的请求个数最小的Node),然后向这个Node发送MeteDataRequest请求来获取具体的元数据信息。这个更新操作由Sender线程发起,在创建完MeteDataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时类似。元数据由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步由synchronized和final关键字来保障。

重要参数

参数名称说明
bootstrap.servers生 产 者 连 接 集 群 所 需 的 broker 地 址 清 单 。 例如ip:port,ip1:port,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者可以从给定的 broker里查找到其他 broker 信息。
key.serializer 和 value.serializer指定发送消息的 key 和 value 的序列化类型。一定要写全类名
buffer.memoryRecordAccumulator 缓冲区总大小, 默认 32m。
batch.size缓冲区一批数据最大值, 默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加
linger.ms如果数据迟迟未达到 batch.size, sender 等待 linger.time之后就会发送数据。单位 ms, 默认值是 0ms,表示没有延迟。 生产环境建议该值大小为 5-100ms 之间。
acks0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据, Leader 收到数据后应答。
-1(all):生产者发送过来的数据, Leader+和 isr 队列里面的所有节点收齐数据后应答。-1 和all 是等价的。 Kafka3.0中默认值是-1,之前版本默认是1。
max.in.flight.requests.per.connection允许最多没有返回 ack 的次数, 默认为 5,开启幂等性要保证该值是 1-5 的数字
retries当消息发送出现错误的时候,系统会重发消息。 retries表示重试次数。 默认是 int 最大值, 2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms两次重试之间的时间间隔,默认是 100ms。
enable.idempotence是否开启幂等性, 默认 true,开启幂等性。
compression.type生产者发送的所有数据的压缩方式。 默认是 none,也就是不压缩。支持压缩类型: none、 gzip、 snappy、 lz4 和 zstd。

生产者分区

分区好处
  • 便于合理使用存储资源, 每个Partition在一个Broker上存储, 可以把海量的数据按照分区切割成一
    块一块数据存储在多台Broker上。 合理控制分区的任务, 可以实现负载均衡的效果。
  • 提高并行度, 生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据

在这里插入图片描述

分区策略
老版本

分区原则

发送的数据要封装成一个ProduceRecord对象,该对象中有partition、key、value等属性

  • 指明partition的情况下,直接将指明的值作为partition值
  • 没有指明partition值,但有key值的情况下,将key的hash值与当前topic的partition存活数进行取余得到partition值
  • 既没有指明partition值又没有key值的情况下,采用轮询的方式选取分区。但谁来做第一个分区?kafka采用的机制是:在第一次调用时随机生成一个整数,后面每次调用在这个整数上自增,将这个值与当前topic可用的partition总数取余得到partition值。(由于每次调用都会在这个整数上自增,所以取余后的结果也是自增或等于初始值,也就达到了轮询每个partition的效果)。这就是round-robin算法

源码

//这个方法是默认的分区策略类里的,能进到这个方法,说明肯定没有指定partition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  			//获取当前topic的partition数目
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        //没有指定key
        if (keyBytes == null) {
            int nextValue = this.nextValue(topic);
          //当前topic存活的partition数
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
              	//从存活的partition里选取一个partition返回
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return ((PartitionInfo)availablePartitions.get(part)).partition();
            } else {
                //选取一个partition返回
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
           //有key值,将key的hash值与当前topic的partition总数进行取余得到partition值
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
新版本

分区原则
在这里插入图片描述

源码

DefaultPartitioner:

/**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
    }

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param numPartitions The number of partitions of the given {@code topic}
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                         int numPartitions) {
        if (keyBytes == null) {
            //没有指定key,采用粘性分区策略
            return stickyPartitionCache.partition(topic, cluster);
        }
        // hash the keyBytes to choose a partition
        //指定了key,使用key的哈希值与【分区总数】进行求模
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

StickyPartitionCache:

/**
 * An internal class that implements a cache used for sticky partitioning behavior. The cache tracks the current sticky
 * partition for any given topic. This class should not be used externally. 
 */
public class StickyPartitionCache {
    private final ConcurrentMap<String, Integer> indexCache;
    public StickyPartitionCache() {
        this.indexCache = new ConcurrentHashMap<>();
    }

    public int partition(String topic, Cluster cluster) {
        //尽可能使用上一个分区(所以叫黏性分区策略)
        Integer part = indexCache.get(topic);
        if (part == null) {
            //没办法了(该分区batch已满或已完成),找下一个分区
            return nextPartition(topic, cluster, -1);
        }
        return part;
    }

    public int nextPartition(String topic, Cluster cluster, int prevPartition) {
        //当前topic的分区数
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        Integer oldPart = indexCache.get(topic);
        Integer newPart = oldPart;
        // Check that the current sticky partition for the topic is either not set or that the partition that 
        // triggered the new batch matches the sticky partition that needs to be changed.
        if (oldPart == null || oldPart == prevPartition) {
            //当前topic可用分区数
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() < 1) {
                //没有可用分区,从所有分区里随机选一个算了
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = random % partitions.size();
            } else if (availablePartitions.size() == 1) {
                //就一个可用的,别无选择
                newPart = availablePartitions.get(0).partition();
            } else {
                //有多个可用的,这就得挑三拣四一下了
                while (newPart == null || newPart.equals(oldPart)) {
                    //新分区不能和上一个分区一样,若是一样,就继续选!就是这么倔
                    int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                    //现在有多个可用分区,当然是从可用分区里选
                    newPart = availablePartitions.get(random % availablePartitions.size()).partition();
                }
            }
            // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
            if (oldPart == null) {
                indexCache.putIfAbsent(topic, newPart);
            } else {
                indexCache.replace(topic, prevPartition, newPart);
            }
            return indexCache.get(topic);
        }
        return indexCache.get(topic);
    }

}

吞吐量提升

在这里插入图片描述

数据可靠性

概述

为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。因此,kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡

ack应答级别

acks参数配置:

  • 0: 不等待。producer不等待Broker的ack,这一操作提供了一个最低的延迟,Broker一接收到还没有写入磁盘就已经返回,当Broker故障时有可能丢失数据
  • 1: 只等待leader。producer等待Broker的ack,leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据
  • -1(all): 等待leader和ISR中的follower。producer等待Broker的ack,leader和ISR中的follower全部落盘成功后才会返回ack。
    • 极限情况下也会造成数据丢失:比如当所有follower同步消息都比较慢时,此时ISR中只有leader自己,这种情快下当leader落盘成功,就等于ISR中全部落盘成功了,便会返回ack,在follower同步完成之前,若leader挂掉,则会导致数据丢失
    • 数据重复:follower同步完成之后,Broker发送ack之前,leader发生故障,producer没有收到ack便会重发消息,故造成数据重复
    • 在这里插入图片描述

图示
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
ISR
在这里插入图片描述

  • leader维护了一个动态的ISR(in-sync replica set),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给follower发送ack(不应该是follower给leader发送么)。如果follower长时间未向leader同步数据,则该follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定(默认10秒)。leader发生故障后,就会从ISR中选举新leader
  • 由于上述条件限制,所以ISR中的follower都是同步速度很快的follower,未来leader也会在ISR中选举,所以leader只要等ISR中的follower都同步完成,就可以向生产者发送ack

老版本中,follower进入ISR有两个条件:

  • follower中消息条数与leader中的消息条数差值要小于设定的参数阈值(默认10000条)
  • follower向leader同步数据的时间要小于设定的阈值

0.9版本移除了条数限制,原因:

生产者一般都是批量发送数据,假设条数阈值为10条,但生产者一次就发来了12条,这时leader中比所有follower都多12条数据,所有follower都会被移除ISR,但很快一些follower同步完成,又会把他们移入ISR,ISR存在内存中,这就会导致频繁操作内存。而且kafka会将ISR信息写入zookeeper,这也会导致kafka频繁请求zookeeper。

数据去重

数据传递语义在这里插入图片描述
幂等性
原理

在这里插入图片描述

开启幂等性

开启参数 enable.idempotence ,默认为 true

生产者事务
原理

在这里插入图片描述

API
// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
数据有序

在这里插入图片描述

数据乱序

如果未开启幂等性,且max.in.flight.requests.per.connection大于1的话,可能会出现:其中某条消息发送失败,在重试时,该消息后面的消息发送成功,导致乱序。

单分区内有序条件:
在这里插入图片描述

Broker

Zookeeper中存储的Kafka信息

在这里插入图片描述

总体工作流程

在这里插入图片描述

Broker重要参数

参数名称说明
replica.lag.time.max.msISR 中, 如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值, 默认 30s。
auto.leader.rebalance.enable默认是 true。 自动 Leader Partition 平衡。
leader.imbalance.per.broker.percentage默认是 10%。 每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡
leader.imbalance.check.interval.seconds默认值 300 秒。检查 leader 负载是否平衡的间隔时间。
log.segment.bytesKafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小, 默认值 1G。
log.index.interval.bytes默认 4kb, kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。
log.retention.hoursKafka 中数据保存的时间, 默认 7 天。
log.retention.minutesKafka 中数据保存的时间, 分钟级别,默认关闭。
log.retention.msKafka 中数据保存的时间, 毫秒级别,默认关闭。
log.retention.check.interval.ms检查数据是否保存超时的间隔, 默认是 5 分钟
log.retention.bytes默认等于-1,表示无穷大。 超过设置的所有日志总大小,删除最早的 segment。
log.cleanup.policy默认是 delete,表示所有数据启用删除策略;
如果设置值为 compact,表示所有数据启用压缩策略
num.io.threads默认是 8。 负责写磁盘的线程数。整个参数值要占总核数的 50%。
num.replica.fetchers副本拉取线程数,这个参数占总核数的 50%的 1/3
num.network.threads默认是 3。 数据传输线程数,这个参数占总核数的50%的 2/3
log.flush.interval.messages强制页缓存刷写到磁盘的条数,默认是 long 的最大值, 9223372036854775807。一般不建议修改,交给系统自己管理。
log.flush.interval.ms每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理

副本

基本信息
  • 作用:提高数据可靠性
  • Kafka默认副本1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率
  • Kafka中副本分为:Leader和Follower。Kafka生产者只会把数据发往Leader,然后Follower找Leader同步数据
  • Kafka分区中的所有副本统称AR(Assigned Replicas)
    • AR = ISR + OSR
    • ISR,表示和 Leader 保持同步的 Follower 集合。 如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms参数设定,默认 30s。 Leader 发生故障之后,就会从 ISR 中选举新的 Leader
    • OSR, 表示 Follower 与 Leader 副本同步时,延迟过多的副本
Leader选举流程

Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群broker 的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。
Controller 的信息同步工作是依赖于 Zookeeper 的。
在这里插入图片描述

故障处理细节
Follower故障

在这里插入图片描述

Leader故障

在这里插入图片描述

注:

这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。数据不丢失或不重复还得看ack、幂等性和事务

Leader Partition自动平衡

在这里插入图片描述

文件存储

文件存储机制
Topic数据存储机制

在这里插入图片描述

.log文件和.index文件

在这里插入图片描述

注:index文件中存储的是相对offset,绝对offset那一列只是为了方便看图加上的

.index文件中存储:

  • 消息的偏移量
  • 对应消息的存储地址偏移量
  • 对应消息的大小

.log文件中存储:

  • 消息内容

注:.indx文件中存的内容是固定的,就是存消息偏移量、存储地址偏移量、消息大小等信息,所以在根据消息偏移量找对应消息时,可以直接采用 消息偏移量 * .index中单个内容的大小,快速找到要读取的消息信息地址,加快查询速度

参数名称说明
log.segment.bytesKafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小, 默认值 1G。
log.index.interval.bytes默认 4kb, kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 稀疏索引

向Log中追加消息时是顺序写入的,只有最后一个LogSegment才能执行写入操作,在此之前所有的LogSegment都不能写入数据。最后一个LogSegment也称为“activeSegment”,表示当前活跃的日志分段。随着消息的不断写入,当activeSegment满足一定的条件时,就需要创建新的activeSegment,之后追加的消息将写入新的activeSegment。

segment切分条件(满足其一即可):

  • 当前日志分段文件大小超过了broker端参数log.segment.bytes配置的值,log.segment.bytes参数默认值1GB
  • 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于log.roll.ms或log.roll.hours参数配置的值。如果同时配置了他俩,那么log.roll.ms的优先级高。默认情况下,只配置了log.roll.hours参数,其值为168,即7天。
  • 偏移量索引文件或时间戳索引文件的大小达到broker端参数log.index.size.max.bytes配置的值。默认10MB。
  • 追加的消息的偏移量与当前日志分段的偏移量之间的差追大于Integer.MAX_VALUE,即要追加的消息的偏移量不能转变为相对偏移量(offset-baseOffset > Integer.MAX_VALUE)
日志索引

每个日志文件对应两个索引文件,主要用来提高查找消息的效率。偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理位置;时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。即:

  • 偏移量索引文件
    • 偏移量:物理地址
  • 时间戳索引文件
    • 时间戳:偏移量

Kafka中的索引文件以稀疏索引的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(由broker端参数log.index.interval.bytes指定,默认值4096,即4kb)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项。

稀疏索引通过MappedByteBuffer将索引文件映射到内存中,以加快索引的查询速度,偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量的位置,如果指定偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。时间戳索引文件中的时间戳也保持严格的单调递增,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量,至于要找到对应的物理文件位置还需要根据偏移量索引文件来进行再次定位。稀疏索引的方式是在磁盘空间、内存空间、查找时间等多方面之间的一个折中。

对非activeSegment而言,其对应的索引文件内容已经固定而不需要写入索引项,所以会被设定为只读。而对activeSegment而言,索引文件还会追加更多的索引项,所以被设定为可读可写。在索引文件切分的时候,Kafka会关闭当前正在写入的索引文件并置为只读模式,同时以可读写的模式创建新的索引文件,索引文件的大小由broker端参数log.index.size.max.bytes配置。Kafka在创建索引文件的时候会为其预分配log.index.size.max.bytes大小对的空间,注意这一点与日志分段文件不同,只有当索引文件进行切分的时候,Kafka才会把该索引文件裁剪到实际的数据大小。也就是说,与当前活跃的日志分段对应的索引文件大小固定为log.index.size.max.bytes,而其余日志分段对应的索引文件大小为实际的占用空间

消息读取机制
根据偏移量
  1. 根据要读取的消息偏移量,采用二分查找,定位对应的日志分段的baseOffset
    1. Kafka的每个日志对象中使用了ConcurrentSkipListMap来保存各个日志分段,每个日志分段的baseOffset作为key,这样可以根据指定偏移量来快速定位到消息所在的日志分段
  2. 在该.index文件中,根据二分查找法,找到不大于目标偏移量的最大偏移量对应的物理地址
  3. 根据2中结果,去.log文件从2中的地址开始顺序查找偏移量为目标偏移量的消息
根据时间戳
  1. 将目标时间戳和每个日志分段中的最大时间戳逐一对比,直到找到不小于目标时间戳的日志分段,日志分段中的最大时间戳的计算是先查询日志分段对应的时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳值字段大于0,则取其值,否则取该日志分段的最近修改时间。
  2. 找到相应的日志分段后,在时间戳索引文件中使用二分查找法查找不大于目标时间戳的最大索引项,并取该索引项的偏移量
  3. 在偏移量索引文件中使用二分查找法查找不大于步骤2中结果的最大索引项,并取其物理位置
  4. 在步骤1中定位到的日志分段文件中,在步骤3的位置开始查找不小于目标时间戳的消息
文件清理策略

Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间:

  • log.retention.hours, 最低优先级小时,默认 7 天。
  • log.retention.minutes, 分钟。
  • log.retention.ms, 最高优先级毫秒。
  • log.retention.check.interval.ms, 负责设置检查周期,默认 5 分钟

日志超时后,Kafka提供的清理策略:

  • delete 日志删除:将过期数据删除
    • log.cleanup.policy = delete 所有数据启用删除策略
      • 基于时间:默认打开。 以 segment 中所有记录中的最大时间戳作为该文件时间戳
      • 基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment(log.retention.bytes,默认等于-1,表示无穷大)
  • compact 日志压缩
    • 在这里插入图片描述

高效读写数据

Kafka能够高效读写的原因

  • Kafka本身是分布式集群,可以采用分区技术,并行度高

  • 读数据采用稀疏索引,可以快速定位要消费的数据

  • 顺序写磁盘

    • Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,
      为顺序写。 官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这
      与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间
  • 页缓存 + 零拷贝技术

    • 页缓存:

      • 页缓存是操作系统实现的一种主要的磁盘缓存,以此减少对磁盘的IO操作。具体来说就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。
      • 当一个进程准备读取磁盘上的文件内容时,操作系统会先查看待读取的数据所在的页是否在页缓存中,如果存在(命中)则直接返回数据,从而避免了对物理磁盘的IO操作;如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存,之后再将数据返回给进程。同样,如果一个进程需要将数据写入磁盘,那么操作系统也会检测对应的页是否在页缓存中,如果不存在,则会先在页缓存中添加相应的页,最后将数据写入对应的页。被修改过后的页也就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘,以保持数据一致性。
    • 零拷贝:

      • 零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。
      • 零拷贝大大提高了应用程序的性能,减少了内核和用户模式上下文之间的切换。
      • 零拷贝是针对内核模式而言的,数据在内核模式下实现了零拷贝。
    • 在这里插入图片描述

    • 相关参数

      • 参数名称说明
        log.flush.interval.messages强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。 一般不建议修改,交给系统自己管理。
        log.flush.interval.ms每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。

协议设计

概述

Kafka自定义了一组基于TCP的二进制协议,只要遵守这组协议的格式,就可以向Kafka发送消息,也可以从Kafka中拉取消息,或者做一些其他的事情,比如提交消费位移等。Kafka中每种协议类型都有对应的请求(Request)和响应(Response),它们都遵循特定的协议模式。

请求

每种类型的Request都包含相同结构的协议请求头(RequestHeader)和不同结构的协议请求体(RequestBody),如下图所示:

在这里插入图片描述

协议请求头4个域介绍:

描述
api_keyAPI标识,比如PRODUCE、FETCH等分别表示发送消息和拉取消息的请求
api_versionAPI版本号
correlation_id由客户端指定的一个数字来唯一的标识这次请求的id,服务端在处理完请求后也会把同样的correlation_id写到Response中,这样客户端就能把某个请求和响应对应起来了
client_id客户端id
响应

每种类型的Response也包含相同结构的协议响应头(ResponseHeader)和不同结构的协议响应体(ResponseBody),如下图所示:
在这里插入图片描述

协议响应头中只有一个correlation_id,释义如请求头相同

示例
ProduceRequest

注:Kafka协议内部结构并非一成不变,ProduceRequest/ProduceResponse就已经经历了7个版本(V0-V6)的变迁,以下示例为V6版本
在这里插入图片描述

ProduceRequest/ProduceResponse对应api_key = 0,表示PRODUCE

ProduceRequest请求体中各个域含义:

类型描述
transaactional_idnullable_string事务id。如果不使用事务功能,则该域的值为null
acksInt16客户端参数acks
timeoutInt32请求超时时间,对应客户端参数request.timeout.ms,默认30秒
topic_dataarray代表ProduceRequest中所要发送的数据集合。以主题名称分类,主题中再以分区分类。注意这个域是数组类型
|topicstring主题名称
|dataarray与主题对应的数据,也是数组类型
|partitionInt32分区编号
|record_setrecords与分区对应的数据

消息发送时,RecordAccumulator中的消息以<分区,Deque<ProducerBatch>>的形式进行缓存,之后由Sender线程转变成<Node,List<ProducerBatch>>的形式,针对每个Node,Sender线程在发送消息前会将对应的List<ProducerBatch>形式的内容转变成ProduceRequest的具体结构。List<ProducerBatch>中的内容首先按照主题名称进行分类(对应ProduceRequest中的topic域),然后按照分区编号进行分类(对应ProduceRequest中的partition域),分类之后的ProducerBatch集合就对应ProduceRequest中的域records_set。

由于分区中的消息是按顺序追加的,那么在客户端中按照分区归纳好之后就可以省去在服务端中转换的操作了,这样将负载的压力分摊给了客户端,从而使服务端可以专注于它的分内之事,如此也可以提升整体性能。

ProduceResponse

注:V6版本
在这里插入图片描述

FetchRequest

拉取消息对应的api_key=1,表示FETCH,下图为V8版本

在这里插入图片描述

Session_id、epoch和forgotten_topics_data干啥用?

不管是follower副本还是普通的消费者客户端,如果要拉取某个分区中的消息,就需要指定详细的拉取信息,也就是需要设定partition、fetch_offset、log_start_offset和max_bytes这4个域的具体值,那么对每个分区而言,就需要占用4B+8B+8B+4B=24B的空间。一般情况下,不管是follower副本还是普通的消费者,它们订阅的信息是长期固定的。也就是说,FetchRequest中的topics域的内容是长期固定的,只有在拉取开始时或发生某些异常时会有所变动。FetchRequest请求是一个非常频繁的请求,如果要拉取的分区数有很多,比如有1000个分区,那么在网络上频繁交互FetchRequest时就会有固定的大约24KB的字节内容在传动,如果将这24KB的状态保存起来,那么就可以节省这部分所占用的带宽。

Kafka从1.1.0版本开始针对FetchRequest引入了session_id、epoch和forgotten_topics_data等域,session_id、epoch确定一条拉取链路的fetch session,当session建立或发生变更时会发送全量式的FetchRequest,所谓的全量式就是指请求体中包含所有需要拉取的分区信息;当session稳定时则会发送增量式的FetchRequest请求,里面的topics域为空,因为topics域的内容已经被缓存在了session链路的两侧。如果需要从当前fetch session中取消对某些分区的拉取订阅,则可以使用forgotten_topics_data字段来实现。

FetchResponse

在这里插入图片描述

时间轮

个人觉着这是Kafka中很有意思的一个设计

Kafka中存在大量的延时操作,比如延时生产、延时拉取和延时删除等。Kafka并没有使用JDK自带的Timer或DelayQueue来实现延时的功能,而是基于时间轮的概念自定义实现了一个用于延时功能的定时器。JDK自带的Timer或DelayQueue的插入和删除操作的平均时间复杂度为O(nlogn),并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除的时间复杂度都降为O(1)。

Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList是一个环形的双向链表,链表中的每一项都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。

时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs)。时间轮的时间格数是固定的,可用wheelSize来表示,那么整个时间轮的总体时间跨度(interval)可以通过公式tickMs x wheelSize计算得出。时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime是tickMs的整数倍。currentTime可以将整个时间轮划分为到期部分和未到期部分,currentTime当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的TimerTaskList中的所有任务。

在这里插入图片描述
若时间轮的tickMs为1ms且wheelSize等于20,那么可以计算得出总体时间跨度interval为20ms。初始情况下currentTime指向时间格0,此时有一个定时为2ms的任务插进来会存放到时间格为2的TimerTaskList中。随着时间的不断推移,指针currentTime不断向前推进,过了2ms之后,当到达时间格2时,就需要将时间格2对应的TimeTaskList中的任务进行相应的到期操作。此时若又有一个定时为8ms的任务插进来,则会存放到时间格10中,currentTime再过8ms后会指向时间格10.如果同时有一个定时为19ms的任务插进来怎么办?新来的TimeTaskEntry会复用原来的TimeTaskList,所以他会插入原本已经到期的时间格1。总之,整个时间轮的总体跨度是不变的,随着指针currentTime的不断推进,当前时间轮所能处理的时间段也在不断后移,总体时间范围在currentTime和currentTime + interval之间。

但如果有一个很久之后的定时任务(超过interval)怎么办?比如350ms,此时不能复用已有的TimeTaskList,因为需要很多圈之后才会执行该定时任务,但时间轮并不能判断这个“圈数”。也不能直接扩充wheelSize,因为总不能无限扩充,所以这并不能从根本上上解决问题,而且wheelSize很大的话会占用很大的内存空间,而且也会拉低效率。Kafka为此引入了层级时间轮的概念,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。

在这里插入图片描述

注:

  • 第二层时间轮的tickMs为第一层时间轮的interval
  • 每一层时间轮的wheelSize是固定的,都是20

对于之前所说的350ms的定时任务,显然第一层时间轮不能满足条件,所以就升级到第二层时间轮中,并放入时间格17所对应的TimeTaskList。如果此时又有定时为450ms的任务,那么显然第二层时间轮也无法满足条件,所以又升级到第三层时间轮中,最终被插入到时间格1的TimeTaskList。注意到在到期时间为[400ms, 800ms)区间内的多个任务都会被放入第三层时间轮的时间格1,时间格1对应的TimeTaskList的超时时间为400ms。随着时间的流逝,当此TimerTaskList到期之时,原本定时为450ms的任务还剩下50ms的时间,还不能执行这个任务的到期操作。这里就有一个时间轮降级的操作,会将这个剩余时间为50ms的定时任务重新提交到层级时间轮中,此时第一层时间轮的总体时间跨度不够,而第二层足够,所以该任务被放到第二层时间轮到期时间为[40ms, 60ms)的时间格中。再经历40ms后,此时这个任务又被“察觉”,不过还剩余10ms,还是不能立即执行到期操作。所以还要再有一次时间轮的降级,此任务被添加到第一层时间轮到期时间为[10ms, 11ms)的时间格中,之后再经历10ms,此任务真正到期,最终执行相应的到期操作。

在Kafka中,第一层时间轮参数:

  • tickMs = 1
  • wheelSize = 20(各层级时间轮wheelSize固定为20)
  • interval = tickMs x wheelSize = 20

Kafka中定时器只需持有第一层时间轮的引用,并不会直接持有其他高层时间轮,但每一层时间轮都会有一个引用指向更高一层的时间轮,以此层级调用可以实现定时器间接持有各层级时间轮的引用。(就像开车一样,只能逐级升档,但可以跨级降档。不过这里降级时只能直接降到一级时间轮,因为高层并不持有底层时间轮引用)

这个层级时间轮和钟表是一样的,钟表第一层时间轮tickMs = 1s,wheelSize = 60, interval = 60s = 1min,此为秒钟;第二层tickMs = 1min,wheelSize = 60, interval = 60min = 1hour,此为分钟;第三层tickMs = 1h,wheelSize = 12, interval = 12h,此为时钟;

消费者

消费方式

在这里插入图片描述

Consumer采用pull(拉)的方式从Broker中读取数据

kafka中没有数据时,pull模式可能会使消费者陷入空转。针对这一点,kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时间即为timeout

消费者总体工作流程

在这里插入图片描述

订阅主题与分区

一个消费者可以订阅一个或多个主题

订阅
  • subscribe

    • 定义

      • public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
        
        public void subscribe(Collection<String> topics)
        
        public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
        
        public void subscribe(Pattern pattern)
        
    • 对于该方法,可以以集合的形式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题

    • 如果前后两次订阅了不同的主题,那么消费者以最后一次的为准

    • 如果采用正则表达式的方式订阅,在之后的过程中,如果有人又创建了新的主题,并且主题的名字与正则表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息

  • assign

    • 定义

      • public void assign(Collection<TopicPartition> partitions)
        
    • 可直接订阅某些主题的特定分区

    • 只有一个参数,用来指定订阅的分区集合

    • TopicPartition类

      • public class TopicPartition {
          private final int partition;
          private final String topic;
        
          //构造器、hashCode...
        }
        

注:

通过subscribe()方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费者组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。assign()方法订阅时无该功能。

如何获取主题的分区信息?

KafkaConsumer中有方法:

public List<PartitionInfo> partitionsFor(String topic)

PartitionInfo:

public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
  	// AR集合
    private final Node[] replicas;
  	//ISR集合
    private final Node[] inSyncReplicas;
  	//OSR集合
    private final Node[] offlineReplicas;
}
取消订阅
public void unsubscribe()

该方法可以取消通过subscribe(Collection)方式实现的订阅,也可以取消通过subscribe(Pattern)方式实现的订阅,还可以取消通过assign(Collection)方式实现的订阅。

注:如果将subscribe(Collection)或assign(Collection)中的集合参数设置为空集合,那么作用等同于unsubscribe()方法

订阅状态:

  • AUTO_TOPICS
    • subscribe(Collection)方式订阅时即为该状态
  • AUTO_PATTERN
    • subscribe(Pattern)方式订阅时即为该状态
  • USER_ASSIGNED
    • assign(Collection)方式订阅时即为该状态
  • NONE
    • 没有订阅

这些状态是互斥的,一个消费者只能出现其中一种,否则会报出IllegalStateException

消息消费

public ConsumerRecords<K, V> poll(final Duration timeout)

timeout方法用来控制poll()方法的阻塞时间,在消费者的缓冲区里没有可用数据时会发生阻塞

timeout的设置取决于应用程序对响应速度的要求,比如需要在多长时间内将控制权移交给执行轮询的应用线程。可以直接将timeout设置为0,这样poll()方法会立刻返回,而不管是否已经拉取到了消息。如果应用线程唯一的工作就是从Kafka中拉取并消费消息,则可以将这个参数设置为最大值Long.MAX_VALUE

poll()方法返回值类型是ConsumerRecords,表示一次拉取操作所获得的消息集,内部包含了若干ConsumerRecord,它提供了iterator()方法来遍历消息集内部的消息:

public Iterator<ConsumerRecord<K, V>> iterator()

消费者组

原理

在这里插入图片描述
在这里插入图片描述

初始化流程

在这里插入图片描述

消费流程

在这里插入图片描述

重要参数

参数名称说明
bootstrap.servers向 Kafka 集群建立初始连接用到的 host/port 列表。
key.deserializer 和value.deserializer指定接收消息的 key 和 value 的反序列化类型。一定要写全类名
group.id标记消费者所属的消费者组
enable.auto.commit默认值为 true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率, 默认 5s。
auto.offset.reset当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。
latest: 默认, 自动重置偏移量为最新的偏移量。
none:如果消费组原来的( previous)偏移量不存在,则向消费者抛异常。
anything:向消费者抛异常。
offsets.topic.num.partitions__consumer_offsets 的分区数, 默认是 50 个分区。
heartbeat.interval.msKafka 消费者和 coordinator 之间的心跳时间, 默认 3s。该条目的值必须小于 session.timeout.ms ,也不应该高于session.timeout.ms 的 1/3。
session.timeout.msKafka 消费者和 coordinator 之间连接超时时间, 默认 45s。超过该值,该消费者被移除,消费者组执行再平衡
max.poll.interval.ms消费者处理消息的最大时长, 默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡
fetch.min.bytes默认 1 个字节。消费者获取服务器端一批消息最小的字节数
fetch.max.wait.ms默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据
fetch.max.bytes默认 Default: 52428800( 50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes ( brokerconfig) or max.message.bytes (topic config) 影响
max.poll.records一次 poll 拉取数据返回消息的最大条数, 默认是 500 条

分区分配策略

一个Consumer Group中有多个Consumer,一个Topic有多个Partition,所以必然会涉及到Partition的分配问题,即确定哪个Partition由哪个Consumer来消费

kafka分配策略:

  • Range
  • RoundRobin
  • Sticky
  • CooperativeSticky

可通过partition.assignment.strategy参数来设置分区策略,默认是Range + CooperativeSticky.(Kafka可同时使用多个分区策略)

当消费者组中消费者个数发生变更时,就会触发重新分配。即使消费者数目增加到大于分区数,也会重新分配

Range分区策略原理

在这里插入图片描述

RoundRobin分区策略原理

在这里插入图片描述

Sticky分区

粘性分区定义: 可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略, 首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化

offset的维护

由于Consumer在消费过程中可能宕机,Consumer恢复后,需要从故障前的位置继续消费,所以Consumer需要实时记录自己消费到了哪个offset,以便故障后能够继续消费。

存储位置

在这里插入图片描述

__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。 key 是 group.id+topic+分区号, value 就是当前 offset 的值。 每隔一段时间, kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。

提交
自动提交

在这里插入图片描述

手动提交

在这里插入图片描述

指定offset消费

参数auto.offset.reset = earliest | latest | none

默认是 latest

当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量
时(例如该数据已被删除),该怎么办?

  • earliest:自动将偏移量重置为最早的偏移量, --from-beginning。
  • latest(默认值):自动将偏移量重置为最新偏移量。
  • none:如果未找到消费者组的先前偏移量,则向消费者抛出异常
kafkaConsumer.seek(TopicPartition, 指定消费的offset);

注意:每次执行完,需要修改消费者组名;

重复消费和漏消费

重复消费: 已经消费了数据,但是 offset 没提交。
漏消费: 先提交 offset 后消费,有可能会造成数据的漏消费。

在这里插入图片描述

消费者事务

在这里插入图片描述

提高消费者吞吐量

在这里插入图片描述

代码实操

基操

生产者

需要用到的类:

  • KafkaProducer:需要创建一个生产者对象,用来发送数据
  • ProducerConfig:获取所需的一系列配置参数
  • ProducerRecord:每条数据都要封装成一个 ProducerRecord 对象
发送消息
/**
     * 简单发送
     */
    @Test
    public void testProducer() {

        KafkaProducer<String, String> producer = getKafkaProducer();
      // 异步发送
        producer.send(new ProducerRecord<>("first", "test", "hello kafka"));
      //同步发送
				producer.send(new ProducerRecord<>("first", "test", "hello kafka")).get();
        //关闭资源
        producer.close();
    }

    /**
     * 发送后触发回调函数
     */
    @Test
    public void testProducerWithCallBack() {

        KafkaProducer<String, String> producer = getKafkaProducer();
        producer.send(new ProducerRecord<>("first", "test", "hello kafka"), new Callback() {
            //回调方法,会在producer收到ack时触发
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (Objects.isNull(e)) {
                    //没有异常
                    System.out.println(recordMetadata.toString());
                } else {
                    e.printStackTrace();
                }
            }
        });

        //关闭资源
        producer.close();
    }
  private KafkaProducer<String, String> getKafkaProducer() {
          //配置
          Properties properties = new Properties();
          properties.put("bootstrap.servers","shangxiaoying.cn:9092");
          properties.put("acks", "all");
          //配置的key可以在ProducerConfig中找到
          properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
          properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
          //这里指定要使用的分区策略
          properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"cn.shangxiaoying.kafka.partitioner.MyPartitioner");

          KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
          return producer;
      }
事务内发送消息
public class CustomProducerTransactions {
	public static void main(String[] args) throws InterruptedException {
    // 1. 创建 kafka 生产者的配置对象
    Properties properties = new Properties();
    // 2. 给 kafka 配置对象添加配置信息
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
    // key,value 序列化
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // 设置事务 id(必须),事务 id 任意起名
    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");
    // 3. 创建 kafka 生产者对象
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    // 初始化事务
    kafkaProducer.initTransactions();
    // 开启事务
    kafkaProducer.beginTransaction();
    try {
      // 4. 调用 send 方法,发送消息
      for (int i = 0; i < 5; i++) {
        // 发送消息
        kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));
      }
      // int i = 1 / 0;
      // 提交事务
      kafkaProducer.commitTransaction();
    } catch (Exception e) {
      // 终止事务
      kafkaProducer.abortTransaction();
    } finally {
      // 5. 关闭资源
      kafkaProducer.close();
    }
  }
}
自定义分区策略

编写类实现Partitioner接口,重写partition()方法

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 自定义选择分区的逻辑
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

在生产者配置中指定分区策略

private KafkaProducer<String, String> getKafkaProducer() {
  //配置
  ...
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  //这里指定要使用的分区策略
  properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"cn.shangxiaoying.kafka.partitioner.MyPartitioner");
	KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
  return producer;
}
//发送消息
...

消费者

Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。 由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。 所以 offset 的维护是 Consumer 消费数据时必须考虑的问题。

需要用到的类:

  • KafkaConsumer:需要创建一个消费者对象,用来消费数据
  • ConsumerConfig:获取所需的一系列配置参数
  • ConsuemrRecord:每条数据都要封装成一个 ConsumerRecord 对象

为了使我们能够专注于自己的业务逻辑,Kafka 提供了自动提交 offset 的功能。 自动提交 offset 的相关参数:

  • enable.auto.commit:是否开启自动提交 offset 功能
  • auto.commit.interval.ms:自动提交 offset 的时间间隔
自动提交offset
@Test
public void testConsumer() {
        KafkaConsumer<String, String> kafkaConsumer = getKafkaConsumer();
        //订阅topic,可以订阅多个
        kafkaConsumer.subscribe(Collections.singletonList("first"));
        while (true) {
            //拉取消息,参数为没有消息时的等待时间
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.toString());
            }
        }

}
private KafkaConsumer<String, String> getKafkaConsumer() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "shangxiaoying.cn:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_01");
        //开启自动提交(这里提交是指提交offset)
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        //这里是反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);
        return kafkaConsumer;
}
手动提交offset

虽然自动提交 offset 十分简洁便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。

因此 Kafka 还提供了手动提交 offset 的 API。 手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和 commitAsync(异步 提交)。两者的相同点是,都会将本次 poll 的一批数据最高的偏移量提交;不同点是, commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致, 也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。

@Test
public void testConsumer() {
        KafkaConsumer<String, String> kafkaConsumer = getKafkaConsumer();
        //订阅topic,可以订阅多个
        kafkaConsumer.subscribe(Collections.singletonList("first"));
        while (true) {
            //拉取消息,参数为没有消息时的等待时间
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.toString());
            }

            //手动提交,不要忘记关掉自动提交
            //同步提交,当前线程会阻塞直到 offset 提交成功
//            kafkaConsumer.commitSync();
            //异步提交
            kafkaConsumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                    //业务逻辑
                }
            });
        }

}
private KafkaConsumer<String, String> getKafkaConsumer() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "shangxiaoying.cn:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_01");
        //关闭自动提交(这里提交是指提交offset)
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //这里是反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);
        return kafkaConsumer;
}

数据漏消费和重复消费:

无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。

  • 先提交 offset 后消费,有可能造成数据的漏消费;
    • 提交offset后,数据还没处理完,消费者挂掉,重启后,本地offset丢失,获取kafka存储的offset,这个offset是挂掉前拉取的最新数据的kafka,kafka以为消费者都消费完了,其实并未真正处理完毕,再拉取消息会按照kafka存储的offset拉取,故造成喽消费。
  • 而先消费后提交 offset,有可能会造成数据的重复消费
    • 消费者拉取到数据在处理,处理了一部分数据后消费者挂掉,此时offset并未提交,消费者重启后,本地offset已经丢失,所以会读取kafka存储的offset,这个offset是上次拉取消息之前的,所以又会将挂掉前拉取的消息再拉取一遍,造成重复。
自定义存储offset

要解决上述问题,关键是要保证offset存储和数据处理的一致性。我们可以将offset存入MySQL,使得消息处理和offset存储在同一个事务中,从而保证一致性。

但是:

offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalance。 当有新的消费者加入消费者组、已有的消费者退出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。 消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。

如何感知到是否发生Rebalance?

  • Kafka有一个类:ConsumerRebalanceListener可以实现

思路:

  • 在Rebalance之前,每个分区要提交自己最新的offset
  • 在Rebalance之后,每个分区获取自己最新的offset,继续消费

实现:

private static Map<TopicPartition, Long> currentOffset = new HashMap<>();
/**
     * 自定义存储offset
     */
    @Test
    public void testCustomOffset() {
        KafkaConsumer<String, String> kafkaConsumer = getKafkaConsumer();
        //订阅topic,ConsumerRebalanceListener帮助我们实现自定义存储offset
        kafkaConsumer.subscribe(Collections.singletonList("first"), new ConsumerRebalanceListener() {

            //该方法会在 Rebalance 之前调用
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                //Rebalance之前,将每个分区的最新offset提交,这里可以自定义提交至MySQL
                commitOffset(currentOffset);
            }

            //该方法会在 Rebalance 之后调用
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                currentOffset.clear();
                for (TopicPartition partition : collection) {
                    kafkaConsumer.seek(partition, getOffset(partition));//定位到最近提交的 offset 位置继续消费
                }
            }
        });
        while (true) {
            //最佳实践:将数据处理和offset保存放在一个事务中,从而保证数据不会丢失或重复
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);//消费者拉取数据
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf(record.toString());
               //维护currentOffset
                currentOffset.put(new TopicPartition(record.topic(),
                        record.partition()), record.offset());
            }
            commitOffset(currentOffset);
        }
    }


    //获取某分区的最新 offset
    private static long getOffset(TopicPartition partition) {
        //业务逻辑,比如去MySQL中查询最新offset
        return 0;
    }
    //提交该消费者所有分区的 offset
    private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
        //保存offset
    }

拦截器

原理

Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。

对于 producer而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个 interceptor 按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是 org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

  • configure(configs)
    • 获取配置信息和初始化数据时调用。
  • onSend(ProducerRecord)
    • 该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算
  • onAcknowledgement(RecordMetadata, Exception)
    • 该方法会在消息从 RecordAccumulator 成功发送到 Kafka Broker 之后,或者在发送过程中失败时调用。并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在 producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率
  • close
    • 关闭 interceptor,主要用于执行一些资源清理工作
    • 如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。
    • 另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中 要特别留意。
实操

实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间戳信息加到消息 value 的最前部;

第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。

时间戳拦截器

public class TimeInterceptor implements ProducerInterceptor {
    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord) {
        return new ProducerRecord(producerRecord.topic(),
                producerRecord.partition(),
                producerRecord.timestamp(),
                producerRecord.key(),
                System.currentTimeMillis() + "-" + producerRecord.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

计数拦截器

public class CountInterceptor implements ProducerInterceptor {
    private Integer successCount = 0;
    private Integer errorCount = 0;
    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord) {
        return producerRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if (Objects.isNull(e)) {
            successCount ++;
        } else {
            errorCount ++;
        }
    }

    @Override
    public void close() {
        System.out.println("success: " + successCount + ", error: " + errorCount);
    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

使用拦截器

@Test
public void testInterceptorChain() {
        //配置
        Properties properties = new Properties();
        properties.put("bootstrap.servers","shangxiaoying.cn:9092");
        properties.put("aacks", "all");
        //配置的key可以在ProducerConfig中找到
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //这里指定要使用的分区策略
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"cn.shangxiaoying.kafka.partitioner.MyPartitioner");
        //这里可以指定拦截器
        List<String> interceptorList = new ArrayList<>();
        interceptorList.add("cn.shangxiaoying.kafka.interceptors.TimeInterceptor");
        interceptorList.add("cn.shangxiaoying.kafka.interceptors.CountInterceptor");
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptorList);
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        producer.send(new ProducerRecord<>("first", 0, "testInterceptor", "hello,Interceptor"));

        //该close方法也会调用拦截器的close
        producer.close();
}

集成SpringBoot

生产者

配置文件

application.properties

# 应用名称
spring.application.name=kafka-learn
# 指定 kafka 的地址
spring.kafka.bootstrapservers=ip1:port, ip2:port
#指定 key 和 value 的序列化器
spring.kafka.producer.keyserializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.valueserializer=org.apache.kafka.common.serialization.StringSerializer
发送数据
@RestController
public class ProducerController {
  // Kafka 模板用来向 kafka 发送数据
  @Autowired
  KafkaTemplate<String, String> kafka;
  
  @RequestMapping("/send")
  public String send(String msg) {
    kafka.send("first", msg);
    return "ok";
  }
}

消费者

配置文件

application.properties

# 指定 kafka 的地址
spring.kafka.bootstrapservers=ip1:port, ip2:port
# 指定 key 和 value 的反序列化器
spring.kafka.consumer.keydeserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.valuedeserializer=org.apache.kafka.common.serialization.StringDeserializer
#指定消费者组的 group_id
spring.kafka.consumer.group-id=kafka-test
消费数据
@Configuration
public class KafkaConsumer {
  // 指定要监听的 topic
  @KafkaListener(topics = "first")
  public void consumeTopic(String msg) { // 参数: 收到的 value
  	System.out.println("收到的信息: " + msg);
  }
}

Q.E.D.


Read The Fucking Source Code!