loading...
消费者相关
Published in:2022-01-31 | category: RocketMQ
Words: 252 | Reading time: 1min | reading:

消费者注册(RocketMqApplication)

  1. 检查是否需要启用消费者
  2. 检查配置参数是否为空(IP地址,ConsumerGroup,Topic)
  3. 获取instanceName(mqType - consumerGroup - hosId)
  4. 注册消费者
    • 消费者线程最小数
    • 消费者线程最大数
    • broker心跳
    • Name Server间隔时间
    • 设置订阅(topic, tags)
    • 设置队列的最小偏移量
    • 注册Message监听者
    • 设置instanceName
    • 开始监听消息

监听消息(AbstractMessageListener)

  1. 处理tag,获取tag数组
  2. 调用RocketMqListener.ack(MessageExt message. String topic, String subExpression)
    • 获取相应的topic信息
    • 根据topic获取对应的Service处理类
    • 调用消费服务receive

消息消费(IMqConsumerService)

以检查为例

  • 获取消息的body

  • body转换为Message

  • 获取表名

  • 根据表名获取对应的tableType

  • 获取tag

  • 获取对应的处理器Service

  • 开始消费

    • execHanlderMessage

      根据处理业务(表名),进行不同的参数校验

      根据oper的操作类型,执行不同的操作

      update, save :

      根据old数据是否为空,判断是新增还是修改

      delete

Prev:
RocketMQ快速入门
Next:
XXL-JOB快速入门
catalog
catalog