Spring Cloud Stream 整合 RabbitMQ 实现动态路由 key 消息分发实战教程

AI 概述
本文基于Spring Cloud Stream整合RabbitMQ,实现按消息类型动态路由分发邮件消息。生产者通过配置routingKeyExpression读取消息头type作为路由键;消费者定义两组通道与不同group队列,分别绑定register、task路由key。通过自定义消息通道、绑定路由键与分组配置,实现不同类型消息精准投递到对应消费队列,满足业务异步解耦与分类消费需求。
目录
文章目录隐藏
  1. 例子
  2. 总结

为解决公司不同操作后发送对应邮件的耗时问题,我们引入消息中间件,借助 Spring Cloud Stream 整合 RabbitMQ 实现消息路由。核心通过生产者配置 routingKeyExpression 绑定路由 key,消费者利用 group(对应 RabbitMQ 队列)和 bindingRoutingKey 筛选消息,搭配自定义通道区分业务,下文结合完整实操示例,详解实现流程与关键配置。

Spring Cloud Stream 整合 RabbitMQ 实现动态路由 key 消息分发实战教程
表示交换机根据路由 key 绑定了不同的队列

要达到这种效果,首先消费者肯定是可以根据路由 key 来决定消息是不是发送给自己的,对于生产者则需要用到routingKeyExpression 来决定往哪个路由 key 发送数据(大概是这个意思)。

然后就是 stream 中的 group 其实对应到 rabbitMQ 中就是队列的概念,所以我们这里设置两个不同的 group 来对应到不同的队列,区分开业务;

例子

这里我定义了两个服务对应消费者和生产者。

生产者

spring:
  application:
    name: producer
  cloud:
    stream:
      binders: # 绑定 MQ 服务信息(此处我们是 RabbitMQ)
        etpmsRabbitMQ: # 给 Binder 定义的名称,⽤于后⾯的关联
          type: rabbit # MQ 类型,如果是 Kafka 的话,此处配置 kafka
          environment: # MQ 环境配置(⽤户名、密码等)
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: admin
                password: xxxxxx
      bindings: # 关联整合通道和 binder 对象
        output: # output 是我们定义的通道名称,此处不能乱改
          destination: testExchange # 要使⽤的 Exchange 名称(消息队列主题名称)
          content-type: text/plain # application/json # 消息类型设置,⽐如 json
          binder: etpmsRabbitMQ # 关联 MQ 服务
      rabbit:
        bindings:
          output:
            producer:
              # 生产者配置 RabbitMq 的动态路由键
              routingKeyExpression: headers.type
package top.chenyt.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

/**
 * @author yantao.chen
 */
@Service
public class ProviderService {
    /**
     * 将 MessageChannel 的封装对象 Source 注⼊到这⾥使⽤
     */
    @Autowired
    private Source source;

    public void sendMessage(String content, String type) {
        // 向 mq 中发送消息(并不是直接操作 mq,应该操作的是 spring cloud stream)
        // 使⽤通道向外发出消息(指的是 Source⾥⾯的 output 通道)
        source.output().send(MessageBuilder.withPayload(content).setHeader("type",type).build());
    }
}
package top.chenyt;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;

/**
 * @ClassName etpms-parent
 * @Author Jinondo
 * @Date 2022/1/31 12:42
 */
@SpringBootApplication
@Slf4j
@EnableBinding({Source.class})
public class ProducerApplication {

    public static void main(String[] args)  {
        SpringApplication.run(ProducerApplication.class, args);
    }

}

主要是 yml 配置添加:routingKeyExpression: headers.type

发送消息的时候 setHeader 一下。

消费者

spring:
  application:
    name: consumer
  cloud:
    stream:
      binders: # 绑定 MQ 服务信息(此处我们是 RabbitMQ)
        etpmsRabbitMQ: # 给 Binder 定义的名称,⽤于后⾯的关联
          type: rabbit # MQ 类型,如果是 Kafka 的话,此处配置 kafka
          environment: # MQ 环境配置(⽤户名、密码等)
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: admin
                password: xxxxx
      bindings: # 关联整合通道和 binder 对象
        input: # input 是我们定义的通道名称,此处不能乱改
          destination: testExchange # 要使⽤的 Exchange 名称(消息队列主题名称)
          content-type: text/plain # application/json # 消息类型设置,⽐如 json,自动将对象转为 json
          binder: etpmsRabbitMQ # 关联 MQ 服务
          group: register
        my-input:
          destination: testExchange # 要使⽤的 Exchange 名称(消息队列主题名称)
          content-type: text/plain # application/json # 消息类型设置,⽐如 json,自动将对象转为 json
          binder: etpmsRabbitMQ # 关联 MQ 服务
          group: task
      rabbit:
        bindings:
          my-input:
            consumer:
              bindingRoutingKey: task
          input:
            consumer:
              bindingRoutingKey: register

这里我就定义了两个通道,一个是默认的 input,一个是自定的。

package top.chenyt.consumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MySink {

    String MY_INPUT = "my-input";

    @Input(MY_INPUT)
    SubscribableChannel myinput();

}
package top.chenyt.consumer;

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

/**
 * @ClassName etpms-parent
 * @Author Jinondo
 * @Date 2022/1/31 12:42
 */
@Service
public class ConsumerMsg {

    @StreamListener(Sink.INPUT)
    public void receiveMessages(Message<String> message) {
        System.out.println("========= input 接收到的消息:" + message.getPayload());
    }

    @StreamListener(MySink.MY_INPUT)
    public void receiveMessages02(Message<String> message) {
        System.out.println("========= myinput 接收到的消息:" + message.getPayload());
    }
}
package top.chenyt;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import top.chenyt.consumer.MySink;

/**
 * @ClassName etpms-parent
 * @Author Jinondo
 * @Date 2022/1/31 12:42
 */
@SpringBootApplication
@Slf4j
@EnableBinding({Sink.class, MySink.class})
public class ConsumerApplication {

    public static void main(String[] args)  {
        SpringApplication.run(ConsumerApplication.class, args);
    }

}

这样就能实现根据不同的消息类型对应到不同的队列且不同的路由 key 去了。

总结

本文通过生产者动态绑定路由 key、消费者配置多通道与对应队列(group),实现了不同消息类型精准路由至对应业务队列,高效解决邮件发送耗时问题。该方案依托 Spring Cloud Stream 与 RabbitMQ 的协同优势,配置简洁、业务区分清晰,可直接复用至类似多类型消息路由的实际业务场景。

以上关于Spring Cloud Stream 整合 RabbitMQ 实现动态路由 key 消息分发实战教程的文章就介绍到这了,更多相关内容请搜索码云笔记以前的文章或继续浏览下面的相关文章,希望大家以后多多支持码云笔记。

「点点赞赏,手留余香」

15

给作者打赏,鼓励TA抓紧创作!

微信微信 支付宝支付宝

还没有人赞赏,快来当第一个赞赏的人吧!

声明:本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若内容造成侵权/违法违规/事实不符,请将相关资料发送至 admin@mybj123.com 进行投诉反馈,一经查实,立即处理!
重要:如软件存在付费、会员、充值等,均属软件开发者或所属公司行为,与本站无关,网友需自行判断
码云笔记 » Spring Cloud Stream 整合 RabbitMQ 实现动态路由 key 消息分发实战教程

发表回复