RocketMQ延迟消息的实现方法


这篇文章主要介绍“RocketMQ延迟消息的实现方法”,在日常操作中,相信很多人在RocketMQ延迟消息的实现方法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”RocketMQ延迟消息的实现方法”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!延时消息即消息发送后并不立即对消费者可见,而是在用户指定的时间投递给消费者。比如我们现在发送一条延时30秒的消息,消息发送后立即发送给服务器,但是服务器在30秒后才将该消息交给消费者。RocketMQ通过配置的延迟级别延迟消息投递到消费者,其中不同的延迟级别对应 香港云主机不同的延迟时间,可配置,默认的延迟级别有18种,分别是1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,支持时间单位 s 秒 m分钟 h小时 d天。源码 MessageStoreConfig.java 是定义如下:可以在brocker配置 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,自定义其时间级别。前提:先启动消费者等待消息的发送,先发送消息,消费者启动需要时间,影响测试结果。查看结果:查看结果:查看其消息投递的核心方法org.apache.rocketmq.store.CommitLog.putMessage我们发现在通过putMessage 延迟消息就被放存放到了主题为 SCHEDULE_TOPIC_XXXX的commitlog中,消息的原有的主题和消息队列存入属性中,后面再通过定时的方式对这这些消息进行重新发送。ScheduleMessageService.start()启动会为每一个延迟队列创建一个调度任务每一个调度任务对应SCHEDULE_TOPIC_XXXX主题下的一个消息消费队列。定时任务的实现类DeliverDelayedMessageTimerTask,核心方法是executeOnTimeup图解:1、消息生产者发送消息,如果发送的消息DelayTimeLevel大于0,则改变消息主题为SCHEDULE_TOPIC_XXXX,消息的队列为DelayTimeLevel-12、消息经由Commitlog转发到消息队列SCHEDULE_TOPIC_XXXX的消费队列1。3、定时任务Timer每隔1秒根据上次拉取消息的偏移量从消费队列中取出所有消息。4、根据消息的物理偏移量和消息大小从Commitlog中拉取消息。(PS:消息存储章节中会重点讲解)5、根据消息的属性重新创建消息,并恢复原主题TopicTest、原消息队列ID,清除DelayTimeLevel属性存入Commitlog中。6、记录原主题TopicTest的消息队列的消息偏移量,供消费者索引检索消息进行消费。到此,关于“RocketMQ延迟消息的实现方法”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注开发云网站,小编会继续努力为大家带来更多实用的文章!

相关推荐: 笔记本电脑屏幕突然变暗的解决方法

这篇文章主要介绍笔记本电脑屏幕突然变暗的解决方法,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!(1)显 香港云主机卡问题:1、电源重新插过,并清洁金手指,看风扇是否正常。2、重新更新驱动,也可以下载驱动精灵。(2)连接线问题:重新插过电…

免责声明:本站发布的图片视频文字,以转载和分享为主,文章观点不代表本站立场,本站不承担相关法律责任;如果涉及侵权请联系邮箱:360163164@qq.com举报,并提供相关证据,经查实将立刻删除涉嫌侵权内容。

(0)
打赏 微信扫一扫 微信扫一扫
上一篇 08/02 16:06
下一篇 08/02 16:06

相关推荐