27

RabbitMQ实战:居然有这么多骚操作

 3 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzU5ODUwNzY1Nw%3D%3D&%3Bmid=2247485799&%3Bidx=1&%3Bsn=779803db564993ec4b37d25fea10bb75
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

RabbitMQ的Java客户端统一使用 com.rabbitmq.client 作为顶级包名。其中,最核心的类主要有:ConnectionFactory、Connection、Channel、Consumer、DefaultConsumer、BasicProperties。需要说明的是,本文不只是教你RabbitMQ客户端的基本玩法,还有一些你可能不知道的一些骚操作。

连接RabbitMQ

使用RabbitMQ第一步当然是连接RabbitMQ,这里就不说怎么搭建RabbitMQ环境了,本文假设你已经有RabbitMQ环境,连接RabbitMQ的代码如下:

ConnectionFactory factory = new ConnectionFactory();

factory.setUsername("root");

factory.setPassword("root123");

factory.setVirtualHost("/");

factory.setHost("127.0.0.1");

factory.setPort(5672);

Connection conn = factory.newConnection();

需要说明的是,如果你用的是默认vhost,即/。那么factory.setVirtualHost("/")这行代码可以省掉。那么,这里有一个 有趣 的问题:创建RabbitMQ连接最短的代码是怎样的?答案是只需要两行代码即可。这是为什么呢?因为创建连接的这几个字段都有默认值,用户名密码默认值默认为guest/guest,host和端口默认为localhost和5672(ConnectionFactory.java源码中有DEFAULT_开头命名的常量,就是默认值)。不过需要注意的是默认账户guest只能连接本地RabbitMQ环境:

ConnectionFactory factory = new ConnectionFactory();

Connection conn = factory.newConnection();

创建RabbitMQ连接还有另一种通过URI的方式,代码如下:

ConnectionFactory factory = new ConnectionFactory();

factory.setUri("amqp://username:password@hostName:port/virtualHost");

Connection conn = factory.newConnection();

每成功创建连接,在RabbitMQ服务端都有相应的日志:

2020-05-10 17:51:58.380 [info] <0.7312.0> accepting AMQP connection <0.7312.0> ([::1]:61390 -> [::1]:5672)

2020-05-10 17:51:58.509 [info] <0.7312.0> connection <0.7312.0> ([::1]:61390 -> [::1]:5672): user 'guest' authenticated and granted access to vhost '/'

被动申明

如下这段代码所示,被动申明一个队列。它只检查队列是否存在,如果存在,那么不会有任何操作,并且返回和主动且成功创建队列一样的响应信息。如果队列不存在,那么就会抛出Channel级别的异常。所以,被动申明一般使用在一次性临时性Channel申明的地方:

Queue.DeclareOk response = channel.queueDeclarePassive("queue-name");

// returns the number of messages in Ready state in the queue

response.getMessageCount();

// returns the number of consumers the queue has

response.getConsumerCount();

如果队列不存在时,抛出的异常信息如下:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'queue-none' in vhost '/', class-id=50, method-id=10)

线程安全

强制要求一个线程一个Channel,不要多个线程共用一个Channel实例,否则会出现一些莫名其妙的错误。

消费者(Push模式)

消费者消费消息一般通过channel.basicConsume方法,这个方法有很多重载参数,不过我们常用的方法是下面这两个。官方更加推荐第一个带有consumerTag的方法,并且每个不同的消费者实例要有不同的consumerTag。强烈不建议一个连接上有相同的consumerTag,否则可能会导致 automatic connection recovery 的问题,参考(https://www.rabbitmq.com/api-guide.html#connection-recovery):

String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback)

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

consumerTag是消费者唯一标识符。如果是使用了不带consumerTag参数的方法,那么RabbitMQ会自动生成一个唯一Tag,这样的Tag没有业务参考意义。如下图所示: EjMJneB.png!web

如果我们自定义了consumerTag的值,那么,一看某个队列的消费者信息,就知道这些消费者来自哪里、是干嘛的,非常让人容易理解。如下图所示: vyAn6f2.png!web

需要说明的是,这种Push模式,如果生产者产生的消息量超过消费者能承受的量,就会撑爆消费者。不过,RabbitMQ考虑到了这一点,可以通过方法 channel.basicQos (1000)进行限流。basic.qos是针对Channel进行设置的,也就是说只有在channel建立之后才能发送basic.qos命令。在rabbitmq的实现中,每个channel都对应会有一个rabbit_limiter进程,当收到basic.qos命令后,在rabbit_limiter进程中记录信令中prefetch_count的值,同时记录的还有该channel未ack的消息个数,从而保证未ack的消息数量不超过prefetch_count的值(如果prefetch_count设置为0,表示没有任何限制)。

消费者(Pull模式)

Push模式对应的消费端方法是basicConsumer(),而Pull模式对应的消费端方法是basicGet()。每次获取一条消息,不能批量。这种方法效率非常低下,因为不知道队列中是否有消息,所以必须反复询问,即使大部分请求没有结果的情况下,这种方法 非常不推荐使用 。代码如下:

boolean autoAck = false;

GetResponse response = channel.basicGet(queueName, autoAck);

if (response == null) {

// No message retrieved.

} else {

AMQP.BasicProperties props = response.getProps();

byte[] body = response.getBody();

long deliveryTag = response.getEnvelope().getDeliveryTag();

// do something

channel.basicAck(method.deliveryTag, false);

}

高级连接方式

消费者线程默认被一个ExecutorService自动分配,当连接被关闭的时候,默认的ExecutorService会调用shutdown()。但是,如果创建连接的时候使用了用户自定义ExecutorService,必须手动调用shutdown()方法,否则,线程池中的线程可能会阻止JVM终止,除非kill -9。使用自定义线程池代码如下所示:

ExecutorService es = Executors.newFixedThreadPool(20);

Connection conn = factory.newConnection(es);

需要说明的是,这个特性只有当你明确知道在消费者callback碰到处理瓶颈的情况下才考虑使用,如果没有消费者callback,或者非常少量,那么默认的线程池完全足矣。

使用地址集合

在通过factory构造Connection的时候,允许配置多个地址集合,代码如下所示。它会首先尝试连接host1:post1,如果连接失败,会再尝试连接host2:post2,而且整个过程对用户无感知,只要有一个地址是可用的,就不会抛出任何异常:

Address[] addr = new Address[]{ new Address(host1, port1),

new Address(host2, port2)};

Connection conn = factory.newConnection(addr);

支持NIO

RabbitMQ的Java客户端从4.0开始支持Java NIO。NIO的目的不是为了比BIO更快,它只是为了方便用户更轻易的控制资源,比如线程等。默认的BIO模式下,每一个Connection连接都会用一个线程从网络Socket中读取数据。而在NIO模式下,我们是可以控制与网络Socket交互的线程数。

如果你的Java进程中使用了几十甚至上百个Connection,那么可以尝试使用NIO模式,因为它相比默认的BIO模式,可以节省很多的线程资源。并且在线程数设置合理的情况下,性能不会有任何衰减。开启NIO模式非常简单,如下所示:

ConnectionFactory factory = new ConnectionFactory();

// 默认nio模式线程数为1

factory.useNio();


另一种使用NIO的方式:

factory.setNioParams(new NioParams().setNbIoThreads(4));

网络故障自动恢复

在RabbitMQ服务器和Java客户端之间的网络故障是很常见的现象,RabbitMQ的Java客户端是支持自动恢复的,并且4.0以后该特性是默认开启的,证据在ConnectionFactory的源码中:

private boolean automaticRecovery = true;

private boolean topologyRecovery = true;

topology是什么意思?中文是拓扑的意思。在这里是指交换机、队列、绑定关系、消费者等。

我们也可以在new出来ConnectionFactory的时候,显示设置开启or关闭。如果恢复失败,RabbitMQ会固定时间间隔以后进行重试,默认为5秒钟(DEFAULT_NETWORK_RECOVERY_INTERVAL)。可以通过方法setNetworkRecoveryInterval()指定间隔时间。如果构造Connection时用的是地址集合,那么地址会被随机打乱,然后一个接一个进行重试:

ConnectionFactory factory = new ConnectionFactory();

factory.setAutomaticRecoveryEnabled(true);

factory.setNetworkRecoveryInterval(10000);

那么故障恢复在什么时候触发呢?主要是如下这些情况,只要任意一个条件发生都会触发:

  1. Connection上抛出IO异常、或者其他一些其他非预期的异常;

  2. scoket读取超时;

  3. 失去心跳;

如果是应用启动过程中初始化连接碰到RabbitMQ节点故障,这种情况下自动连接恢复是不会介入的。因为这种情况下,很可能RabbitMQ有一些故障或者问题,开发人员有责任排查问题原因。另外,如果显示调用connection.close()方法后,恢复机制也不会介入。

心跳机制

创建ConnectionFactory时,设置一个大于0的值就是开启心跳机制。如果设置等于0的值,就是关闭心跳机制:

ConnectionFactory cf = new ConnectionFactory();

// set the heartbeat timeout to 60 seconds

cf.setRequestedHeartbeat(60);

需要说明的是,如果设置心跳超时值太低的话,可能会由于一些原因比如瞬时网络故障等导致误报。这里给出一些经验数据:值低于5秒的话,很可能造成误报。值低于1秒的话,基本上都是误报。值在5~20秒之间对大部分环境来说,都是一个比较理想的值。如果是一个很大的值,例如1800秒,这时候心跳信息传送的少了,几乎没有实际的影响,就相当于关闭了心跳机制。

END

如果读完觉得有收获的话,欢迎点【好看】,关注【阿飞的博客】,查阅更多精彩历史!!!

UZ7B7n7.jpg!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK