kafka-消息队列

引言

提到缓存一般想到的是用 redis 和 memcache,一提到消息队列一般想到的是 kafka。尤其担心自己太片面了,见识跟不上了,毕竟几年前就在用 kafka 在日志和记录方面,就搜了一下发现现在主流的还是 kafka,还有比较新的 RocketMQ(2016年阿里捐献给 Apache)。ActiveMQ 和 RabbitMQ 一般不考虑,毕竟在吞吐量和可靠性不如前面 kafka。

详解

kafka 的使用很简单,部署好之后各个语言都有 client。以 go 举例。
kafka-go 发消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
conn, err := kafka.DialLeader(s.ctx, "tcp", "localhost:9092", s.topic, s.partition)
if err != nil {
return err
}
defer func() {
if err := conn.Close(); err != nil {
log.Println(err)
}
}()
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))

_, err = conn.WriteMessages(kafka.Message{Value: message})
if err != nil {
return err
}

收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
GroupID: "consumer-group-id",
Topic: defaultTopic,
MinBytes: 10, // 1KB
MaxBytes: 10e6, // 10MB
})
defer r.Close()
for {
currentTime := time.Now()
m, err := r.FetchMessage(s.ctx)
// m, err := r.ReadMessage(context.Background())
if err != nil {
log.Println(err)
// return nil, err
}
message := &model.Message{}
err = proto.Unmarshal(m.Value, message)
if err != nil {
return nil, err
}
if err := r.CommitMessages(s.ctx, m); err != nil {
log.Println("failed to commit messages:", err)
}
log.Println("consume message duration: ", time.Now().Sub(currentTime).Milliseconds())
log.Println(message)
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}

注意点:

  1. 1个 topic 可以由多个 partition,但是一个 partition 只能由一个 consumer 消费。
  2. 1个 consumer 可以订阅多个 topic。会按照一定规则分配。例如 range 尽量平均分配,有余数就从第一个开始多承担一个。
  3. 生产环境肯定要搭建多个 brokers,
  4. 可以通过 kafka-topics --zookeeper localhost:2181 --create --topic kf-test --partitions 4 --replication-factor 3 来创建主题, partitions 一般一个有几兆的吞吐量,结合实际。1个 topic 不要创建太多的 partitions,控制在最多几百。一个 brokers partitions 控制在2000以内。一个集群 partitions 控制在 20000 以内。因为 partitions 太多,当一个 brokers 崩溃后会进行大量的选举。

不足:

  1. 自身不支持消息重试机制。在消息处理失败重试的情况下,只能自己想办法重试。
  2. 消费者 Rebalance 机制,可能会重复消费和影响性能。在消费者崩溃或者主动减少增加、分区增加和topic 订阅变化三种情况下会触发 Rebalance,后2者业务考虑,无法避免,只能尽量减少。但是消费者心跳失联和消费超时的异常情况是需要避免的,通过参数来达到预期效果。具体来看消息心跳最大是超时时间的1/3,消息的消费时长*拉取的消息数量必须远小于设置的拉取消息间隔时间,且该间隔时间小于超时时间。

总结

分布式系统有存在 CAP 特性,即一致性、可用性、分区容错不能同时满足,只能同时满足2个。一般分布式系统都是满足高可用性和分区容错,一致性只能尽量保证最终一致性。kafka 的高吞吐量在应用场景很广泛,因为其分区对象文件的简单架构来确保高的性能。但是想基于它搭建一个完整的消息系统还需要解决不少问题尤其在对消息重试机制上需要结合具体业务分析,一般的可能先消费者自己重试几次,仍失败就发布到重试 topic 单独处理恢复来确保最终的一致性。