Kafka学习

概述

定义

Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域

消息队列

应用场景

  • 异步处理
  • 削峰

优势

  • 解耦

    允许你独立的扩展或修改两边的处理过程,只要确保他们遵守同样的接口约束

  • 可恢复性

    系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理

  • 缓冲

    有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的的处理速度不一致的情况

  • 灵活性&峰值处理能力

    • 灵活性:动态上下线,在流量高峰时可以动态增加服务器
    • 峰值处理:削峰,延迟处理

    使系统能够应对突发的高峰流量

  • 异步通信

    允许用户把一个消息放入队列,但不立即处理它,在需要的时候再去处理

模式

  1. 点对点模式

    一对一,消费者主动拉取数据,收到消息后,队列中的消息清除。队列可以有多个消费者,但对于一个消息而言,只能被一个消费者消费。

    消息队列点对点模式.png

  2. 发布/订阅模式

    • 一对多,消费者消费消息后,队列不会清除消息。消息生产者将消息发布到topic中,同时有多个消费者消费该消息。和点对点模式不同,发布到topic的消息会被所有订阅者消费

    • 发布/订阅模式中,又分为两种:

      • 消费者主动拉取消息

        Kafka就是属于这种类型

        优势:速度取决于消费者,可以根据消费能力以适当的速率消费消息

        弊端:需要轮询,查看队列中是否有消息,浪费资源

      • 队列推送消息

        类似于公众号推送

        弊端:

        推送消息的速度取决于队列,各个消费者处理消息的速度可能不一致,造成消费者崩掉(推送速度 >消费者处理速度)或者资源浪费(推送速度 < 消费者处理速度)

Kafka基础架构

架构图

Kafka架构图

zk在这里的作用:

  • 存储kafka集群信息
  • 存储消费者消费到的位置信息
    • 即:消费到第几条了
    • 消费者本地内存也会存储该条数信息,平时就是读取并维护本地的信息;但当机器挂掉重启后,会先去zk获取该信息,然后再在本地内存继续维护)
    • 0.9之后将位置信息存储到kafka里一个系统创建的topic中
      • 为何改存到kafka?
      • 因为消费者本身就需要维护与kafka的连接,去获取消息,如果将位置信息放在zk,则还需要请求zk获取信息,速度不如kafka(注:Kafka消息存到磁盘,默认存七天

名词解释

  • Broker:可以理解为起了Kafka进程的服务器
  • Topic:主题,可以理解为一个队列,用来将消息分类,便于发送和消费
  • Partition:分区,用来提高Kafka集群的并发处理能力,每一个partition是一个有序的队列(个人理解Partition就是对Topic的又一次细分,分布式的多个Broker是为了避免单个机器的性能造成的阻塞,多个partition是为了避免同一内存区域的IO阻塞)
  • Replication:副本。为保证集群中某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower
  • Leader:每个分区多个副本的”主“,生产者发送数据的对象,消费者消费数据的对象都是leader
  • Follower:
    • Leader的副本,每个分区多个副本的”从“,实时从leader同步数据,保持和leader数据的同步。用来备份数据(不直接对生产者和消费者提供读写),避免Leader所在机器挂掉后数据丢失。(所以,同一topic同一partition的Follower和Leader一定不在同一台机器)
    • leader发生故障时,某个Follower会成为新的leader
  • Consumer Group:消费者组,可以理解为一个大的消费者,目的是提高消费能力
    • 和其他普通消费者一样,需要订阅一个topic
    • 一个消费者组里面的不同消费者,只能消费kafka中这个topic的不同partition的数据。(个人理解:建立partition就是为了提高消息的读写速度,对于同一个topic,消息的写入根据partition区分开了,消费者消费的时候如果不分开,会降低消息消费速度,那partition的意义就不大了;而且如果多个消费者消费同一分区的话,对于这个消费者组而言,就会造成消息重复消费)

基本操作

安装

命令行操作

topic

查看该机器上所有topic:

kafka-topics.sh --list --zookeeper ip:zk端口

创建topic:

kafka-topics.sh --create --topic topic名称 --zookeeper ip:zk端口 --partitions 分区数 --replication-factor 副本数

#注:副本数不能大于当前可用的Broker数,分区数可以大于当前可用的Broker数
#副本数 包括 leader 和 follower

删除topic:

kafka-topics.sh --delete --topic first --zookeeper ip:zk端口

#注:执行效果:
#Topic first is marked for deletion.   标记为删除
#Note: This will have no impact if delete.topic.enable is not set to true.  只有当delete.topic.enable设为true时才会真正删除

查看topic详情:

kafka-topics.sh --describe --zookeeper ip:zk端口 --topic topic名称

消息

生产者发送消息

kafka-console-producer.sh --topic first --broker-list kafkaIP:kafka端口

消费者消费消息

kafka-console-consumer.sh --topic first --bootstrap-server kafkaIP:kafka端口 #从当前开始消费
#或者:
kafka-console-consumer.sh --topic first --bootstrap-server kafkaIP:kafka端口 --from-begining #从头开始消费

代码实操

生产者

需要用到的类:

  • KafkaProducer:需要创建一个生产者对象,用来发送数据
  • ProducerConfig:获取所需的一系列配置参数
  • ProducerRecord:每条数据都要封装成一个 ProducerRecord 对象
发送消息

消息发送流程

Kafka的Producer发送消息采用的是异步发送的方式。在消息发送过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。

注:异步发送,RecordAccumulator中来一个消息Sender就会发一个,发完之后如果还有消息就会接着发送,等待ack期间不会阻塞。如果没有收到ack,则重发消息。

图示:

相关参数:

batch.size:只有数据积累到batch.size之后,sender才会发送数据

linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms之后就会发送数据

/**
     * 简单发送
     */
    @Test
    public void testProducer() {

        KafkaProducer<String, String> producer = getKafkaProducer();
        producer.send(new ProducerRecord<>("first", "test", "hello kafka"));

        //关闭资源
        producer.close();
    }

    /**
     * 发送后触发回调函数
     */
    @Test
    public void testProducerWithCallBack() {

        KafkaProducer<String, String> producer = getKafkaProducer();
        producer.send(new ProducerRecord<>("first", "test", "hello kafka"), new Callback() {
            //回调方法,会在producer收到ack时触发
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (Objects.isNull(e)) {
                    //没有异常
                    System.out.println(recordMetadata.toString());
                } else {
                    e.printStackTrace();
                }
            }
        });

        //关闭资源
        producer.close();
    }
  private KafkaProducer<String, String> getKafkaProducer() {
          //配置
          Properties properties = new Properties();
          properties.put("bootstrap.servers","shangxiaoying.cn:9092");
          properties.put("aacks", "all");
          //配置的key可以在ProducerConfig中找到
          properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
          properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
          //这里指定要使用的分区策略
          properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"cn.shangxiaoying.kafka.partitioner.MyPartitioner");

          KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
          return producer;
      }
自定义分区策略

编写类实现Partitioner接口,重写partition()方法

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        // 自定义选择分区的逻辑
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

在生产者配置中指定分区策略

private KafkaProducer<String, String> getKafkaProducer() {
  //配置
  ...
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  //这里指定要使用的分区策略
  properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"cn.shangxiaoying.kafka.partitioner.MyPartitioner");
	KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
  return producer;
}
//发送消息
...

消费者

Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。 由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。 所以 offset 的维护是 Consumer 消费数据时必须考虑的问题。

需要用到的类:

  • KafkaConsumer:需要创建一个消费者对象,用来消费数据
  • ConsumerConfig:获取所需的一系列配置参数
  • ConsuemrRecord:每条数据都要封装成一个 ConsumerRecord 对象

为了使我们能够专注于自己的业务逻辑,Kafka 提供了自动提交 offset 的功能。 自动提交 offset 的相关参数:

  • enable.auto.commit:是否开启自动提交 offset 功能
  • auto.commit.interval.ms:自动提交 offset 的时间间隔
自动提交offset
@Test
public void testConsumer() {
        KafkaConsumer<String, String> kafkaConsumer = getKafkaConsumer();
        //订阅topic,可以订阅多个
        kafkaConsumer.subscribe(Collections.singletonList("first"));
        while (true) {
            //拉取消息,参数为没有消息时的等待时间
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.toString());
            }
        }

}
private KafkaConsumer<String, String> getKafkaConsumer() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "shangxiaoying.cn:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_01");
        //开启自动提交(这里提交是指提交offset)
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        //这里是反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);
        return kafkaConsumer;
}
手动提交offset

虽然自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。

因此 Kafka 还提供了手动提交 offset 的 API。 手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和 commitAsync(异步 提交)。两者的相同点是,都会将本次 poll 的一批数据最高的偏移量提交;不同点是, commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致, 也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。

@Test
public void testConsumer() {
        KafkaConsumer<String, String> kafkaConsumer = getKafkaConsumer();
        //订阅topic,可以订阅多个
        kafkaConsumer.subscribe(Collections.singletonList("first"));
        while (true) {
            //拉取消息,参数为没有消息时的等待时间
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.toString());
            }

            //手动提交,不要忘记关掉自动提交
            //同步提交,当前线程会阻塞直到 offset 提交成功
//            kafkaConsumer.commitSync();
            //异步提交
            kafkaConsumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                    //业务逻辑
                }
            });
        }

}
private KafkaConsumer<String, String> getKafkaConsumer() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "shangxiaoying.cn:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_01");
        //关闭自动提交(这里提交是指提交offset)
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //这里是反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);
        return kafkaConsumer;
}

数据漏消费和重复消费:

无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。

  • 先提交 offset 后消费,有可能造成数据的漏消费;
    • 提交offset后,数据还没处理完,消费者挂掉,重启后,本地offset丢失,获取kafka存储的offset,这个offset是挂掉前拉取的最新数据的kafka,kafka以为消费者都消费完了,其实并未真正处理完毕,再拉取消息会按照kafka存储的offset拉取,故造成楼消费。
  • 而先消费后提交 offset,有可能会造成数据的重复消费
    • 消费者拉取到数据在处理,处理了一部分数据后消费者挂掉,此时offset并未提交,消费者重启后,本地offset已经丢失,所以会读取kafka存储的offset,这个offset是上次拉取消息之前的,所以又会将挂掉前拉取的消息再拉取一遍,造成重复。
自定义存储offset

要解决上述问题,关键是要保证offset存储和数据处理的一致性。我们可以将offset存入MySQL,使得消息处理和offset存储在同一个事务中,从而保证一致性。

但是:

offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalance。 当有新的消费者加入消费者组、已有的消费者退出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。 消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。

如何感知到是否发生Rebalance?

  • Kafka有一个类:ConsumerRebalanceListener可以实现

思路:

  • 在Rebalance之前,每个分区要提交自己最新的offset
  • 在Rebalance之后,每个分区获取自己最新的offset,继续消费

实现:

private static Map<TopicPartition, Long> currentOffset = new HashMap<>();
/**
     * 自定义存储offset
     */
    @Test
    public void testCustomOffset() {
        KafkaConsumer<String, String> kafkaConsumer = getKafkaConsumer();
        //订阅topic,ConsumerRebalanceListener帮助我们实现自定义存储offset
        kafkaConsumer.subscribe(Collections.singletonList("first"), new ConsumerRebalanceListener() {

            //该方法会在 Rebalance 之前调用
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                //Rebalance之前,将每个分区的最新offset提交,这里可以自定义提交至MySQL
                commitOffset(currentOffset);
            }

            //该方法会在 Rebalance 之后调用
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                currentOffset.clear();
                for (TopicPartition partition : collection) {
                    kafkaConsumer.seek(partition, getOffset(partition));//定位到最近提交的 offset 位置继续消费
                }
            }
        });
        while (true) {
            //最佳实践:将数据处理和offset保存放在一个事务中,从而保证数据不会丢失或重复
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);//消费者拉取数据
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf(record.toString());
               //维护currentOffset
                currentOffset.put(new TopicPartition(record.topic(),
                        record.partition()), record.offset());
            }
            commitOffset(currentOffset);
        }
    }


    //获取某分区的最新 offset
    private static long getOffset(TopicPartition partition) {
        //业务逻辑,比如去MySQL中查询最新offset
        return 0;
    }
    //提交该消费者所有分区的 offset
    private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
        //保存offset
    }

拦截器

原理

Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。

对于 producer而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个 interceptor 按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是 org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

  • configure(configs)
    • 获取配置信息和初始化数据时调用。
  • onSend(ProducerRecord)
    • 该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算
  • onAcknowledgement(RecordMetadata, Exception)
    • 该方法会在消息从 RecordAccumulator 成功发送到 Kafka Broker 之后,或者在发送过程中失败时调用。并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在 producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率。(为何会在IO线程中?)
  • close
    • 关闭 interceptor,主要用于执行一些资源清理工作
    • 如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。
    • 另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中 要特别留意。
实操

实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间戳信息加到消息 value 的最前部;

第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。

时间戳拦截器

public class TimeInterceptor implements ProducerInterceptor {
    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord) {
        return new ProducerRecord(producerRecord.topic(),
                producerRecord.partition(),
                producerRecord.timestamp(),
                producerRecord.key(),
                System.currentTimeMillis() + "-" + producerRecord.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

计数拦截器

public class CountInterceptor implements ProducerInterceptor {
    private Integer successCount = 0;
    private Integer errorCount = 0;
    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord) {
        return producerRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if (Objects.isNull(e)) {
            successCount ++;
        } else {
            errorCount ++;
        }
    }

    @Override
    public void close() {
        System.out.println("success: " + successCount + ", error: " + errorCount);
    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

使用拦截器

@Test
public void testInterceptorChain() {
        //配置
        Properties properties = new Properties();
        properties.put("bootstrap.servers","shangxiaoying.cn:9092");
        properties.put("aacks", "all");
        //配置的key可以在ProducerConfig中找到
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //这里指定要使用的分区策略
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"cn.shangxiaoying.kafka.partitioner.MyPartitioner");
        //这里可以指定拦截器
        List<String> interceptorList = new ArrayList<>();
        interceptorList.add("cn.shangxiaoying.kafka.interceptors.TimeInterceptor");
        interceptorList.add("cn.shangxiaoying.kafka.interceptors.CountInterceptor");
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptorList);
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        producer.send(new ProducerRecord<>("first", 0, "testInterceptor", "hello,Interceptor"));

        //该close方法也会调用拦截器的close
        producer.close();
}

架构深入

工作流程

Kafka工作流程

说明:

  • 偏移量不是全局唯一的,是分区唯一的
  • kafka只保证消息的分区内有序,不保证全局有序
    • 有序是指消费消息的顺序和生产消息的顺序一致
  • kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的
  • topic是逻辑上的概念,而partition是物理上的概念
    • 每个partition对应一个log文件(所在的文件夹名 = topic - partition),该log文件存储的就是producer生产的数据。producer生产的数据会不断追加到该log文件末端,且每条数据都有自己的offset。
    • 消费者都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。

文件存储机制

topic-partition关系

说明:

由于生产者生产的消息会不断追加到log文件末尾,log文件过大会导致数据定位效率低下,为防止该问题,kafka采取了分片索引机制:

  • 每个partition分为多个segment
  • 每个segment对应两个文件:
    • .index文件
    • .log文件
  • .index和.log文件位于同一文件夹下,该文件夹命名规则为:topic名称 + 分区序号,如first有三个分区,则对应文件夹名称为first-0, first-1, first -2
  • 每个segment的.index和.log文件文件名相同,都是当前segment中第一条消息的offset

.index文件和.log文件详解

消息读取机制

.index文件中存储:

  • 消息的偏移量(第几条消息)--左侧
  • 对应消息的存储地址偏移量 --右侧
  • 对应消息的大小

.log文件中存储:

  • 消息内容

注:.indx文件中存的内容是固定的,就是存消息偏移量、存储地址偏移量、消息大小等信息,所以在根据消息偏移量找对应消息时,可以直接采用 消息偏移量 * .index中单个内容的大小,快速找到要读取的消息信息地址,加快查询速度

消息读取机制

  1. 根据要读取的消息偏移量,采用二分查找,找到对应对的.index文件
  2. 在改.index文件中,根据消息偏移量,找到要读取的消息的存储地址偏移量和该消息大小
  3. 根据2中结果,去.log文件中直接读取 地址偏移量 到 地址偏移量 + 消息大小 这一段的数据

生产者

分区策略

分区原因
  1. 方便在集群中扩展。每个partition可以通过调整大小以适应它所在的机器,而一个topic又可以有多个partition组成,因此整个集群就可以适应任意大小的数据了。
  2. 提高并发。以partition为单位读写
分区原则

发送的数据要封装成一个ProduceRecord对象,该对象中有partition、key、value等属性

  • 指明partition的情况下,直接将指明的值作为partition值
  • 没有指明partition值,但有key值的情况下,将key的hash值与当前topic可用的partition总数进行取余得到partition值
  • 既没有指明partition值又没有key值的情况下,采用轮询的方式选取分区。但谁来做第一个分区?kafka采用的机制是:在第一次调用时随机生成一个整数,后面每次调用在这个整数上自增,将这个值与当前topic可用的partition总数取余得到partition值。(由于每次调用都会在这个整数上自增,所以取余后的结果也是自增或等于初始值,也就达到了轮询每个partition的效果)。这就是round-robin算法

源码:

//这个方法是默认的分区策略类里的,能进到这个方法,说明肯定没有指定partition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  			//获取当前topic的partition数目
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        //没有指定key
        if (keyBytes == null) {
            int nextValue = this.nextValue(topic);
          //当前topic存活的partition数
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
              	//从存活的partition里选取一个partition返回
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return ((PartitionInfo)availablePartitions.get(part)).partition();
            } else {
                //选取一个partition返回
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
           //有key值,将key的hash值与当前topic可用的partition总数进行取余得到partition值
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

数据可靠性保证

概述

为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。

何时发送ack?

方案比较
方案优点缺点备注
半数以上同步完成,就发送ack延迟低选举新leader时,如果要容忍n台节点的故障,则需要2n+1个副本这里的n台是指同步成功了的节点中,容忍n台节点故障,因为没有同步成功的节点肯定不能选举成为新leader。
如果要容忍n台节点故障,则同步成功的至少需要n+1台(保证至少存活一台同步成功的节点)
因为该方案是半数以上同步完成就发送ack,所以n+1占到了半数以上(总节点数/2 +1 = n + 1,总节点数 = 2n),所以需要2n个副本(视频上是2n + 1,不太明白)
全部完成同步,才发送ack选举新leader时,如果要容忍n台节点故障,则需要n+1个副本延迟高n台节点故障,至少要有一台活着能够完成选举,所以同步成功的要有n+1台。
因为该方案是全部完成同步才发送ack,所以总节点数 = 完成同步的节点数 = n +1。
Kafka方案

Kafka选取了第二种方案,原因:

  • Kafka每个分区都有大量数据,第一种方案会造成大量数据冗余
  • 网络延迟对Kafka影响较小(为啥)
第二种方案存在的问题

全部完成同步,才会发送ack,如果在同步数据时,有一个follower因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到他完成同步,才能发送ack

解决方案

ISR

  • leader维护了一个动态的ISR(in-sync replica set),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给follower发送ack(不应该是follower给leader发送么)。如果follower长时间未向leader同步数据,则该follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定(默认10秒)。leader发生故障后,就会从ISR中选举新leader
  • 由于上述条件限制,所以ISR中的follower都是同步速度很快的follower,未来leader也会在ISR中选举,所以leader只要等ISR中的follower都同步完成,就可以向生产者发送ack

老版本中,follower进入ISR有两个条件:

  • follower中消息条数与leader中的消息条数差值要小于设定的参数阈值(默认10000条)
  • follower向leader同步数据的时间要小于设定的阈值

新版本中(0.9版本移除)移除了条数限制,原因:

生产者一般都是批量发送数据,假设条数阈值为10条,但生产者一次就发来了12条,这时leader中比所有follower都多12条数据,所有follower都会被移除ISR,但很快一些follower同步完成,又会把他们移入ISR,ISR存在内存中,这就会导致频繁操作内存。而且kafka会将ISR信息写入zookeeper,这也会导致kafka频繁请求zookeeper。

ack应答机制

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。因此,kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下配置:

acks参数配置:

  • 0: 不等待。producer不等待Broker的ack,这一操作提供了一个最低的延迟,Broker一接收到还没有写入磁盘就已经返回,当Broker故障时有可能丢失数据
  • 1: 只等待leader。producer等待Broker的ack,leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据
  • -1(all): 等待leader和ISR中的follower。producer等待Broker的ack,leader和ISR中的follower全部落盘成功后才会返回ack。
    • 极限情况下也会造成数据丢失:比如当所有follower同步消息都比较慢时,此时ISR中只有leader自己,这种情快下当leader落盘成功,就等于ISR中全部落盘成功了,便会返回ack,在follower同步完成之前,若leader挂掉,则会导致数据丢失
    • 数据重复:follower同步完成之后,Broker发送ack之前,leader发生故障,producer没有收到ack便会重发消息,故造成数据重复
故障处理细节

LEO和HW:

LEO:指的是每个副本最大的offset

HW:指的是消费者能见到的最大的offset,ISR队列中最小的LEO(这里有点像MySQL版本控制)

follower故障:

follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该partition的HW,及follower追上leader之后,就可以重新加入ISR了

leader故障:

leader发生故障后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,新leader会发出一个消息,其余的follower收到消息后会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据

注:

这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。数据不丢失或不重复还得看ack

相关语义

At Least Once:至少一次。即一条消息至少会存一次,数据不丢失,但可能会重复

  • 将服务器的ACK级别设置为-1,即可做到

At Most Once:至多一次。即一条消息最多存一次,数据不会重复,但可能会丢失

  • 将服务器的ACK级别设置为0,即可做到

Exactly Once:精准一次。即一条消息只存一次,数据不丢失也不重复

  • 一些重要数据,比如交易数据,下游数据消费者要求数据不重复也不丢失,这种情况就要达到该语义
  • 0.11版本之前的kafka无法实现,只能保证数据不丢失,然后在下游消费者对数据做全局去重。对于多个下游应用的情况,每个消费者都需要单独去做全局去重,这就对性能造成了很大影响
  • 0.11版本的kafka引入一项重大特性:幂等性。即producer不论向Server发送多少次重复数据,Server端都只会持久化一条。幂等性结合At Least Once语义就构成了kafka的Exactly Once语义,即:At Least Once + 幂等性 = Exactly Once
    • 要启用幂等性,只需要将producer的参数中enable.idompotence设置为true即可(此设置会自动将acks设为-1)。Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的producer在初始化的时候会被分配一个PID(producer Id,不是进程id),发往同一个partition的消息会附带 Sequence Number(消息的序列号)。而Broker端会对<PID,Partition,SeqNumber>做缓存,当具有相同键的消息提交时,Broker只会持久化一条
    • 但是生产者重启PID就会变化,同时,不同的partition也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once

消费者

消费方式

Consumer采用pull(拉)的方式从Broker中读取数据

kafka中没有数据时,pull模式可能会使消费者陷入空转。针对这一点,kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回(返回是指等待一段时间后,再查kafka有无数据么),这段时间即为timeout

分区分配策略

一个Consumer Group中有多个Consumer,一个Topic有多个Partition,所以必然会涉及到Partition的分配问题,即确定哪个Partition由哪个Consumer来消费

kafka有两种分配策略:

  • RoundRobin

    • 轮询

    • 按照消费者组来划分,将这个消费者组订阅的所有topic看做一个整体,然后轮询分给每一个消费。(将所有主题的分区组成 TopicAndPartition 列表,然后对 TopicAndPartition 列表按照 hashCode 进行排序,【其实就是按分区名hash排序后平均分配给每一个消费者】)

    • 优点:消费者负载均衡,最多相差一个分区

    • 缺点:

      • 当消费者组中的消费者订阅的是不同topic时,由于会将该消费者组订阅的所有topic看做一个整体,所以可能出现将一个topic的分区分配给未订阅该topic的消费者的情况,这样就出现了错误
      • 所以,该策略只适用于消费者组中的消费者都订阅同一个主题的情况
    • 例:

      假如按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:

      C1-0 将消费 T1-5, T1-2, T1-6 分区;
      C1-1 将消费 T1-3, T1-1, T1-9 分区;
      C2-0 将消费 T1-0, T1-4 分区;
      C2-1 将消费 T1-8, T1-7 分区;

  • Range

    • 按照范围

    • 默认策略

    • 首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后将partitions的个数除于消费者的总数来决定每个消费者消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区

    • 优点:消费者组中消费者可以订阅不同topic

    • 缺点:当一个消费者组中的消费者订阅了多个topic时,部分消费者会分得很多分区,不均衡

    • 例:

      假如有10个分区,3个消费者,把分区按照序号排列0,1,2,3,4,5,6,7,8,9;消费者为C1,C2,C3,那么用分区数除以消费者数来决定每个Consumer消费几个Partition,除不尽的前面几个消费者将会多消费一个
      最后分配结果如下:

      C1:0,1,2,3
      C2:4,5,6
      C3:7,8,9

      如果有11个分区将会是:

      C1:0,1,2,3
      C2:4,5,6,7
      C3:8,9,10

      假如我们有两个主题T1,T2,分别有10个分区,最后的分配结果将会是这样:

      C1:T1(0,1,2,3) T2(0,1,2,3)
      C2:T1(4,5,6) T2(4,5,6)
      C3:T1(7,8,9) T2(7,8,9)

当消费者组中消费者个数发生变更时,就会触发重新分配。即使消费者数目增加到大于分区数,也会重新分配

offset的维护

由于Consumer在消费过程中可能宕机,Consumer恢复后,需要从故障前的位置继续消费,所以Consumer需要实时记录自己消费到了哪个offset,以便故障后能够继续消费。

采用:消费者组 + topic + Partition 唯一确定一个offset

Kafka高效读写数据

原因:

  • 分布式,支持分区,可并行读写
  • 顺序写磁盘
    • kafka的producer生产的数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。这与磁盘的机械结构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
  • 零复制技术
    • 也叫零拷贝技术

Zookeeper在Kafka中的作用

Kafka集群中会有一个Broker被选举为Controller,负责管理集群Broker的上下线,所有topic的分区副本分配和leader选举等工作

Controller的管理工作都是依赖于Zookeeper的

Partition的leader选举过程:

Kafka事务

Kafka从0.11版本开始引入了事务支持。事务可以保证Kafka在Exactly Once语义的基础上,生产和消费可以跨分区跨会话,要么全部成功,要么全部失败。

这个全部是指同一topic的全部消息么?

Producer事务

Producer事务保证了消息精准一次性的写入Kafka集群,在单分区幂等性的基础上加上了全局唯一id,使得幂等性达到跨分区跨会话级别。

为了实现跨分区跨会话的事务,需要引入一个全局唯一的TransactionID(TransactionID是Producer客户端给定的,所以即使Producer重启,该ID也不会变化),并将Producer获得的PID和TransantionID绑定。这样当Producer重启后就可以通过正在进行的TransactionID获得原来的PID。

为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator。Producer就是通过和Transaction Coordinator交互获得TransactionID对应的任务状态。Transaction Coordinator还负责将所有事物写入kafka的一个内部topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

Consumer事务

对于Consumer而言,事务保证相对较弱,尤其是无法保证Commit的信息被精确消费。这是因为:Consumer可以通过offset访问任意信息,而且不同的Segment File生命周期不同,同一事务的消息可能会出现重启后被删除的情况。

Q.E.D.


Read The Fucking Source Code!