
Getting started with RabbitMQ: basic examples

source link: https://blog.birost.com/a?ID=00000-43855130-a975-4efc-b6b9-dca12b3d1a62

Official website: www.rabbitmq.com/

Introduction

RabbitMQ is the most widely deployed open source message broker.

RabbitMQ has thousands of users and is one of the most popular open source message brokers. From T-Mobile to Runtastic, RabbitMQ is used by small start-ups and large enterprises worldwide.

RabbitMQ is lightweight and easy to deploy on premises and in the cloud. It supports multiple messaging protocols. RabbitMQ can be deployed in distributed and federated configurations to meet large-scale, high-availability requirements.

pom.xml dependency

<!-- RabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.0.4.RELEASE</version>
</dependency>

application.yml

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

Controller

import io.swagger.annotations.Api;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Sends messages to the message queue.
 *
 * @author tangcv
 * @date Created in 2020/3/4 14:59
 * @version 1.0
 */
// @Api comes from the Swagger annotations library, which must also be on the classpath.
@Api(tags = "Message queue")
@RestController
@RequestMapping("/api/RabbitMQ")
public class RabbitMQController {

    @Autowired
    private DirectSender directSender;

    @Autowired
    private TopicSender topicSender;

    @Autowired
    private FanoutSender fanoutSender;

    /**
     * Direct mode: the routing key must match exactly.
     */
    @GetMapping("/sendDirectQueue")
    public Object sendDirectQueue() {
        directSender.sendDirectQueue();
        return "ok";
    }

    /**
     * Topic mode: the routing key is matched against a pattern.
     */
    @GetMapping("/sendTopic")
    public Object sendTopic() {
        topicSender.sendTopic();
        return "ok";
    }

    /**
     * Fanout (broadcast) mode.
     * The message is broadcast once; whether or not anyone receives it, it is then gone.
     */
    @GetMapping("/sendFanout")
    public Object sendFanout() {
        fanoutSender.sendFanout();
        return "ok";
    }
}

Direct mode

Sender

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Sender
 *
 * @author tangcv
 * @date Created in 2020/3/4 13:50
 */
@Component
@Slf4j
public class DirectSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendDirectQueue() {
        log.info("[sendDirectQueue] sending message");
        // First argument: the routing key. No exchange is named, so the message goes to the
        // default exchange, which routes it to the queue with the same name.
        // Second argument: the message content.
        this.amqpTemplate.convertAndSend(DirectRabbitMQConfig.QUEUE, "DirectQueue message");
    }
}

Configuration

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Direct mode: the exchange delivers a message to a queue when the routing key of the
 * message equals the binding key of the binding. Here the routing key is exactly the queue name.
 */
@Configuration
public class DirectRabbitMQConfig {

    static final String QUEUE = "direct_queue";

    /**
     * Queue used in direct mode.
     */
    @Bean
    public Queue directQueue() {
        // First parameter: the queue name; second parameter: whether the queue is durable
        return new Queue(QUEUE, true);
    }
}

Receiver

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Receiver
 *
 * @author tangcv
 * @date Created in 2020/3/4 13:49
 */
@Component
@Slf4j
public class DirectReceiver {

    // queues is the name of the queue to listen on
    @RabbitListener(queues = DirectRabbitMQConfig.QUEUE)
    public void receiverDirectQueue(String message) {
        log.info("[receiverDirectQueue] received message: " + message);
    }
}

Topic mode

Sender

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author tangcv
 * @date Created in 2020/3/4 14:22
 */
@Component
@Slf4j
public class TopicSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendTopic() {
        log.info("[TopicSender] sending messages");
        // First parameter: the name of the topic exchange
        // Second parameter: the routing key
        // Third parameter: the message content
        this.amqpTemplate.convertAndSend(TopicRabbitMQConfig.TOPIC_EXCHANGE, "lzc.message", "a");
        this.amqpTemplate.convertAndSend(TopicRabbitMQConfig.TOPIC_EXCHANGE, "lzc.lzc", "b");
    }
}

Configuration

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * A topic exchange routes messages by matching the routing key against a pattern,
 * so each queue is bound to the exchange with a pattern as its binding key.
 * Routing keys and binding keys are split into words separated by dots.
 * Two wildcards are recognized: "#" matches zero or more words, "*" matches exactly one word.
 *
 * @author admin
 * @date Created in 2020/3/4 14:20
 */
@Configuration
public class TopicRabbitMQConfig {

    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";
    public static final String TOPIC_EXCHANGE = "topic.exchange";

    /**
     * Queues, exchange, and bindings used in topic mode.
     */
    @Bean
    public Queue topicQueue1() {
        return new Queue(TOPIC_QUEUE1);
    }

    @Bean
    public Queue topicQueue2() {
        return new Queue(TOPIC_QUEUE2);
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    // Queue 1 only receives messages whose routing key is exactly "lzc.message"
    @Bean
    public Binding topicBinding1() {
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("lzc.message");
    }

    // Queue 2 receives every message whose routing key starts with the word "lzc"
    @Bean
    public Binding topicBinding2() {
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("lzc.#");
    }
}
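The wildcard rules described in the configuration comment can be tried out without a broker. The following is a minimal plain-Java sketch of the matching rule only ("#" matches zero or more words, "*" matches exactly one word); the class name TopicMatchSketch is hypothetical and this is not how RabbitMQ itself implements matching.

```java
final class TopicMatchSketch {

    /** Returns true if the routing key satisfies the binding key pattern. */
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length; // pattern used up: all words must be used up too
        }
        if (pattern[i].equals("#")) {
            // "#" may absorb zero or more words, so try every possible split point
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false; // pattern still expects a word but none is left
        }
        // "*" matches exactly one word; anything else must be equal to the word
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }
}
```

With the bindings used in this article, the routing key "lzc.message" satisfies both "lzc.message" and "lzc.#", so message "a" reaches both queues, while "lzc.lzc" satisfies only "lzc.#", so message "b" reaches only topic.queue2.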

Receiver

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Receiver
 *
 * @author tangcv
 * @date Created in 2020/3/4 14:21
 */
@Component
@Slf4j
public class TopicReceiver {

    // queues is the name of the queue to listen on
    @RabbitListener(queues = TopicRabbitMQConfig.TOPIC_QUEUE1)
    public void receiveTopic1(String message) {
        log.info("[receiveTopic1] received message: " + message);
    }

    @RabbitListener(queues = TopicRabbitMQConfig.TOPIC_QUEUE2)
    public void receiveTopic2(String message) {
        log.info("[receiveTopic2] received message: " + message);
    }
}

Fanout (broadcast) mode

Sender

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Fanout (broadcast) mode
 *
 * @author admin
 * @date Created in 2020/3/4 14:56
 */
@Component
@Slf4j
public class FanoutSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendFanout() {
        log.info("[sendFanout] sending message");
        // Note that the second parameter (the routing key) is empty.
        // A fanout exchange ignores routing keys; queues are simply bound to the exchange,
        // and every message sent to the exchange is forwarded to all bound queues.
        this.amqpTemplate.convertAndSend(FanoutRabbitMQConfig.FANOUT_EXCHANGE, "", "a");
    }
}

Configuration

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author tangcv
 * @date Created in 2020/3/4 14:50
 */
@Configuration
public class FanoutRabbitMQConfig {

    public static final String FANOUT_QUEUE1 = "fanout.queue1";
    public static final String FANOUT_QUEUE2 = "fanout.queue2";

    public static final String FANOUT_EXCHANGE = "fanout.exchange";

    /**
     * The two queues that are bound to the fanout exchange.
     */
    @Bean
    public Queue fanoutQueue1() {
        return new Queue(FANOUT_QUEUE1);
    }

    @Bean
    public Queue fanoutQueue2() {
        return new Queue(FANOUT_QUEUE2);
    }

    /**
     * Fanout mode
     * Fanout is the familiar broadcast (publish/subscribe) mode: a message sent to the
     * fanout exchange is received by every queue bound to that exchange.
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    @Bean
    public Binding fanoutBinding1() {
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }

    @Bean
    public Binding fanoutBinding2() {
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }
}

Receiver

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Fanout (broadcast) mode receiver
 *
 * @author tangcv
 * @date Created in 2020/3/4 14:54
 */
@Component
@Slf4j
public class FanoutReceiver {

    // queues is the name of the queue to listen on
    @RabbitListener(queues = FanoutRabbitMQConfig.FANOUT_QUEUE1)
    public void receiveFanout1(String message) {
        log.info("[receiveFanout1] received message: " + message);
    }

    @RabbitListener(queues = FanoutRabbitMQConfig.FANOUT_QUEUE2)
    public void receiveFanout2(String message) {
        log.info("[receiveFanout2] received message: " + message);
    }
}

Advantages and disadvantages of RabbitMQ

Advantages

Asynchronous processing, service decoupling, and peak shaving (smoothing out traffic spikes).

Disadvantages

The costs lie mainly in the availability, complexity, and consistency of the system. Once a message queue is introduced, the availability of the MQ itself has to be considered: if the MQ crashes, every service that depends on it is affected. Complexity also increases significantly, because the common problems of message queues and their solutions must be dealt with. Finally, there is the consistency problem: when one message is consumed by several consumers and one of them fails, the data becomes inconsistent.

Usage scenarios

Service decoupling

Suppose service A generates data that services B, C, and D need. The simplest approach is for A to call B, C, and D directly and pass the data to each downstream service.

However, as the application grows, more and more services need A's data. With dozens or even hundreds of downstream services that keep changing, and with downstream failures to handle as well, the calling code in service A becomes extremely difficult to maintain.

This is the result of tight coupling between services.

Now consider decoupling with RabbitMQ.

Service A only needs to send messages to the message server, regardless of who needs the data. A downstream service that needs the data subscribes to the messages on the message server itself, and cancels the subscription when the data is no longer needed.
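The decoupling idea can be sketched in a few lines of plain Java. This is only an in-memory stand-in for the message server, with hypothetical names (DecouplingSketch, subscribe, unsubscribe, publish); it shows that the publisher never refers to B, C, or D directly.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class DecouplingSketch {

    // Subscribers keyed by service name; the publisher never looks at this map's contents
    private final Map<String, Consumer<String>> subscribers = new LinkedHashMap<>();

    /** A downstream service registers itself when it needs the data. */
    void subscribe(String serviceName, Consumer<String> handler) {
        subscribers.put(serviceName, handler);
    }

    /** A downstream service removes itself when it no longer needs the data. */
    void unsubscribe(String serviceName) {
        subscribers.remove(serviceName);
    }

    /** Service A publishes without knowing who listens; returns how many services were notified. */
    int publish(String data) {
        // Iterate over a snapshot so that a handler may unsubscribe while being notified
        List<Consumer<String>> snapshot = new ArrayList<>(subscribers.values());
        for (Consumer<String> handler : snapshot) {
            handler.accept(data);
        }
        return snapshot.size();
    }
}
```

Adding or removing a downstream service changes only the subscriber side; the code of service A stays the same.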

Traffic peak shaving

Suppose an application normally receives 300 requests per second, which a single server handles easily.

During peak periods the traffic jumps tenfold to 3000 requests per second, which a single server cannot cope with. We could scale out to 10 servers to spread the load.

But if this peak occurs only once a day and lasts only half an hour, the 10 servers share just a few dozen requests per second each for most of the day, which wastes resources.

In this case RabbitMQ can be used for peak shaving. During a peak, the burst of request data is first sent to the message queue server, where it waits in line, and the application takes requests from the queue at its own pace. Processing is spread over a longer time, which reduces the instantaneous pressure.

This is a very typical application scenario for a message queue server.
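The effect of queuing a burst can be illustrated with a small deterministic simulation. This is a sketch under simplifying assumptions (fixed processing capacity per second, unbounded queue); the class and method names are hypothetical and the numbers in any call are illustrative, not measurements.

```java
final class PeakShavingSketch {

    /**
     * Simulates a queue fed with arrivalsPerSecond[t] requests in second t and
     * drained by a consumer that processes at most capacityPerSecond requests per second.
     * Returns {largest backlog seen, seconds until the queue is empty}.
     */
    static long[] simulate(int[] arrivalsPerSecond, int capacityPerSecond) {
        if (capacityPerSecond <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        long backlog = 0;
        long maxBacklog = 0;
        int second = 0;
        // Keep running after the arrivals stop, until the backlog is drained
        while (second < arrivalsPerSecond.length || backlog > 0) {
            if (second < arrivalsPerSecond.length) {
                backlog += arrivalsPerSecond[second]; // the burst lands in the queue
            }
            backlog -= Math.min(backlog, capacityPerSecond); // the consumer works at its own pace
            maxBacklog = Math.max(maxBacklog, backlog);
            second++;
        }
        return new long[] {maxBacklog, second};
    }
}
```

The consumer never sees more than its capacity per second; the burst turns into a temporary backlog and a longer total processing time. The backlog only drains if the capacity exceeds the normal arrival rate once the peak is over.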

Asynchronous call

Consider what happens after a successful payment for a food delivery order.

After payment, the system must send a notification that the payment succeeded and then find a courier for the delivery. Finding a courier is time-consuming, especially during peak periods, when it may take tens of seconds or even longer.

This makes the entire call chain respond very slowly.

If we introduce a RabbitMQ message queue, the order data is sent to the message queue server and the call chain ends there. The order system gets a response immediately, and the response time of the entire chain is only about 200 milliseconds.

The application that finds a courier receives the order message from the message queue asynchronously and then performs the time-consuming search.
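The same flow can be sketched with a java.util.concurrent.BlockingQueue standing in for the RabbitMQ queue. All names here (AsyncOrderSketch, pay, runCourierWorker) are hypothetical; the point is only that pay returns before the slow step runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

final class AsyncOrderSketch {

    // In-memory stand-in for the message queue between the two services
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> dispatched = new CopyOnWriteArrayList<>();

    /** Order service: enqueue the order and answer the caller immediately. */
    String pay(String orderId) {
        queue.add(orderId);
        return "payment accepted: " + orderId;
    }

    /**
     * Courier service: handles `expected` orders on its own thread.
     * Waits up to timeoutMillis for the worker and returns the orders dispatched so far.
     */
    List<String> runCourierWorker(int expected, long timeoutMillis) {
        Thread worker = new Thread(() -> {
            try {
                for (int i = 0; i < expected; i++) {
                    String orderId = queue.take(); // blocks until an order is available
                    dispatched.add(orderId);       // placeholder for the slow courier search
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
        try {
            worker.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(dispatched);
    }
}
```

The response time seen by the paying user depends only on pay, not on how long the courier search takes.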

RabbitMQ basic concepts

RabbitMQ is message middleware for processing asynchronous messages from clients. The sender puts the messages to be sent into queues, and the receiving end receives them according to the forwarding mechanism configured in RabbitMQ. RabbitMQ forwards, buffers, and persists messages according to the specified forwarding rules. It is mainly used for communication between multiple servers or between the subsystems of a single server, and is a standard component of distributed systems.

Exchange

Accepts the messages sent by producers and routes them to queues on the server according to the binding rules. The exchange type determines how the exchange routes messages. RabbitMQ has three commonly used exchange types: Direct, Fanout, and Topic.

Message Queue

The messages we send to RabbitMQ eventually arrive in queues and are stored there, waiting for consumers to fetch them. If routing finds no matching queue, the message is lost.

Binding Key

An exchange and a message queue are connected through a binding key, and this relationship is fixed.

Routing Key

When a producer sends a message to an exchange, it usually specifies a routing key that sets the routing rule for that message. The routing key takes effect in combination with the exchange type and the binding keys. The producer only has to specify the routing key to determine where the message flows.
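How exchange type, binding key, and routing key work together can be modelled in memory. The sketch below covers only the Direct and Fanout types and uses hypothetical names (ExchangeSketch, bind, publish, poll); it is a teaching model, not the broker's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

final class ExchangeSketch {

    enum Type { DIRECT, FANOUT }

    private final Type type;
    // Each binding is a pair {queueName, bindingKey}
    private final List<String[]> bindings = new ArrayList<>();
    private final Map<String, Queue<String>> queues = new HashMap<>();

    ExchangeSketch(Type type) {
        this.type = type;
    }

    /** Connects a queue to this exchange with a binding key (ignored for FANOUT). */
    void bind(String queueName, String bindingKey) {
        queues.computeIfAbsent(queueName, name -> new ArrayDeque<>());
        bindings.add(new String[] {queueName, bindingKey});
    }

    /** Routes a message; returns the number of queues reached (0 means the message is dropped). */
    int publish(String routingKey, String message) {
        int delivered = 0;
        for (String[] binding : bindings) {
            // FANOUT ignores the routing key; DIRECT requires an exact match with the binding key
            if (type == Type.FANOUT || binding[1].equals(routingKey)) {
                queues.get(binding[0]).add(message);
                delivered++;
            }
        }
        return delivered;
    }

    /** Takes the next message from a queue, or null if the queue is empty or unknown. */
    String poll(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }
}
```

A return value of 0 from publish corresponds to the case mentioned above in which no matching queue is found and the message is lost.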

Six working modes of RabbitMQ

Simple mode

RabbitMQ is messaging middleware; you can think of it as a post office. When you put a letter in the mailbox, you can be sure that the postman will deliver it correctly. RabbitMQ is the mailbox, the post office, and the postman in one.

  • The program that sends the message is the producer.
  • The queue represents the mailbox. Although a message flows through RabbitMQ and your application, it can only be stored in a queue. Queue storage is limited only by the memory and disk of the server; a queue is essentially a large message buffer. Multiple producers can send messages to the same queue, and multiple consumers can receive messages from the same queue.
  • The consumer waits to receive messages from the queue.

Work mode

Publish/subscribe mode

Routing mode

Topic mode

RPC mode

Common RabbitMQ interview questions

1. What is RabbitMQ? Why use RabbitMQ?

Answer: RabbitMQ is open source message middleware written in Erlang and based on the AMQP protocol.

It is used for decoupling, asynchronous processing, and peak shaving.

2. What are the advantages and disadvantages of RabbitMQ?

Answer: Advantages: decoupling, asynchronous processing, peak shaving.

Disadvantages: It reduces the stability of the system. A system that used to run fine now depends on a message queue, and if the message queue goes down, the system goes down with it. System availability is therefore reduced.

It increases the complexity of the system. Once a message queue is added, many issues must be considered, such as consistency, how to make sure messages are not consumed repeatedly, and how to guarantee reliable delivery. There is more to think about, and complexity grows.

3. How to ensure the high availability of RabbitMQ?

Answer: Hardly any project relies on a single RabbitMQ server, because that is too risky. RabbitMQ is normally deployed as a cluster of several nodes so that the service survives the failure of one node.

4. How to ensure that RabbitMQ messages are not consumed repeatedly?

Answer: First, why does repeated consumption happen? Normally, after a consumer has consumed a message, it sends an acknowledgement to the message queue. The queue then knows the message has been consumed and deletes it.

However, because of network failures and similar problems, the acknowledgement may never reach the message queue. The queue then does not know that the message was consumed and delivers it again to another consumer.

One solution is to make each message uniquely identifiable so that, even if it is delivered several times, consuming it more than once has no effect. In other words, message processing must be idempotent.

For example, the data written to the message queue carries a unique identifier, and when a message is consumed, the consumer checks that identifier to decide whether the message has already been processed.
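A minimal sketch of such an idempotent consumer follows. The names (IdempotentConsumerSketch, handle, balance) are hypothetical, and the processed identifiers are kept in an in-memory set for illustration; a real service would presumably keep them in durable storage such as a database table with a unique key.

```java
import java.util.HashSet;
import java.util.Set;

final class IdempotentConsumerSketch {

    // Identifiers of messages that have already been applied (in memory only, for illustration)
    private final Set<String> processedIds = new HashSet<>();
    private long balance = 0;

    /**
     * Applies the message once. Returns true if the amount was applied,
     * false if the message id was seen before and the delivery was ignored.
     */
    boolean handle(String messageId, long amount) {
        if (!processedIds.add(messageId)) {
            return false; // duplicate delivery: do nothing
        }
        balance += amount;
        return true;
    }

    long balance() {
        return balance;
    }
}
```

Delivering the same message twice leaves the state exactly as if it had been delivered once, which is what idempotence means here.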

5. How to ensure the reliable transmission of RabbitMQ messages?

Answer: Transmission is unreliable when messages are lost, hijacked, and so on.

Loss falls into three cases: the producer loses the message, the message queue loses the message, or the consumer loses the message.

Producer loses messages: RabbitMQ provides a transaction mode and a confirm mode to make sure the producer does not lose messages.

The transaction mechanism works as follows: before sending a message, start a transaction (channel.txSelect()), then send the message. If an exception occurs while sending, roll back the transaction (channel.txRollback()); if sending succeeds, commit the transaction (channel.txCommit()). The drawback of this approach is reduced throughput.

The confirm mode is used more often. Once a channel enters confirm mode, every message published on it is assigned a unique ID (starting from 1). Once the message has been delivered to all matching queues, RabbitMQ sends an Ack containing that unique ID to the producer, so the producer knows the message has reached its destination queues.

If RabbitMQ fails to process the message, it sends a Nack to the producer, and the producer can retry.
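On the producer side, confirm mode usually means remembering which messages are still unconfirmed. The sketch below models only that bookkeeping in plain Java; it does not talk to a broker, and the names (ConfirmTrackerSketch, publish, onAck, onNack) are hypothetical. The numbering from 1 follows the description above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ConfirmTrackerSketch {

    // Messages published but not yet confirmed, keyed by their sequence number
    private final Map<Long, String> outstanding = new TreeMap<>();
    private long nextSeqNo = 1; // IDs start from 1 once the channel is in confirm mode

    /** Records a published message and returns the ID it was assigned. */
    long publish(String message) {
        long seqNo = nextSeqNo++;
        outstanding.put(seqNo, message);
        return seqNo;
    }

    /** Ack received: the message reached its queues, so it can be forgotten. */
    void onAck(long seqNo) {
        outstanding.remove(seqNo);
    }

    /** Nack received: returns the messages the producer should send again. */
    List<String> onNack(long seqNo) {
        List<String> retry = new ArrayList<>();
        String message = outstanding.remove(seqNo);
        if (message != null) {
            retry.add(message);
        }
        return retry;
    }

    int outstandingCount() {
        return outstanding.size();
    }
}
```

Anything still outstanding after a timeout can be treated like a Nack and sent again, which is why consumers should be prepared for duplicates.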

Message queue loses data: use message persistence.

The usual way to deal with the message queue losing data is to enable persistence to disk.

Persistence can be combined with the confirm mechanism, so that the Ack is sent to the producer only after the message has been persisted to disk.

If RabbitMQ dies before the message has been persisted, the producer receives no Ack and can send the message again.

So how is persistence enabled? It takes just two steps:

  1. Set the durable flag of the queue to true, which makes it a durable queue.
  2. When sending a message, set deliveryMode=2.

With these settings, the data can be recovered after a restart even if RabbitMQ goes down.

Consumer loses messages: Consumers usually lose data because they use automatic acknowledgement, so the fix is to switch to manual acknowledgement.

In automatic mode, the acknowledgement is sent to RabbitMQ as soon as the consumer receives the message, before the message has been processed.

If processing then fails, the message is lost.

Solution: acknowledge the message manually, and only after it has been processed successfully.
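The difference between the two acknowledgement modes can be shown with an in-memory queue. This is a simplified model with hypothetical names (AckModeSketch, consumeAutoAck, consumeManualAck); a handler returning false stands for a processing failure.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

final class AckModeSketch {

    private final Deque<String> queue = new ArrayDeque<>();

    void enqueue(String message) {
        queue.addLast(message);
    }

    int size() {
        return queue.size();
    }

    /** Automatic mode: the message leaves the queue on delivery, before the handler runs. */
    boolean consumeAutoAck(Predicate<String> handler) {
        String message = queue.pollFirst(); // removed immediately
        return message != null && handler.test(message); // a failure here loses the message
    }

    /** Manual mode: the message leaves the queue only after the handler has succeeded. */
    boolean consumeManualAck(Predicate<String> handler) {
        String message = queue.peekFirst(); // still in the queue while being processed
        if (message == null) {
            return false;
        }
        boolean processed = handler.test(message);
        if (processed) {
            queue.pollFirst(); // this removal plays the role of the manual acknowledgement
        }
        return processed;
    }
}
```

In manual mode a failed message stays available for another attempt, at the price that it may be processed more than once.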

6. How to ensure the order of RabbitMQ messages?

Answer: Consume with a single thread, which preserves the order in which messages arrive, or number the messages and have the consumer process them in number order.
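The numbering approach can be sketched as a small resequencer that holds back messages arriving early. The names are hypothetical, and the sketch assumes the producer numbers messages consecutively starting at 1.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ResequencerSketch {

    // Messages that arrived before their turn, keyed by their number
    private final Map<Long, String> pending = new HashMap<>();
    private long nextExpected = 1; // assumption: numbering starts at 1 with no gaps

    /** Accepts one message and returns every message that can now be processed, in order. */
    List<String> accept(long number, String payload) {
        List<String> ready = new ArrayList<>();
        if (number < nextExpected) {
            return ready; // already processed: ignore the repeat
        }
        pending.put(number, payload);
        // Release the longest run of consecutive numbers starting at nextExpected
        while (pending.containsKey(nextExpected)) {
            ready.add(pending.remove(nextExpected));
            nextExpected++;
        }
        return ready;
    }
}
```

A message that arrives out of order simply waits in the pending map until all of its predecessors have been released.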

