RocketMQ中如何实现push consumer消息拉取


这篇文章主要介绍RocketMQ中如何实现push consumer消息拉取,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!RebalanceImpl.updateProcessQueueTableInRebalance方法的末尾,对于每一个新生成的ProcessQueue都会创建一个PullRequest执行首次消息拉取操作。PullRequest会通过RebalanceImpl.dispatchPullRequest方法达到DefaultMQPushConsumerImpl.executePullRequestImmediately,然后被投递到PullMessageService的本地队列中。PulMessageService会启动一个服务线程,不断消费投递到本地队列中的PullRequest,最终调用到DefaultMQPushConsumerImpl.pullMessage方法。PullMessageService被MQClientInstance持有,同一个客户端实例中所有的push consumer产生的PullRequest都会被投 香港云主机递到同一个PullMessageService本地队列中排队等待执行。DefaultMQPushConsumerImpl.pullMessage是消息拉取的核心方法。该方法首先会执行一系列的限流判断,若命中限流条件则本次执行结束,等待一个固定时间之后会再次将同一个PullRequest投递到PullMessageService中重新触发消息拉取。DefaultMQPushConsumerImpl.pullMessage核心逻辑:pullMessage方法中创建的匿名PullCallback用来处理拉取到的消息列表:上面的第3步consumeMessageService.submitConsumeRequest中将根据并行或串行不同的方式将message提交给listener执行业务处理动作。消息拉取的整体流程如下:以上是“RocketMQ中如何实现push consumer消息拉取”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注开发云行业资讯频道!

相关推荐: linux中基础命令怎么用

这篇文章给大家分享的是有关linux中基础命令怎么用的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。xargs -l1 -P5 -i行,线程,-i命名等待主脚本内的所有进程跑完,才继续跑查看目录大小之du(使用了多少)du -sh 目…

免责声明:本站发布的图片视频文字,以转载和分享为主,文章观点不代表本站立场,本站不承担相关法律责任;如果涉及侵权请联系邮箱:360163164@qq.com举报,并提供相关证据,经查实将立刻删除涉嫌侵权内容。

(0)
打赏 微信扫一扫 微信扫一扫
上一篇 09/23 19:05
下一篇 09/23 19:05

相关推荐