Reid

vuePress-theme-reco Reid    2018 - 2024
Reid Reid

Choose mode

  • dark
  • auto
  • light
TimeLine
分类
  • 后端
  • AI
  • 英文翻译
  • 运维
标签
我的GitHub (opens new window)
author-avatar

Reid

16

文章

25

标签

TimeLine
分类
  • 后端
  • AI
  • 英文翻译
  • 运维
标签
我的GitHub (opens new window)
  • RabbitMQ延迟消费

    • 前言
      • 用到的技术
        • RabbitMQ
          • 基本介绍
          • 安装RabbitMQ插件
        • NestJS
          • 交换器配置
          • 消息推送
          • 消息消费
          • 测试
        • 总结

        RabbitMQ延迟消费

        vuePress-theme-reco Reid    2018 - 2024

        RabbitMQ延迟消费


        Reid 2023-09-05 后端 NestJS RabbitMQ

        # 前言

        日常开发中我们都有需要某个任务延迟执行的场景,比如说订单在一定时间内自动关闭,或者某个操作后进行相应的延迟检查...

        要达成任务的延迟执行有很多方法,今天主要用RabbitMQ,并以Nestjs为示例代码进行讲解。

        # 用到的技术

        • Docker
        • RabbitMQs
        • NestJS

        这些技术就不一一介绍了,不懂的自行搜索(或者问问神奇的ChatGPT)。

        # RabbitMQ

        # 基本介绍

        RabbitMQ的话我是用Docker进行安装的,安装的是rabbitmq:3.7.7-management。

        本篇文章主要通过延迟交换机去进行消息的延迟推送,这一块需要安装一个rabbitmq-delayed-message-exchange (opens new window)的插件,关于插件版本的话,要注意是否适合你的rabbitmq版本,比如说我安装的是3.7的,那就要找对应的版本,不然会报错:

        # 安装RabbitMQ插件

        1. 下载对应版本的.ez文件:

        2. 将下载好的文件拷贝到容器中:

        docker cp yourLocation/rabbitmq_delayed_message_exchange-3.8.0.ez yourCotainerId:/plugins
        

        当然,你可以通过Docker的volume将文件映射到容器中,我比较喜欢这种方式。

        1. 执行命令进入容器:
        docker run -it yourCotainerId /bin/bash
        
        1. 执行命令安装插件:
        rabbitmq-plugins enable rabbitmq_delayed_message_exchange
        

        没问题的话会输出这些信息,如果有报错的话这里的started 1 plugins就不会显示:

        1. 然后退出容器,重启一下容器。
        docker restart yourCotainerId
        

        # NestJS

        应用层面的话我们用@golevelup/nestjs-rabbitmq (opens new window)这个库来进行消息的发送与消费。

        # 交换器配置

        交换器的type为x-delayed-message则是使用延迟交换器,对应的消息也会被延迟消费,主要注意type和arguments里的{ 'x-delayed-type': 'direct' }记得填就行。

        const mqClient = RabbitMQModule.forRootAsync(RabbitMQModule, {
          imports: [ConfigModule],
          useFactory: (configService: ConfigService) => {
            const urlInfo = configService.get<any>('rabbitMQ.urlInfo');
            return {
              exchanges: [
                {
                  name: MqExchange.DIRECT,
                  type: 'direct',
                  options: {
                    durable: true,
                  },
                },
                {
                  name: MqExchange.DIRECT_DELAYED,
                  type: 'x-delayed-message',
                  options: {
                    durable: true,
                    arguments: { 'x-delayed-type': 'direct' },
                  },
                },
              ],
              uri: `${urlInfo.protocol}://${urlInfo.username}:${urlInfo.password}@${urlInfo.hostname}:${urlInfo.port}/${urlInfo.vhost}`,
              channels: {
                'channel-1': {
                  prefetchCount: 1,
                  default: true,
                },
              },
            };
          },
          inject: [ConfigService],
        });
        

        # 消息推送

        消息推送的话只要在publish函数的第4个参数里加上x-delay,单位是毫秒,这个就是对应的延迟时间。

        public async push2Queue(items) {
            const res = await this.amqpConnection.publish(
              MqExchange.DIRECT_DELAYED,
              'test',
              { message: 'hello' },
              {
                headers: {
                  'x-delay': 10000,
                },
              },
            );
          }
        

        # 消息消费

        消息消费的话没啥好说,对应的交换机、路由和队列配置好就行。

        @RabbitRPC({
            exchange: MqExchange.DIRECT_DELAYED,
            routingKey: 'test',
            queue: 'test3',
            queueOptions: {
              durable: true,
              exclusive: false,
              autoDelete: false,
            },
          })
          public async ackMessage(item) {
            console.log('delayed msg', item);
          }
        

        # 测试

        我们推送一条消息,在RabbitMQ的管理后台可以看到有一条延迟的消息:

        10秒后我们会收到一条消息:

        delayed msg { message: 'hello' }
        

        # 总结

        实现任务延迟消费的方式还有很多,比如说RabbitMQ还可以用死信队列去实现。方法总是有很多的,找到适合你们团队的就行。