RocketMQ 理解

整体架构

Untitled

  • NameServer:名字服务,是一个非常简单的 Topic 路由注册中心,支持 Broker 的动态注册与发现,Producer 和 Consumer 通过 NameServer 动态感知 Broker 的路由信息。
  • Broker:Broker 主要负责消息的存储、转发和查询。
  • Producer:消息生产者,支持分布式集群方式部署。
  • Consumer:消息消费者,支持分布式集群方式部署。

Broker

1. commitLog

Untitled

Untitled

单 Broker 实例下所有Queue共用一个 commitLog 目录保存所有消息,消息文件保存默认大小1G,文件名为起始偏移量。

  1. 方便顺序写
  2. 方便定位消息

2. consumequeue & indexfile

Untitled

Untitled

  1. Broker 端的后台服务线程会不停地分发请求并异步构建 consumequeue(消费文件)和 indexfile(索引文件)
  2. 消费文件按 topic 存储,每个主题下又按队列存储,每个文件也是顺序写入

3. 消费进度 consumerOffset

Untitled

Untitled

消息过滤

允许上传 Java 类或表达式在 broker 侧过滤消息

优点:方便过滤消息,以 Broker 的 CPU 资源换取网卡资源

风险:代码安全问题


Consumer

队列分配

Untitled

  1. 同一个消费组内的消费者不能消费同一个队列,默认一个组内消费者平均分配一个topic所有queue
  2. 不同消费组内的消费者可以消费同一个队列
  3. 消费者实例大于队列数量会导致多出来的消费者分配不到队列

重平衡

原理:

管理消费者分配队列的数量,去中心化,根据 nameSrv 可以拉到所有组内消费者实例 ID,默认分配策略是平均分配(策略还有:环形平均、机房就近分配、一致性hash等),通过队列ID和消费者ID排序后,以类似分页的形式分配给各消费者,每个消费者负责不同页。

时机:

  1. 第一次启动
  2. 有消费者下线
  3. 定时任务20s一次(兜底)

消费流程

  1. 消费者启动后,会通过负载均衡分配对应的队列,然后向 Broker 发送拉取消息请求。Broker 收到消费者拉取请求之后,根据订阅组,消费者编号,主题,队列名,逻辑偏移量等参数 ,从该主题下的 consumequeue 文件查询消息消费条目,然后从 commitlog 文件中获取消息实体。消费者在收到消息数据之后,执行消费监听器,消费完消息;
  2. 消费者将消费进度提交到 Broker ,Broker 会将该消费组的消费进度存储在进度文件里。
  3. 消费采用长轮询的方式减少因为 Pull 模式产生的消费延时。消费端请求拉取消息后,当Broker侧没有新消息不会立即返回,而是会等到有新消息才会返回。
  4. 消费侧通过异步回调的模式处理响应。
  5. 更新本地消费进度。消费者消费完一批消息后,需要保存消费进度到本地内存。流程是从本地消息队列中移除已消费的消息,保存剩余消息的最小 offset 作为消费进度。
    例如:1001 1002 1003 1004 当 1001 没被消费,哪怕1002-1004都被消费了,但是进度仍然会保存 1001。此时若 Consumer 宕机,会导致消费重复被消费,所以需要消费幂等。

异常消息

  1. 发送回 Broker 端的重试队列( RocketMQ 会为每个 topic 创建一个重试队列,以 %RETRY% 开头),达到重试时间后将消息投递到重试队列中进行消费重试。(延时消费)
  2. 发送回 Broker 失败,本地定时任务重试。
  3. 针对消息长时间无法被消费,消费进度无法更新的情况。
    1. 消费者通过消费进度过慢会进行流控,避免消费者无限循环拉取新消息。
    2. 消费者定期扫描所有消费的消息,若当前时间减去开始消费的时间大于消费超时,首先会将过期消息发送 sendMessageBack 命令发送到 Broker 进行延时消费 ,然后从快照中删除该消息。

Producer

发送流程


高性能优化

1. 网络IO – Netty

2. 磁盘IO – 顺序写,mmap 零拷贝,PageCache,异步刷盘


对比

1. Kafka

2. Pulsar


Feat

1. 延时/ 定时消息

RocketMQ 4.X

48aca802a74f436d872a905ff524c31c (1).png

仅支持固定时间,不支持任意时间延时,存储在 SCHEDULE_TOPIC_XXXX topic下底层是一个 ScheduledThreadPoolExecutor ,针对每个固定时间,维护定时任务。

Untitled

默认18 个 level 的延迟消息,通过 broker 端的 messageDelayLevel 配置项确定,如下:

Untitled

Broker 在启动时,内部会创建一个内部主题:SCHEDULE_TOPIC_XXXX,根据延迟 level 的个数,创建对应数量的队列,也就是说18个 level 对应了18个队列。

18 个队列启动18个 Timer,到达执行时间的任务会被保存到原 topic 下供消费者拉取。

RocketMQ 5.X

新增了时间轮 org.apache.rocketmq.store.timer.TimerWheel 用于支持任意秒级的定时消息,弥补了之前不够灵活的延时消息

Untitled

2. 顺序消息

发送端

问题:

  1. 如果发送是异步发送,此时有消息发送失败,例如发送 1 2 3,发送 2 失败重试,导致消息发送顺序是 1 3 2 ,产生乱序
  2. Broker 宕机,发送端重新路由,可能会发送到不同分区,这时有可能产生乱序

解法:

  1. 单线程同步发送
  2. 无解

消费端:

问题:

  1. 并行消费,产生乱序
  2. 消费组重平衡,消息在原分区已经被消费但还没提交 ack,重平衡后又被另外一个分区消费,产生乱序

解法:

  1. 单线程拉取,重新hash后分配给本地队列,队列维度并行处理(单 consumer 实例 + 多 worker 线程模型 作者:阿里技术Alitech https://www.bilibili.com/read/cv8757048/ 出处:bilibili)
  2. 消费前判断队列是否已经过期,并且向Broker请求对当前队列进行锁定,防止消费过程中被分配给其他消费者处理

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

Scroll to Top