Spring AMQP
RabbitMQ
安装
CentOS 7.9安装
参考文章:https://cloud.tencent.com/developer/article/1447179
快速入门(hello world案例)
publisher
//1. 建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("212.64.18.207");
factory.setPort(5672);
factory.setVirtualHost("/yannqing");
factory.setUsername("yannqing");
factory.setPassword("123456");
//建立连接
Connection connection = factory.newConnection();
//创建通道
Channel channel = connection.createChannel();
//创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
//发送消息
String message = "hello rabbitmq";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
//关闭通道和连接
channel.close();
connection.close();
consumer
//1. 建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("212.64.18.207");
factory.setPort(5672);
factory.setVirtualHost("/yannqing");
factory.setUsername("yannqing");
factory.setPassword("123456");
//建立连接
Connection connection = factory.newConnection();
//创建通道
Channel channel = connection.createChannel();
//创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
//订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。。。");
Spring AMQP
介绍
官网地址:https://spring.io/projects/spring-amqp
特征
- 用于异步处理入站消息的侦听器容器
RabbitTemplate
用于发送和接收消息RabbitAdmin
用于自动声明队列、交换器和绑定
使用准备
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
配置文件yml
spring:
rabbitmq:
host: 212.64.18.207
port: 5672
virtual-host: /yannqing
username: yannqing
password: 123456
测试(hello world案例)
msg:一个队列绑定一个消费者
publisher
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
String queueName = "simple.queue";
String message = "Hello, spring amqp!";
rabbitTemplate.convertAndSend(queueName, message);
}
}
consumer
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消费者接收到Simple.queue的消息:【" + msg + "】");
}
}
案例
Work Queue工作队列
msg:模拟WorkQueue,实现一个队列绑定多个消费者
思路:
- 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
- 在consumer服务中定义两个消息监听者,都监听simple.queue队列
- 消费者1每秒处理50条消息,消费者2每秒处理10条消息
预期:
- 消费者1应该处理完50条所有消息
publisher
@Test
public void testWorkQueue() throws InterruptedException {
String queueName = "simple.queue";
String message = "Hello, spring amqp!";
for (int i = 1; i <= 50; i++) {
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
consumer
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到Simple.queue的消息:【" + msg + "】" + LocalTime.now());
//休眠20ms,相当于1s执行50次
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者1接收到Simple.queue的消息:【" + msg + "】" + LocalTime.now());
//休眠200ms,相当于1s执行5次
Thread.sleep(200);
}
处理结果
由处理结果可知:
- 50条消息,被消费者1,2
平均分配了
- 消费者1处理速度快,1s左右就处理结束了。
- 但是消费者2处理速度慢,耗时久
可以通过消费者的yml配置消息预取
来解决这一情况
spring:
rabbitmq:
host: 212.64.18.207
port: 5672
virtual-host: /yannqing
username: yannqing
password: 123456
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成后才能获取下一个消息
修改配置文件后,重启服务
得到结果:
- 消费者1消费了40条消息,消费者2消费了10条消息
发布,订阅(publish,subscribe)模式
常见的三种交换机exchange:
- Fanout:广播
- Direct:路由
- Topic:话题
msg:exchange->转发消息,queue->存储消息
SpringAMQP提供了声明交换机,队列,绑定关系的API,如
Fanout Exchange
msg:广播交换机(所有绑定的queue均可以收到消息)
思路:
- 在consumer服务中,利用代码声明队列,交换机,并将两者绑定
- 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
- 在publisher中编写测试方法,向simple.fanout发送消息
publisher
@Test
public void testSendFanoutExchange() {
//交换机名称
String exchangeName = "simple.fanout";
//消息
String message = "Hello, fanout exchange, every one!";
//发送消息,参数:交换机名称,RoutingKey(暂时为空),消息
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
consumer
- 通过配置类,将交换机和队列进行绑定
@Configuration
public class FanoutConfig {
// 声明fanoutExchange
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("simple.fanout");
}
//声明第一个队列
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1");
}
//绑定队列1和交换机
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
//第二个同样的道理
}
- 编写两个消费者,绑定队列
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.err.println("消费者1接收到fanout.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.err.println("消费者2接收到fanout.queue2的消息:【" + msg + "】");
}
Direct Exchange
msg:(路由交换机)会将接收到的消息根据规则路由到指定的Queue
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
- BindingKey与队列绑定,RoutingKey与路由绑定
publisher
@Test
public void testSendDirectExchange() {
//交换机名称
String exchangeName = "simple.direct";
//消息
String message = "Hello, direct exchange, red!";
//发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
msg:发送的red是routingKey,是和交换机绑定的(相当于交换机发送一个关于red的消息,看哪个队列有red)
consumer
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "simple.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg) {
System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "simple.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg) {
System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}
msg:consumer绑定的key是BindingKey,是和队列绑定的
Topic Exchange
msg:话题交换机(和Direct Exchange类似,区别在于routingkey必须是多个单次的列表,且要以.分割)
例如:china.news, china.weather, japan.news, japan.weather等
- Queue于Exchange指定BindingKey时可以使用通配符:
- #:代表0个或多个单词(也可以是一个) 例如:china.wuxi.news => china.#
- :代表1个单词 例如:china.news => china. , 也可以使用china.#
publisher
@Test
public void testSendTopicExchange() {
//交换机名称
String exchangeName = "simple.topic";
//消息
String message = "中国,河南今天暴雪-1 ~ 1℃";
//发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
}
consumer
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "simple.topic", type = ExchangeTypes.TOPIC),
key = {"china.#"}
))
public void listenTopicQueue1(String msg) {
System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "simple.topic", type = ExchangeTypes.TOPIC),
key = {"#.news"}
))
public void listenTopicQueue2(String msg) {
System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}
运行结果:
消息转换器
测试发送Object类型的消息
msg:在Spring AMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送
- spring的对消息对象的处理是由org.springframework.qmqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化
序列化方式缺点:
- 性能差
- 安全性不高
- 数据长度太长
推荐更换序列化方式为JSON,修改只需定义一个MessageConverter类型的Bean即可。
steps:
在publisher引入依赖
<dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency>
在publisher服务声明MessageConverter:
@Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); }
消费者,监听队列。根据发送者发送的消息类型,监听方法的参数也设置对应的类型即可