说说 MQ 之 Kafka

来自:褚哥说

现代的互联网分布式系统,只要稍微大一些,就一定逃不开3类中间件:远程调用(RPC)框架、消息队列、数据库访问中间件。Kafka 是消息队列中间件的代表产品,用 Scala 语言实现,本文采用的是 Kafka_2.11 0.10.0.0 版本进行实验。

基本概念

首先,Kafka 中有一些基本的概念需要熟悉 1 2

  • Topic,指消息的类别,每个消息都必须有;

  • Producer,指消息的产生者,或者,消息的写端;

  • Consumer,指消息的消费者,或者,消息的读端;

  • Producer Group,指产生者组,组内的生产者产生同一类消息;

  • Consumer Group,指消费者组,组内的消费者消费同一类消息;

  • Broker,指消息服务器,Producer 产生的消息都是写到这里,Consumer 读消息也是从这里读;

  • Zookeeper,是 Kafka 的注册中心,Broker 和 Consumer 之间的协调器,包含状态信息、配置信息和一些 Topic 的信息;

  • Partition,指消息的水平分区,一个 Topic 可以有多个分区;

  • Replica,指消息的副本,为了提高可用性,将消息副本保存在其他 Broker 上。

特别说明,Broker 是指单个消息服务进程,一般情况下,Kafka 是集群运行的,Broker 只是集群中的一个服务进程,而非代指整个 Kafka 服务,可以简单将 Broker 理解成服务器(Server)。Kafka 引入的术语都比较常见,从字面上理解相对直观。Kafka 的大致结构图是这样:

Kafka 是 Pull 模式的消息队列,即 Consumer 连到消息队列服务上,主动请求新消息,如果要做到实时性,需要采用长轮询,Kafka 在0.8的时候已经支持长轮询模式。上图中 Consumer 的连接箭头方向可能会让读者误以为是 Push 模式,特此注明。更多关于 Kafka 设计的文章可以参考官方文档,或者一些比较好的博客文章 3

关于顺序和分区

Kafka 是一个力求保持消息顺序性的消息队列,但不是完全保证,其保证的是 Partition 级别的顺序性,如下图:

此图是 Topic 的分区 log 的示意图,可见,每个分区上的 log 都是一个有序的队列,所以,Kafka 是分区级别有序的。如果,某个 Topic 只有一个分区,那么这个 Topic 下的消息就都是有序的。

分区是为了提升消息处理的吞吐率而产生的,将一个 Topic 中的消息分成几份,分别给不同的 Broker 处理。如下图:

此图中有2个 Broker,Server 1 和 Server 2,每个 Broker 上有2个分区,总共4个分区,P0 ~ P3;有2个 Consumer Group,Consumer Group A 有2个 Consumer,Consumer Group B 有4个 Consumer。Kafka 的实现是,在稳定的情况下,维持固定的连接,每个 Consumer 稳定的消费其中某几个分区的消息,以上图举例,Consumer Group A 中的 C1 稳定消费 P0、P3,C2 稳定消费 P1、P2。这样的连接分配可能会导致消息消费的不均匀分布,但好处是比较容易保证顺序性。

维持完全的顺序性在分布式系统看来几乎是无意义的。因为,如果需要维持顺序性,那么就只能有一条线程阻塞的处理顺序消息,即,Producer -> MQ -> Consumer 必须线程上一一对应。这与分布式系统的初衷是相违背的。但是局部的有序性,是可以维持的。比如,有30000条消息,每3条之间有关联,1->2->3,4->5->6,……,但是全局范围来看,并不需要保证 1->4->7,可以 7->4->1 的顺序来执行,这样可以达到最大并行度10000,而这通常是现实中我们面对的情况。通常应用中,将有先后关系的消息发送到相同的分区上,即可解决大部分问题。

关于副本

副本是高可用 Kafka 集群的实现方式。假设集群中有3个 Broker,那么可以指定3个副本,这3个副本是对等的,对于某个 Topic 的分区来说,其中一个是 Leader,即主节点,另外2个副本是 Follower,即从节点,每个副本在一个 Broker 上。当 Leader 收到消息的时候,会将消息写一份到副本中,通常情况,只有 Leader 处于工作状态。在 Leader 发生故障宕机的时候,Follwer 会取代 Leader 继续传送消息,而不会发生消息丢失。Kafka 的副本是以分区为单位的,也就是说,即使是同一个 Topic,其不同分区的 Leader 节点也不同。甚至,Kafka 倾向于用不同的 Broker 来做分区的 Leader,因为这样能做到更好的负载均衡。

在副本间的消息同步,实际上是复制消息的 log,复制可以是同步复制,也可以是异步复制。同步复制是说,当 Leader 收到消息后,将消息写入从副本,只有在收到从副本写入成功的确认后才返回成功给 Producer;异步复制是说,Leader 将消息写入从副本,但是不等待从副本的成功确认,直接返回成功给 Producer。同步复制效率较低,但是消息不会丢;异步复制效率高,但是在 Broker 宕机的时候,可能会出现消息丢失。

关于丢消息和重复收到消息

任何一个 MQ 都需要处理丢消息和重复收到消息的,正常情况下,Kafka 可以保证:1. 不丢消息;2. 不重复发消息;3. 消息读且只读一次。当然这都是正常情况,极端情况,如 Broker 宕机,断电,这类情况下,Kafka 只能保证 1 或者 2,无法保证 3。

在有副本的情况下,Kafka 是可以保证消息不丢的,其前提是设置了同步复制,这也是 Kafka 的默认设置,但是可能出现重复发送消息,这个交给上层应用解决;在生产者中使用异步提交,可以保证不重复发送消息,但是有丢消息的可能,如果应用可以容忍,也可以接受。如果需要实现读且只读一次,就比较麻烦,需要更底层的 API 4

参考文章


Kafka 的工具和编程接口

Kafka 的工具

Kafka 提供的工具还是比较全的,bin/ 目录下的工具有以下一些,

1
2
3
4
5
6
7
bin/connect-distributed.sh     bin/kafka-consumer-offset-checker.sh     bin/kafka-replica-verification.sh   bin/kafka-verifiable-producer.sh
bin/connect-standalone.sh      bin/kafka-consumer-perf-test.sh          bin/kafka-run-class.sh              bin/zookeeper-security-migration.sh
bin/kafka-acls.sh              bin/kafka-mirror-maker.sh                bin/kafka-server-start.sh           bin/zookeeper-server-start.sh
bin/kafka-configs.sh           bin/kafka-preferred-replica-election.sh  bin/kafka-server-stop.sh            bin/zookeeper-server-stop.sh
bin/kafka-console-consumer.sh  bin/kafka-producer-perf-test.sh          bin/kafka-simple-consumer-shell.sh  bin/zookeeper-shell.sh
bin/kafka-console-producer.sh  bin/kafka-reassign-partitions.sh         bin/kafka-topics.sh
bin/kafka-consumer-groups.sh   bin/kafka-replay-log-producer.sh         bin/kafka-verifiable-consumer.sh

我常用的命令有以下几个,

1
2
3
4
5
6
7
bin/kafka-server-start.sh -daemon config/server.properties &
bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1
bin/kafka-topics.sh --list --zookeeper 192.168.232.23:2181
bin/kafka-topics.sh --delete --zookeeper 192.168.232.23:2181 --topic topic1
bin/kafka-topics.sh --create --zookeeper 192.168.232.23:2181 --replication-factor 3 --partitions 2 --topic topic1
bin/kafka-console-consumer.sh --zookeeper 192.168.232.23:2181 --topic topic1 --from-beginning
bin/kafka-console-producer.sh --broker-list 192.168.232.23:9092 --topic topic1

kafka-server-start.sh 是用于 Kafka 的 Broker 启动的,主要就一个参数 config/server.properties,该文件中的配置项待会再说.还有一个 -daemon 参数,这个是将 Kafka 放在后台用守护进程的方式运行,如果不加这个参数,Kafka 会在运行一段时间后自动退出,据说这个是 0.10.0.0 版本才有的问题 5kafka-topics.sh 是用于管理 Topic 的工具,我主要用的 --describe--list--delete--create 这4个功能,上述的例子基本是不言自明的,--replication-factor 3--partitions 2 这两个参数分别表示3个副本(含 Leader),和2个分区。kafka-console-consumer.sh 和 kafka-console-producer.sh 是生产者和消费者的简易终端工具,在调试的时候比较有用,我常用的是 kafka-console-consumer.sh。我没有用 Kafka 自带的 zookeeper,而是用的 zookeeper 官方的发布版本 3.4.8,端口是默认2181,与 Broker 在同一台机器上。

下面说一下 Broker 启动的配置文件 config/server.properties,我在默认配置的基础上,修改了以下一些,

1
2
3
4
broker.id=0
listeners=PLAINTEXT://192.168.232.23:9092
log.dirs=/tmp/kafka-logs
delete.topic.enable=true

broker.id 是 Kafka 集群中的 Broker ID,不可重复,我在多副本的实验中,将他们分别设置为0、1、2;listeners 是 Broker 监听的地址,默认是监听 localhost:9092,因为我不是单机实验,所以修改为本机局域网地址,当然,如果要监听所有地址的话,也可以设置为 0.0.0.0:9092,多副本实验中,将监听端口分别设置为 9092、9093、9094;log.dirs 是 Broker 的 log 的目录,多副本实验中,不同的 Broker 需要有不同的 log 目录;delete.topic.enable 设为 true 后,可以删除 Topic,并且连带 Topic 中的消息也一并删掉,否则,即使调用 kafka-topics.sh --delete 也无法删除 Topic,这是一个便利性的设置,对于开发环境可以,生产环境一定要设为 false(默认)。实验中发现, 如果有消费者在消费这个 Topic,那么也无法删除,还是比较安全的。

剩下的工具多数在文档中也有提到。如果看一下这些脚本的话,会发现多数脚本的写法都是一致的,先做一些参数的校验,最后运行 exec $base_dir/kafka-run-class.sh XXXXXXXXX "$@",可见,这些工具都是使用运行 Java Class 的方式调用的。

Kafka 的 Java API

在编程接口方面,官方提供了 Scala 和 Java 的接口,社区提供了更多的其他语言的接口,基本上,无论用什么语言开发,都能找到相应的 API。下面说一下 Java 的 API 接口。

生产者的 API 只有一种,相对比较简单,代码如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class SimpleProducerDemo {
    public static void main(String[] args){
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.232.23:9092,192.168.232.23:9093,192.168.232.23:9094");
        props.put("zookeeper.connect", "192.168.232.23:2181");
        props.put("client.id", "DemoProducer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
        String topic = "topic1";
        Boolean isAsync = false;
        int messageNo = 1;
        while (true) {
            String messageStr = "Message_" + String.format("%05d",messageNo);
            long startTime = System.currentTimeMillis();
            if (isAsync) { // Send asynchronously
                producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr), new DemoCallBack(startTime, messageNo, messageStr));
            } else { // Send synchronously
                try {
                    producer.send(new ProducerRecord<>(topic,
                            messageNo,
                            messageStr)).get();
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            try {
                Thread.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            ++messageNo;
        }
    }
}
class DemoCallBack implements Callback {
    private final long startTime;
    private final int key;
    private final String message;
    public DemoCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println(
                    "Send     message: (" + String.format("%05d",key) + ", " + message + ") at offset "+ metadata.offset() +
                            " to partition(" + metadata.partition() +
                            ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
}

上例中使用了同步和异步发送两种方式。在多副本的情况下,如果要指定同步复制还是异步复制,可以使用 acks 参数,详细参考官方文档 Producer Configs 部分的内容;在多分区的情况下,如果要指定发送到哪个分区,可以使用 partitioner.class 参数,其值是一个实现了 org.apache.kafka.clients.producer.Partitioner 接口的类,用于根据不同的消息指定分区6。消费者的 API 有几种,比较新的 API 如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class SimpleConsumer {
    public static void main(String[] args){
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.232.23:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic1", "topic2", "topic3"));
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("Received message: (" + String.format("%05d", record.key()) + ", " + record.value() + ") at offset " + record.offset());
            }
        }
    }
}

消费者还有旧的 API,比如 Consumer 和 SimpleConsumer API,这些都可以从 Kafka 代码的 kafka-example 中找到,上述的两个例子也是改写自 kafka-example。使用新旧 API 在功能上都能满足消息收发的需要,但新 API 只依赖 kafka-clients,打包出来的 jar 包会小很多,以我的测试,新 API 的消费者 jar 包大约有 2M 左右,而旧 API 的消费者 jar 包接近 16M。

其实,Kafka 也提供了按分区订阅,可以一次订阅多个分区 TopicPartition[];也支持手动提交 offset,需要调用 consumer.commitSync

Kafka 似乎没有公开 Topic 创建以及修改的 API(至少我没有找到),如果生产者向 Broker 写入的 Topic 是一个新 Topic,那么 Broker 会创建这个 Topic。创建的过程中会使用默认参数,例如,分区个数,会使用 Broker 配置中的 num.partitions 参数(默认1);副本个数,会使用 default.replication.factor 参数。但是通常情况下,我们会需要创建自定义的 Topic,那官方的途径是使用 Kafka 的工具。也有一些非官方的途径 7,例如可以这样写,

1
2
3
4
5
6
7
8
9
10
11
12
String[] options = new String[]{
        "--create",
        "--zookeeper",
        "192.168.232.23:2181",
        "--partitions",
        "2",
        "--replication-factor",
        "3",
        "--topic",
        "topic1"
};
TopicCommand.main(options);

但是这样写有一个问题,在执行完 TopicCommand.main(options); 之后,系统会自动退出,原因是执行完指令之后,会调用 System.exit(exitCode); 系统直接退出。这样当然不行,我的办法是,把相关的执行代码挖出来,写一个 TopicUtils 类,如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import joptsimple.OptionSpecBuilder;
import kafka.admin.TopicCommand;
import kafka.admin.TopicCommand$;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import scala.runtime.Nothing$;
public class TopicUtils {
    // from: http://blog.csdn.net/changong28/article/details/39325079
    // from: http://www.cnblogs.com/davidwang456/p/4313784.html
    public static void createTopic(){
        String[] options = new String[]{
                "--create",
                "--zookeeper",
                KafkaProperties.ZOOKEEPER_URL,
                "--partitions",
                "2",
                "--replication-factor",
                "3",
                "--topic",
                KafkaProperties.TOPIC
        };
//        TopicCommand.main(options);
        oper(options);
    }
    public static void listTopic(){
        String[] options = new String[]{
                "--list",
                "--zookeeper",
                KafkaProperties.ZOOKEEPER_URL
        };
//        TopicCommand.main(options);
        oper(options);
    }
    public static void deleteTopic(){
        String[] options = new String[]{
                "--delete",
                "--zookeeper",
                KafkaProperties.ZOOKEEPER_URL,
                "--topic",
                KafkaProperties.TOPIC
        };
//        TopicCommand.main(options);
        oper(options);
    }
    public static void describeTopic(){
        String[] options = new String[]{
                "--describe",
                "--zookeeper",
                KafkaProperties.ZOOKEEPER_URL,
                "--topic",
                KafkaProperties.TOPIC
        };
//        TopicCommand.main(options);
        oper(options);
    }
    public static void main(String[] args){
        listTopic();
        createTopic();
        listTopic();
        describeTopic();
        deleteTopic();
        try {
            Thread.sleep(3*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        listTopic();
    }
    /** copied & modified from kafka.admin.TopicCommand$.main
     *
     * @param args
     */
    public static void oper(String args[]){
        try {
        TopicCommand$ topicCommand$ = TopicCommand$.MODULE$;
        final TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(args);
        if(args.length == 0) {
            throw kafka.utils.CommandLineUtils$.MODULE$.printUsageAndDie(opts.parser(), "Create, delete, describe, or change a topic.");
        } else {
            int actions =0;
            OptionSpecBuilder[] optionSpecBuilders = {opts.createOpt(), opts.listOpt(), opts.alterOpt(), opts.describeOpt(), opts.deleteOpt()};
            for (OptionSpecBuilder temp:optionSpecBuilders){
                if (opts.options().has(temp)) {
                    actions++;
                }
            }
            if(actions != 1) {