Spring AMQP

2024 年 7 月 14 日 星期日(已编辑)
/ ,
12
摘要
Spring AMQP与RabbitMQ的学习
这篇文章上次修改于 2024 年 7 月 22 日 星期一,可能部分内容已经不适用,如有疑问可询问作者。

Spring AMQP

RabbitMQ

安装

CentOS 7.9安装

参考文章:https://cloud.tencent.com/developer/article/1447179

快速入门(hello world案例)

image-20240204143555644-1707028560565-1

image-20240204143555644-1707028560565-1

publisher

//1. 建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("212.64.18.207");
factory.setPort(5672);
factory.setVirtualHost("/yannqing");
factory.setUsername("yannqing");
factory.setPassword("123456");

//建立连接
Connection connection = factory.newConnection();
//创建通道
Channel channel = connection.createChannel();
//创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
//发送消息
String message = "hello rabbitmq";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
//关闭通道和连接
channel.close();
connection.close();

consumer

//1. 建立连接 
 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost("212.64.18.207");
 factory.setPort(5672);
 factory.setVirtualHost("/yannqing");
 factory.setUsername("yannqing");
 factory.setPassword("123456");

 //建立连接
 Connection connection = factory.newConnection();
 //创建通道
 Channel channel = connection.createChannel();
 //创建队列
 String queueName = "simple.queue";
 channel.queueDeclare(queueName, false, false, false, null);

 //订阅消息
 channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
     @Override
     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
         String message = new String(body);
         System.out.println("接收到消息:【" + message + "】");
     }
 });
 System.out.println("等待接收消息。。。。。。");

Spring AMQP

介绍

yom4ewirruome2nvm4

yom4ewirruome2nvm4

官网地址:https://spring.io/projects/spring-amqp

特征

  • 用于异步处理入站消息的侦听器容器
  • RabbitTemplate用于发送和接收消息
  • RabbitAdmin用于自动声明队列、交换器和绑定

使用准备

引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

配置文件yml

spring:
  rabbitmq:
    host: 212.64.18.207
    port: 5672
    virtual-host: /yannqing
    username: yannqing
    password: 123456

测试(hello world案例)

msg:一个队列绑定一个消费者

publisher

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Test
    public void testSimpleQueue() {
        String queueName = "simple.queue";
        String message = "Hello, spring amqp!";
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

consumer

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        System.out.println("消费者接收到Simple.queue的消息:【" + msg + "】");
    }
}

案例

Work Queue工作队列

image-20240204165823084

image-20240204165823084

msg:模拟WorkQueue,实现一个队列绑定多个消费者

思路:

  1. 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
  2. 在consumer服务中定义两个消息监听者,都监听simple.queue队列
  3. 消费者1每秒处理50条消息,消费者2每秒处理10条消息

预期:

  • 消费者1应该处理完50条所有消息

publisher

@Test
public void testWorkQueue() throws InterruptedException {
    String queueName = "simple.queue";
    String message = "Hello, spring amqp!";
    for (int i = 1; i <= 50; i++) {
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

consumer

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到Simple.queue的消息:【" + msg + "】" + LocalTime.now());
    //休眠20ms,相当于1s执行50次
    Thread.sleep(20);
}

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者1接收到Simple.queue的消息:【" + msg + "】" + LocalTime.now());
    //休眠200ms,相当于1s执行5次
    Thread.sleep(200);
}

处理结果

image-20240204171017820

image-20240204171017820

由处理结果可知:

  • 50条消息,被消费者1,2平均分配了
  • 消费者1处理速度快,1s左右就处理结束了。
  • 但是消费者2处理速度慢,耗时久

可以通过消费者的yml配置消息预取来解决这一情况

spring:
  rabbitmq:
    host: 212.64.18.207
    port: 5672
    virtual-host: /yannqing
    username: yannqing
    password: 123456
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成后才能获取下一个消息

修改配置文件后,重启服务

得到结果:

  • 消费者1消费了40条消息,消费者2消费了10条消息

发布,订阅(publish,subscribe)模式

image-20240204172253596

image-20240204172253596

常见的三种交换机exchange:

  • Fanout:广播
  • Direct:路由
  • Topic:话题

msg:exchange->转发消息,queue->存储消息

SpringAMQP提供了声明交换机,队列,绑定关系的API,如

image-20240204172803233

image-20240204172803233

Fanout Exchange

msg:广播交换机(所有绑定的queue均可以收到消息)

思路:

  • 在consumer服务中,利用代码声明队列,交换机,并将两者绑定
  • 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
  • 在publisher中编写测试方法,向simple.fanout发送消息

publisher

@Test
public void testSendFanoutExchange() {
    //交换机名称
    String exchangeName = "simple.fanout";
    //消息
    String message = "Hello, fanout exchange, every one!";
    //发送消息,参数:交换机名称,RoutingKey(暂时为空),消息
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

consumer

  1. 通过配置类,将交换机和队列进行绑定
@Configuration
public class FanoutConfig {
    // 声明fanoutExchange
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("simple.fanout");
    }
    //声明第一个队列
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1");
    }
    //绑定队列1和交换机
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    //第二个同样的道理
}
  1. 编写两个消费者,绑定队列
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.err.println("消费者1接收到fanout.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.err.println("消费者2接收到fanout.queue2的消息:【" + msg + "】");
}

Direct Exchange

msg:(路由交换机)会将接收到的消息根据规则路由到指定的Queue

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
    • BindingKey与队列绑定,RoutingKey与路由绑定
image-20240204224859603

image-20240204224859603

publisher

@Test
public void testSendDirectExchange() {
    //交换机名称
    String exchangeName = "simple.direct";
    //消息
    String message = "Hello, direct exchange, red!";
    //发送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

msg:发送的red是routingKey,是和交换机绑定的(相当于交换机发送一个关于red的消息,看哪个队列有red)

consumer

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1"),
        exchange = @Exchange(name = "simple.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "blue"}
))
public void listenDirectQueue1(String msg) {
    System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue2"),
        exchange = @Exchange(name = "simple.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg) {
    System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}

msg:consumer绑定的key是BindingKey,是和队列绑定的

Topic Exchange

msg:话题交换机(和Direct Exchange类似,区别在于routingkey必须是多个单次的列表,且要以.分割)

例如:china.news, china.weather, japan.news, japan.weather等

  • Queue于Exchange指定BindingKey时可以使用通配符:
    • #:代表0个或多个单词(也可以是一个) 例如:china.wuxi.news => china.#
    • :代表1个单词 例如:china.news => china. , 也可以使用china.#

publisher

@Test
public void testSendTopicExchange() {
    //交换机名称
    String exchangeName = "simple.topic";
    //消息
    String message = "中国,河南今天暴雪-1 ~ 1℃";
    //发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
}

consumer

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue1"),
        exchange = @Exchange(name = "simple.topic", type = ExchangeTypes.TOPIC),
        key = {"china.#"}
))
public void listenTopicQueue1(String msg) {
    System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue2"),
        exchange = @Exchange(name = "simple.topic", type = ExchangeTypes.TOPIC),
        key = {"#.news"}
))
public void listenTopicQueue2(String msg) {
    System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}

运行结果:

image-20240205004239095

image-20240205004239095

消息转换器

测试发送Object类型的消息

msg:在Spring AMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送

  • spring的对消息对象的处理是由org.springframework.qmqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化

序列化方式缺点:

  • 性能差
  • 安全性不高
  • 数据长度太长

推荐更换序列化方式为JSON,修改只需定义一个MessageConverter类型的Bean即可。

steps:

  • 在publisher引入依赖

    <dependency>
        <groupId>com.fasterxml.jackson.dataformat</groupId>
        <artifactId>jackson-dataformat-xml</artifactId>
        <version>2.9.10</version>
    </dependency>
  • 在publisher服务声明MessageConverter:

    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
  • 消费者,监听队列。根据发送者发送的消息类型,监听方法的参数也设置对应的类型即可

  • Loading...
  • Loading...
  • Loading...
  • Loading...
  • Loading...