3

RabbitMQ Bridge后台系统开发

 2 years ago
source link: https://blog.51cto.com/harmonyos/5688384
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

前面几篇文章已经简单写了关于RabbitMQ安装,使用,结合SpringBoot使用流程,有了前面的基础知识了,我们现在开始开发一个完整,可以直接使用到生产上的MQBridge后台系统,创建SpringBoot项目,这里就不详细说了,主要讲解MQBridge项目的开发过程,我画了一个流程图,整体说明MQBridge的功能和流程。

RabbitMQ Bridge后台系统开发_SpringBoot

第一步项 目依赖

项目依赖哪些第三方包,项目结构说明:

RabbitMQ Bridge后台系统开发_RabbitMQ_02
<dependencies>
        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

第二步 公共实体类和工具类

参数DefaultMessage说明:

@Data
@ToString
public class DefaultMessage implements Serializable {
    // 标识不同功能模块, 如: 订单、物流
    private String source;
    // 标识不同路由信息,如: 订单,物流
    private String action;
    // 参数
    private Map<String, Object> data;
}

返回类StandardResponse说明

@Data
public class StandardResponse<T> implements Serializable {
    // 状态码
    private String statusCode;
    // 状态描述
    private String statusDesc;
    // 响应结果
    private Boolean success;
    // 返回内容
    private T data;
}

第三步 发送Controller

  1. 对外提供发送消息Controller:
@RestController
@Slf4j
public class BridgeController {
    @Autowired
    BridgeService bridgeService;

    @PostMapping("/sendMessage")
    public StandardResponse sendMessage(@RequestBody DefaultMessage message) {
        log.info("[sendMessage] params: " + message);
        return bridgeService.sendMessage(message);
    }
}

2. MQBridge服务接口与实现类:

/**
 * Bridge桥接发送消息接口
 */
public interface BridgeService {
    // MQ桥接发送消息
    StandardResponse sendMessage(DefaultMessage message);
}
@Slf4j
@Service
public class BridgeServiceImpl implements BridgeService {
    /**
     * 缓存RabbitMQ消息生产者
     */
    @Autowired
    private Map<String, MessageSender> senderMap;

    @Override
    public StandardResponse sendMessage(DefaultMessage message) {
        log.info("[sendMessage] params: " + message);
        try {
            // 根据不同source调用不同的消息生产者
            return senderMap.get(message.getSource()).send(message);
        }catch (Exception e) {
            e.printStackTrace();
            log.info("[sendMessage] Error: " + e.getMessage());
            StandardResponse standardResponse = new StandardResponse();
            standardResponse.setSuccess(false);
            standardResponse.setStatusCode("101");
            standardResponse.setStatusDesc("Not Find Service.");
            return standardResponse;
        }
    }
}

第四步 消息生产者接口与父类实现

/**
 * RabbitMQ消息生产接口
 */
public interface MessageSender {
    // 发送RabbitMQ消息
    StandardResponse send(DefaultMessage defaultMessage);
}
/**
 * RabbitMQ消息生产父类
 */
@Slf4j
public class BaseMessageSender implements MessageSender {
    @Autowired
    RabbitTemplate rabbitTemplate;
    // 子类注入交换机名称
    protected String exchange;

    @Override
    public StandardResponse send(DefaultMessage defaultMessage) {
        StandardResponse standardResponse = new StandardResponse();
        try {
            log.info("{} Sender...",defaultMessage.getAction());
            // 根据参数Action,发送消息(交换机,路由主键, 参数)
            rabbitTemplate.convertAndSend(exchange, defaultMessage.getAction(), defaultMessage);

            standardResponse.setSuccess(true);
            standardResponse.setStatusCode("100");
            standardResponse.setStatusDesc("Send Success");
            return standardResponse;
        }catch (Exception e) {
            e.printStackTrace();
            log.error("convertAndSend Error: " + e.getMessage());
            standardResponse.setSuccess(false);
            standardResponse.setStatusCode("102");
            standardResponse.setStatusDesc("Send Error.");
            return standardResponse;
        }
    }
}

消息消费者接口:

/**
 * RabbitMQ消息消费接口
 */
public interface MessageHandler {
    // 处理RabbitMQ消息
    StandardResponse handle(DefaultMessage message);
}

第五步 订单消息

订单交换机、队列、路由主键之间关系使用:

1. 创建一个订单交换机


/**
 * 订单交换机
 */
@Slf4j
@Configuration
public class OrderTopicRabbitConfig {
    // 通过application.yml资源文件定义订单交换机名称,并引入
    @Value("${rabbitmq.exchange.order}")
    private String orderExchange;

    // 创建订单交换机,在生产者,消费者使用
    @Bean(name = "${rabbitmq.exchange.order}")
    public TopicExchange OrderTopicExchange() {
        log.info("[MQBridge.Order]主题交换机{}", orderExchange);
        return new TopicExchange(orderExchange,true,false);
    }
}

2. 订单消息生产者类,实例消息生产者类(测试使用)

/**
 * 订单消息生产者
 */
@Slf4j
@Service("MQBridge.Order")
public class OrderSender extends BaseMessageSender {
    // 通过application.yml资源文件定义订单交换机名称,并引入
    @Value("${rabbitmq.exchange.order}")
    private String orderExchange;

    /**
     * 服务启动时,把订单交换机名称注入到父类变量
     */
    @PostConstruct
    public void init() {
        this.exchange = orderExchange;
    }
}
/**
 * 实例消息生产者
 */
@Slf4j
@Service("MQBridge.Sample")
public class SampleSender extends BaseMessageSender {

    // 通过application.yml资源文件定义订单交换机名称,并引入
    @Value("${rabbitmq.exchange.order}")
    private String orderExchange;

    /**
     * 服务启动时,把订单交换机名称注入到父类变量
     */
    @PostConstruct
    public void init() {
        this.exchange = orderExchange;
    }

}

3. 订单Email消费者,订单公共消费者(此消费在这里说明通配符交换机其中一使用)

/**
 * 订单消息消费者
 */
@Slf4j
@Service
@RabbitListener(queues = "MQBridge_Order_Email")
public class OrderEmailHandler implements MessageHandler {
    // 定义订单队列名
    private static final String QUEUE_NAME = "MQBridge_Order_Email";
    // 创建订单队列对象
    @Bean(QUEUE_NAME)
    public Queue OrderTopicQueueEmail() {
        log.info("Order主题队列名{}", QUEUE_NAME);
        return new Queue(QUEUE_NAME,true);
    }
    // 订单队列对象与订单交换机绑定,路由主键为MQBridge.Order.Email
    @Bean
    Binding bindingOrderTopicEmail(@Qualifier(QUEUE_NAME) Queue queue,
                                   @Qualifier("${rabbitmq.exchange.order}") TopicExchange exchange) {
        log.info("Order主题队列绑定到上交换机,队列为{}", QUEUE_NAME);
        return BindingBuilder.bind(queue).to(exchange).with("MQBridge.Order.Email");
    }

    /**
     * 消费订单Email通知消息
     * @param message
     * @return
     */
    @RabbitHandler
    @Override
    public StandardResponse handle(DefaultMessage message) {
        log.info("[OrderEmailHandler] Queue: {}, RoutingKey: {}", QUEUE_NAME, "MQBridge.Order.Email");
        log.info("[MQBridge_Order]-[MQBridge.Order.Email]消费参数:{}", message);
        return null;
    }
}
/**
 * 订单消息消费者
 */
@Slf4j
@Service
@RabbitListener(queues = "MQBridge_Order_Common")
public class OrderCommonlHandler implements MessageHandler {
    // 定义订单队列名
    private static final String QUEUE_NAME = "MQBridge_Order_Common";
    // 创建订单队列对象
    @Bean(QUEUE_NAME)
    public Queue OrderTopicQueueCommon() {
        log.info("Order主题队列名{}", QUEUE_NAME);
        return new Queue(QUEUE_NAME,true);
    }
    // 订单队列对象与订单交换机绑定,路由主键为MQBridge.Order.Email
    @Bean
    Binding bindingOrderTopicCommon(@Qualifier(QUEUE_NAME) Queue queue,
                                   @Qualifier("${rabbitmq.exchange.order}") TopicExchange exchange) {
        log.info("Order主题队列绑定到上交换机,队列为{}", QUEUE_NAME);
        return BindingBuilder.bind(queue).to(exchange).with("MQBridge.Order.#");
    }

    /**
     * 消费订单公共通知消息
     * @param message
     * @return
     */
    @RabbitHandler
    @Override
    public StandardResponse handle(DefaultMessage message) {
        log.info("[OrderCommonlHandler] Queue: {}, RoutingKey: {}", QUEUE_NAME, "MQBridge.Order.#");
        log.info("[MQBridge_Order]-[MQBridge.Order.#]消费参数:{}", message);
        return null;
    }
}

第六步 物流消息

物流交换机、队列、路由主键之间关系使用:

1. 创建一个物流交换机

/**
 * 物流交换机
 */
@Slf4j
@Configuration
public class LogisticsTopicRabbitConfig {
    // 通过application.yml资源文件定义物流交换机名称,并引入
    @Value("${rabbitmq.exchange.logistics}")
    private String logisticsExchange;

    // 创建物流交换机,在生产者,消费者使用
    @Bean(name = "${rabbitmq.exchange.logistics}")
    public TopicExchange LogisticsTopicExchange() {
        log.info("[MQBridge.Logistics]主题交换机{}", logisticsExchange);
        return new TopicExchange(logisticsExchange,true,false);
    }
}

2. 物流消息生产者:

/**
 * 物流消息生产者
 */
@Slf4j
@Service("MQBridge.Logistics")
public class LogisticsSender extends BaseMessageSender {
    // 通过application.yml资源文件定义物流交换机名称,并引入
    @Value("${rabbitmq.exchange.logistics}")
    private String logisticsExchange;

    /**
     * 服务启动时,把物流交换机名称注入到父类变量
     */
    @PostConstruct
    public void init() {
        this.exchange = logisticsExchange;
    }
}

3. 物流消息消费者:

/**
 * 物流消息消费者
 */
@Slf4j
@Service
@RabbitListener(queues = "MQBridge_Logistics_Email")
public class LogisticsEmailHandler implements MessageHandler {
    // 定义物流队列名
    private static final String QUEUE_NAME = "MQBridge_Logistics_Email";
    // 创建物流队列对象
    @Bean(QUEUE_NAME)
    public Queue LogisticsTopicQueue() {
        log.info("Logistics主题队列名{}", QUEUE_NAME);
        return new Queue(QUEUE_NAME,true);
    }
    // 物流队列对象与物流交换机绑定,路由主键为MQBridge.Logistics.Email
    @Bean
    Binding bindingLogisticsTopic(@Qualifier(QUEUE_NAME) Queue queue,
                                   @Qualifier("${rabbitmq.exchange.logistics}") TopicExchange exchange) {
        log.info("Logistics主题队列绑定到上交换机,队列为{}", QUEUE_NAME);
        return BindingBuilder.bind(queue).to(exchange).with("MQBridge.Logistics.#");
    }

    /**
     * 消费物流Email通知消息
     * @param message
     * @return
     */
    @RabbitHandler
    @Override
    public StandardResponse handle(DefaultMessage message) {
        log.info("[LogisticsEmailHandler] Queue: {}, RoutingKey: {}", QUEUE_NAME, "MQBridge.Logistics.#");
        log.info("[MQBridge_Logistics_Email]消费参数:{}", message);
        return null;
    }
}

第七步 小结

从项目结构和代码可以看出,第五步和第六步是不同的业务功能,我们通过传递参数Source不同,调用不同的业务逻辑功能,之后如果添加新的模块,就可以参考第五步或第六步就可以实现,然后调用时,参数Source指定为新值就可以;这里再说一下Source参数值与Service是怎么对应的,在LogisticsSender类,OrderSender类,SampleSender类上都有一个注解@Service(“MQBridge.Order”), 括号里的字符串就是对应参数的Source值了。

第八步  调试

1.  Source为MQBridge.Order调用

RabbitMQ Bridge后台系统开发_SpringBoot_03

RabbitMQ Bridge后台系统开发_RabbitMQ_04

2.  Source为MQBridge.Sample调用 

RabbitMQ Bridge后台系统开发_SpringBoot_05

RabbitMQ Bridge后台系统开发_RabbitMQ_06

3.  Source为MQBridge.Logistics调用 

RabbitMQ Bridge后台系统开发_RabbitMQ_07

RabbitMQ Bridge后台系统开发_SpringBoot_08

从调试和打印日志可以看出MQBridge项目,可以很方便添加新功能,下图是调试的三次日志。

RabbitMQ Bridge后台系统开发_RabbitMQ_09

 想了解更多关于开源的内容,请访问:

 51CTO 开源基础软件社区

 https://ost.51cto.com/#bkwz


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK