消费者注册(RocketMqApplication)
- 检查是否需要启用消费者
- 检查配置参数是否为空(IP地址,ConsumerGroup,Topic)
- 获取instanceName(mqType - consumerGroup - hosId)
- 注册消费者
- 消费者线程最小数
- 消费者线程最大数
- broker心跳
- Name Server间隔时间
- 设置订阅(topic, tags)
- 设置队列的最小偏移量
- 注册Message监听者
- 设置instanceName
- 开始监听消息
监听消息(AbstractMessageListener)
- 处理tag,获取tag数组
- 调用RocketMqListener.ack(MessageExt message. String topic, String subExpression)
- 获取相应的topic信息
- 根据topic获取对应的Service处理类
- 调用消费服务receive
消息消费(IMqConsumerService)
以检查为例
获取消息的body
body转换为Message
获取表名
根据表名获取对应的tableType
获取tag
获取对应的处理器Service
开始消费
execHanlderMessage
根据处理业务(表名),进行不同的参数校验
根据oper的操作类型,执行不同的操作
update, save :
根据old数据是否为空,判断是新增还是修改
delete