Microservices - RabbitMQ & SpringAMQP

Day04 - RabbitMQ & SpringAMQP

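Chapter summary:

  1. Synchronous communication returns results immediately (good timeliness), but has these drawbacks:

    1. Tight coupling
    2. Limited performance and throughput
    3. Cascading failures
    4. Wasted resources while waiting;
  2. Asynchronous communication is event-driven and usually goes through a broker. It addresses the four problems of synchronous communication and also achieves ==traffic peak shaving==

    1. RabbitMQ, a mainstream broker: very stable, somewhat lower throughput, written in Erlang; the first choice for ==strongly transactional== scenarios
    2. Kafka: very high throughput, may lose a small amount of data; suited to ==big-data pipelines that can tolerate minimal data loss==
    3. RocketMQ: comparable to RabbitMQ, written in Java (Kafka is the one written in Scala and Java); suited to ==large teams that need to customize the broker==;
  3. RabbitMQ usage guidelines

    1. Plan the consumer side first; the publisher can then simply specify the routingKey so that each message lands on a specific listener;
    2. Every exchange type can feed queues that have multiple consumers (work mode), so decide which exchange type to use up front to keep things extensible;
    3. For multiple environments running in parallel, fanout is recommended, e.g. keeping production and pre-production in sync, or letting a group and its subsidiaries process the same messages;
    4. For very large environments, consider topic mode: fine-grained routing keys make later expansion easier;
  4. Other extension needs

    1. Reference configuration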

      spring:
        rabbitmq:
          host: 192.168.20.165
          port: 5672
          virtual-host: /
          username: iyyxx
          password: 123321
          listener:
            simple:
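              concurrency: 5    # number of concurrent consumer threads
              prefetch: 10      # messages prefetched per consumer
              retry:
                enabled: true   # retry when the listener fails to consume a message
                max-attempts: 3   # at most 3 delivery attempts per message
                initial-interval: 2000    # wait 2 seconds between attempts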
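
To make the retry settings above concrete, here is a minimal sketch in plain Java (no Spring involved) of the wait schedule they imply. The class RetrySchedule and its method are hypothetical names used only for illustration. The multiplier of 1.0 and the 10-second cap are assumptions meant to mirror Spring Boot's defaults for the multiplier and max-interval properties, which the configuration above does not set.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring: it only illustrates the arithmetic.
class RetrySchedule {

    /**
     * Waits (in ms) between consecutive attempts.
     * maxAttempts attempts means maxAttempts - 1 waits.
     */
    static List<Long> waits(int maxAttempts, long initialIntervalMs,
                            double multiplier, long maxIntervalMs) {
        List<Long> result = new ArrayList<>();
        double next = initialIntervalMs;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            // Each wait is capped at maxIntervalMs, then scaled for the next round.
            result.add(Math.min((long) next, maxIntervalMs));
            next *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Values from the reference configuration; multiplier and cap are assumed defaults.
        System.out.println(waits(3, 2000, 1.0, 10000)); // [2000, 2000]
        // A doubling multiplier, shown only for contrast.
        System.out.println(waits(4, 1000, 2.0, 10000)); // [1000, 2000, 4000]
    }
}
```

With max-attempts set to 3, the listener is invoked at most three times for a message, with two 2-second pauses in between.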
      
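| Exchange | Queue mode | Strategy | Notes |
| --- | --- | --- | --- |
| (none) | basic | One sender, one receiver, first in first out | |
| (none) | work | One sender, multiple receivers; distributed evenly or by capacity | Only adds more consumers; the program itself is no different from basic |
| fanout | basic/work | Handled the same as work, except the publisher sends to an exchange | |
| direct | basic/work | Same as fanout, plus a routingKey | The routing key acts as a filter; bindings can be disjoint or ==can overlap!== |
| topic | basic/work | Same as direct, plus wildcards | Closer to how the business thinks, e.g. china.# and #.news |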


No.01-RabbitMQ

1. Synchronous vs. asynchronous communication: a comparison

2. Installing RabbitMQ with Docker

  • Hands-on installation

    docker pull rabbitmq:3-management
    
    docker run \
      -e RABBITMQ_DEFAULT_USER=iyyxx \
      -e RABBITMQ_DEFAULT_PASS=123321 \
      --name mq \
      --hostname mq1 \
      -p 15672:15672 \
      -p 5672:5672 \
      -d \
      rabbitmq:3-management
    
  • Management UI on port 15672 (log in, add a user, and create virtual hosts for multi-tenancy; e.g. user test/123)

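3. RabbitMQ & SpringBoot

The traditional approach, using the native RabbitMQ Java client directly:

  • Create the parent project and its pom; inherit from the Spring Boot parent and add the Lombok (logging) and test dependencies
  • Create two modules, publisher and consumer
    • publisher: write a test that creates a connection, opens a channel, declares the queue, sends a message, and closes everything;
    • consumer: write a main program (psvm, the process keeps running) that creates a connection, opens a channel, declares the queue (safe whether or not it already exists), and receives messages continuously;

Summary: this approach is tedious and far from elegant.

  • Parent project pom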

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.example</groupId>
        <artifactId>mq-demo</artifactId>
        <packaging>pom</packaging>
        <version>1.0-SNAPSHOT</version>
        <modules>
            <module>consumer</module>
            <module>publisher</module>
        </modules>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.9.RELEASE</version>
            <relativePath/>
        </parent>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
            </dependency>
    
            <!-- AMQP dependency, includes RabbitMQ -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    
        </dependencies>
    
    </project>
    
  • publisher/pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>mq-demo</artifactId>
            <groupId>org.example</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>publisher</artifactId>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
    </project>
    
  • publisher/com.iyyxx.mq.hellwork.PublisherTest

    package com.iyyxx.mq.hellwork;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class PublisherTest {
        @Test
        public void testSendMessage() throws IOException, TimeoutException {
            // 1. Set up the connection factory
            ConnectionFactory factory = new ConnectionFactory();
            // 1.1. Connection parameters: host, port, vhost, username, password
            factory.setHost("192.168.20.165");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("iyyxx");
            factory.setPassword("123321");
            // 1.2. Open the connection
            Connection connection = factory.newConnection();
    
            // 2. Create a Channel
            Channel channel = connection.createChannel();
    
            // 3. Declare the queue
            String queueName = "simple.queue";
            channel.queueDeclare(queueName, false, false, false, null);
    
            // 4. Send the message via the default exchange (routing key = queue name)
            String message = "hello, rabbitmq!";
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println("Message sent: [" + message + "]");
    
            // 5. Close the channel and the connection
            channel.close();
            connection.close();
    
        }
    }
    
  • consumer/pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>mq-demo</artifactId>
            <groupId>org.example</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>consumer</artifactId>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
    </project>
    
  • consumer/com.iyyxx.mq.helloworld.ConsumerTest

    package com.iyyxx.mq.helloworld;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ConsumerTest {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1. Set up the connection factory
            ConnectionFactory factory = new ConnectionFactory();
            // 1.1. Connection parameters: host, port, vhost, username, password
            factory.setHost("192.168.20.165");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("iyyxx");
            factory.setPassword("123321");
            // 1.2. Open the connection
            Connection connection = factory.newConnection();
    
            // 2. Create a Channel
            Channel channel = connection.createChannel();
    
            // 3. Declare the queue (a no-op if it already exists with the same arguments)
            String queueName = "simple.queue";
            channel.queueDeclare(queueName, false, false, false, null);
    
            // 4. Subscribe; the callback runs asynchronously for every delivery
            channel.basicConsume(queueName, true, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 5. Handle the message
                    String message = new String(body);
                    System.out.println("Message received: [" + message + "]");
                }
            });
            System.out.println("Waiting for messages...");
        }
    }
    

No.02-Using SpringAMQP

1. Introduction to SpringAMQP

2. SpringAMQP Basic Queue: sending and receiving with a simple queue

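Summary:

  • Program structure: the parent pom brings in the AMQP dependency; there are two modules, one publishing and one subscribing, each with its own application.yml; the publisher sends from a test and the consumer defines a listener
  • Flow: one sender, one receiver; a message is removed from the queue once it is consumed, the listener stays resident, and consumed messages cannot be replayed
  • Parent pom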

    <!-- AMQP dependency, includes RabbitMQ -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  • Publisher side

    • publisher/application.yml

      spring:
        rabbitmq:
          host: 192.168.20.165
          port: 5672
          virtual-host: /
          username: iyyxx
          password: 123321
          listener:
            simple:
              concurrency: 5
              prefetch: 10
              retry:
                enabled: true   # retry when the listener fails to consume a message
                max-attempts: 3   # at most 3 delivery attempts per message
                initial-interval: 2000    # wait 2 seconds between attempts
      
    • publisher/com.iyyxx.mq.spring.SpringAmqpTest

      package com.iyyxx.mq.spring;
      
      
      import org.junit.Test;
      import org.junit.runner.RunWith;
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.boot.test.context.SpringBootTest;
      import org.springframework.test.context.junit4.SpringRunner;
      
      @RunWith(SpringRunner.class)
      @SpringBootTest
      public class SpringAmqpTest {
      
          @Autowired
          private RabbitTemplate rabbitTemplate;
      
          @Test
          public void testSimpleQueue() {
              String queueName = "simple.queue";
              String message = "hello, spring amqp!";
              rabbitTemplate.convertAndSend(queueName, message);
          }
      }
      
  • Consumer side

    • consumer/application.yml

      spring:
        rabbitmq:
          host: 192.168.20.165
          port: 5672
          virtual-host: /
          username: iyyxx
          password: 123321
      
    • consumer/com.iyyxx.mq.listener.SpringRabbitListener

      package com.iyyxx.mq.listener;
      
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;
      
      @Component
      public class SpringRabbitListener {
          @RabbitListener(queues = "simple.queue")
          public void listenSimpleQueueMessage(String message){
              System.out.println("Spring consumer received message [ "+message+" ]");
          }
      }
      

3. SpringAMQP Work Queue: sending and receiving

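Summary: a work queue differs little from a basic queue; it simply has more than one listener on the same queue. When consumers differ in processing capacity, though, configure ==prefetch== so that the work is distributed efficiently.

  • consumer/application.yml: set prefetch to 1 so that slower consumers take fewer messages; in practice, tune the value based on load testing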

    spring:
      rabbitmq:
        host: 192.168.20.165
        port: 5672
        virtual-host: /
        username: iyyxx
        password: 123321
        listener:
          simple:
            prefetch: 1
    
  • publisher/com.iyyxx.mq.spring.SpringAmqpTest: simulate 50 messages per second

    @Test
    public void testSendMessage2WorkQueue() throws InterruptedException {
        String queueName = "simple.queue";
        String message = "hello, message__";
        for (int i = 1; i <= 50; i++) {
            rabbitTemplate.convertAndSend(queueName, message+i);
            Thread.sleep(20);
        }
    }
    
  • consumer/com.iyyxx.mq.listener.SpringRabbitListener: simulate one fast and one slow consumer

    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueueMessage1(String message) throws InterruptedException {
        System.out.println("Consumer 1 received message [ "+message+" ]\t"+ LocalTime.now());
        Thread.sleep(20);
    }
    
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueueMessage2(String message) throws InterruptedException {
        System.err.println("Consumer 2 received message [ "+message+" ]\t"+ LocalTime.now());
        Thread.sleep(100);
    }
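
Why prefetch matters can be seen with a small simulation in plain Java. This is a simplified sketch, not RabbitMQ itself: it assumes all 50 messages are already waiting in the queue, that a large prefetch behaves like handing messages out round-robin in advance, and that prefetch = 1 behaves like giving the next message to whichever consumer becomes free first. The class and method names are hypothetical; the 20 ms and 100 ms costs come from the two listeners above.

```java
import java.util.Arrays;

// Hypothetical simulation: compares total processing time under two dispatch policies.
class WorkQueueSimulation {

    /** Round-robin pre-assignment: message i goes to consumer i % n, regardless of speed. */
    static long roundRobinMakespan(int messages, long[] costMs) {
        long[] busyUntil = new long[costMs.length];
        for (int i = 0; i < messages; i++) {
            int consumer = i % costMs.length;
            busyUntil[consumer] += costMs[consumer];
        }
        return Arrays.stream(busyUntil).max().getAsLong();
    }

    /** prefetch = 1: each message goes to the consumer that becomes free first. */
    static long fairDispatchMakespan(int messages, long[] costMs) {
        long[] busyUntil = new long[costMs.length];
        for (int i = 0; i < messages; i++) {
            int consumer = 0;
            for (int j = 1; j < costMs.length; j++) {
                if (busyUntil[j] < busyUntil[consumer]) {
                    consumer = j; // ties stay with the lower index
                }
            }
            busyUntil[consumer] += costMs[consumer];
        }
        return Arrays.stream(busyUntil).max().getAsLong();
    }

    public static void main(String[] args) {
        long[] costMs = {20, 100}; // consumer 1 is fast, consumer 2 is slow
        System.out.println(roundRobinMakespan(50, costMs));   // 2500
        System.out.println(fairDispatchMakespan(50, costMs)); // 900
    }
}
```

Under these assumptions the slow consumer is the bottleneck when it is handed half of the messages up front (25 x 100 ms), while with prefetch = 1 the fast consumer ends up handling most of the load.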
    

4. SpringAMQP publish/subscribe model 1: sending and receiving with a FanoutExchange

  • Bindings, consumer#com.iyyxx.mq.config.FanoutConfig

    package com.iyyxx.mq.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class FanoutConfig {
    
        @Bean
        public FanoutExchange fanoutExchange(){
            return new FanoutExchange("iyyxx.fanout");
        }
    
        @Bean
        public Queue fanoutQueue1(){
            return new Queue("fanout.queue1");
        }
    
        @Bean
        public Queue fanoutQueue2(){
            return new Queue("fanout.queue2");
        }
    
        @Bean
        public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
            return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
        }
    
        @Bean
        public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
            return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
        }
    }
    
  • Listener changes, com.iyyxx.mq.listener.SpringRabbitListener

    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1Message1(String message) throws InterruptedException {
        System.out.println("fanout1 consumer 1 received message [ "+message+" ]\t"+ LocalTime.now());
        Thread.sleep(1000);
    }
    
    // A second listener on the same queue, to test several consumers working one queue in parallel
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1Message2(String message) throws InterruptedException {
        System.err.println("fanout1 consumer 2 received message [ "+message+" ]\t"+ LocalTime.now());
        Thread.sleep(2000);
    }
    
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2Message1(String message) throws InterruptedException {
        System.err.println("fanout2 consumer 1 received message [ "+message+" ]\t"+ LocalTime.now());
        Thread.sleep(1000);
    }
    
  • Publisher changes, com.iyyxx.mq.spring.SpringAmqpTest

    @Test
    public void testSendMessage2FanoutExchange() throws InterruptedException {
        String exchangeName = "iyyxx.fanout";
        String message = "hello, message__";
    
        for (int i = 1; i <= 500; i++) {
            rabbitTemplate.convertAndSend(exchangeName, "",message+i);
            Thread.sleep(50);
        }
    }
    
  • ==Purely annotation-based declaration==, no FanoutConfig needed

    
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "fanout.queue1"),
            exchange = @Exchange(name = "iyyxx.fanout", type = ExchangeTypes.FANOUT)))
    public void listenFanoutQueue1Message1(String message) throws InterruptedException {
        System.out.println("fanout1 consumer 1 received message [ " + message + " ]\t" + LocalTime.now());
    }
    
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "fanout.queue2"),
            exchange = @Exchange(name = "iyyxx.fanout", type = ExchangeTypes.FANOUT)))
    public void listenFanoutQueue2Message1(String message) throws InterruptedException {
        System.err.println("fanout2 consumer 1 received message [ " + message + " ]\t" + LocalTime.now());
    }
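
The fanout behaviour used in this section can be summarised with a tiny in-memory model: every bound queue receives its own copy of each message, and the routing key is ignored. The sketch below is plain Java with hypothetical names (FanoutExchangeSketch, bind, publish, depth); it only illustrates the routing rule and is not how Spring AMQP or RabbitMQ is implemented.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of a fanout exchange.
class FanoutExchangeSketch {
    private final Map<String, Deque<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to the exchange (creating it if needed). */
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    /** Copies the message into every bound queue; the routing key plays no role. */
    void publish(String routingKey, String message) {
        for (Deque<String> queue : queues.values()) {
            queue.addLast(message);
        }
    }

    /** Number of messages currently waiting in the given queue. */
    int depth(String queueName) {
        Deque<String> queue = queues.get(queueName);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        FanoutExchangeSketch exchange = new FanoutExchangeSketch();
        exchange.bind("fanout.queue1");
        exchange.bind("fanout.queue2");
        for (int i = 1; i <= 3; i++) {
            exchange.publish("", "hello, message__" + i);
        }
        System.out.println(exchange.depth("fanout.queue1")); // 3
        System.out.println(exchange.depth("fanout.queue2")); // 3
    }
}
```

Several listeners on fanout.queue1 would then share those three messages among themselves, exactly as in the work queue model, while fanout.queue2 keeps its own full copy.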
    

5. SpringAMQP publish/subscribe model 2: sending and receiving with a DirectExchange

  • Listener changes

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),
           exchange = @Exchange(name = "iyyxx.direct", type = ExchangeTypes.DIRECT),
                                             key = {"red", "yellow"}))
    public void listenDirectQueue1Message1(String message) throws InterruptedException {
        System.err.println("direct1 consumer 1 received message [ " + message + " ]\t" + LocalTime.now());
    }
    
    
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),
           exchange = @Exchange(name = "iyyxx.direct", type = ExchangeTypes.DIRECT),
                                             key = {"red", "blue"}))
    public void listenDirectQueue2Message1(String message) throws InterruptedException {
        System.out.println("direct2 consumer 1 received message [ " + message + " ]\t" + LocalTime.now());
    }
    
  • Publisher test changes

    
        @Test
        public void testSendMessage2DirectExchange() throws InterruptedException {
            String exchangeName = "iyyxx.direct";
            String message = "hello, message__";
            rabbitTemplate.convertAndSend(exchangeName, "yellow",message); // received by direct.queue1 only
            rabbitTemplate.convertAndSend(exchangeName, "blue",message); // received by direct.queue2 only
            rabbitTemplate.convertAndSend(exchangeName, "red",message); // received by both queues
        }
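
The routing outcome noted in the comments above follows from one rule: a direct exchange delivers a message to every queue that has a binding key equal to the routing key. Here is a minimal in-memory sketch of that rule in plain Java. The class DirectExchangeSketch and its methods are hypothetical and exist only for illustration; the queue names and keys are the ones used in the listeners above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of direct-exchange routing.
class DirectExchangeSketch {
    // queue name -> binding keys
    private final Map<String, Set<String>> bindings = new LinkedHashMap<>();

    void bind(String queue, String... keys) {
        bindings.computeIfAbsent(queue, q -> new LinkedHashSet<>()).addAll(Arrays.asList(keys));
    }

    /** Returns the queues whose binding keys contain the routing key exactly. */
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, Set<String>> binding : bindings.entrySet()) {
            if (binding.getValue().contains(routingKey)) {
                targets.add(binding.getKey());
            }
        }
        return targets;
    }

    /** Same bindings as the two listeners in this section. */
    static DirectExchangeSketch articleBindings() {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("direct.queue1", "red", "yellow");
        exchange.bind("direct.queue2", "red", "blue");
        return exchange;
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = articleBindings();
        System.out.println(exchange.route("yellow")); // [direct.queue1]
        System.out.println(exchange.route("blue"));   // [direct.queue2]
        System.out.println(exchange.route("red"));    // [direct.queue1, direct.queue2]
        System.out.println(exchange.route("green"));  // []
    }
}
```

A key that no queue is bound to, such as green here, matches nothing, so the message is not delivered to any queue.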
    

6. SpringAMQP publish/subscribe model 3: sending and receiving with a TopicExchange

  • Listener changes

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),
           exchange = @Exchange(name = "iyyxx.topic", type = ExchangeTypes.TOPIC),
                                             key = {"china.#"}))
    public void listenTopicQueue1Message1(String message) throws InterruptedException {
        System.err.println("topic1 consumer 1 received message [ " + message + " ]\t" + LocalTime.now());
        Thread.sleep(1000);
    }
    
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),
           exchange = @Exchange(name = "iyyxx.topic", type = ExchangeTypes.TOPIC),
                                             key = {"china.#"}))
    public void listenTopicQueue1Message2(String message) throws InterruptedException {
        System.err.println("topic1 consumer 2 received message [ " + message + " ]\t" + LocalTime.now());
        Thread.sleep(2000);
    }
    
    
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),
           exchange = @Exchange(name = "iyyxx.topic", type = ExchangeTypes.TOPIC),
                                             key = {"#.news"}))
    public void listenTopicQueue2Message1(String message) throws InterruptedException {
        System.out.println("topic2 consumer 1 received message [ " + message + " ]\t" + LocalTime.now());
    }
    
  • Publisher test changes

    @Test
    public void testSendMessage2TopicExchange() throws InterruptedException {
        String exchangeName = "iyyxx.topic";
        String message = "hello, message__";
        for (int i = 1; i <= 50; i++) {
            rabbitTemplate.convertAndSend(exchangeName, "newyork.usa.news", message+i);
        }
    }
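
To see which queue the routing key newyork.usa.news reaches, it helps to spell out the wildcard rules: words are separated by dots, * stands for exactly one word, and # stands for zero or more words. The matcher below is a minimal plain-Java sketch of those rules with a hypothetical class name (TopicMatcher); it is written for illustration and is not the broker's actual implementation.

```java
// Hypothetical matcher illustrating topic-exchange wildcard rules.
class TopicMatcher {

    /** True if the binding pattern accepts the routing key. */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            return j == key.length; // pattern exhausted: key must be exhausted too
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = j; next <= key.length; next++) {
                if (match(pattern, i + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            return false; // pattern still expects a word, key has none left
        }
        if (pattern[i].equals("*") || pattern[i].equals(key[j])) {
            return match(pattern, i + 1, key, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("china.#", "newyork.usa.news")); // false
        System.out.println(matches("#.news", "newyork.usa.news"));  // true
        System.out.println(matches("china.#", "china.news"));       // true
        System.out.println(matches("#.news", "china.news"));        // true
    }
}
```

So the test above only reaches topic.queue2, while a key such as china.news would reach both queues.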
    

7. SpringAMQP message converters in practice

  • The wrong way

    • Declare the queue (also needed by the correct flow below)

      package com.iyyxx.mq.config;
      
      import org.springframework.amqp.core.Queue;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      @Configuration
      public class QueueInitConfig {
      
          @Bean
          public Queue objectQueue(){
              return new Queue("object.queue");
          }
      }
      
    • Publisher test

      @Test
      public void testSendObjectMessage2OjbectQueue(){
          Map<String,String> message = new HashMap<>();
          message.put("name", "石榴姐");
          message.put("age", "16");
      
          rabbitTemplate.convertAndSend("object.queue", message);
      }
      
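    • Check the message in the management UI: without a JSON converter the Map is sent using JDK serialization, so the body shows up as unreadable binary data (content_type application/x-java-serialized-object) that is much larger than the data it carries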

  • The correct flow

    • Publisher side

      • Add the dependency

        <!-- JSON dependency for the MessageConverter -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        
      • Declare the converter bean

        package com.iyyxx.mq;
        
        import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
        import org.springframework.amqp.support.converter.MessageConverter;
        import org.springframework.boot.SpringApplication;
        import org.springframework.boot.autoconfigure.SpringBootApplication;
        import org.springframework.context.annotation.Bean;
        
        @SpringBootApplication
        public class PublisherApplication {
            public static void main(String[] args) {
                SpringApplication.run(PublisherApplication.class, args);
            }
        
            @Bean
            public MessageConverter jsonMessageConverter(){
                return new Jackson2JsonMessageConverter();
            }
        }
        
      • Test

        @Test
        public void testSendObjectMessage2OjbectQueue(){
            Map<String,String> message = new HashMap<>();
            message.put("name", "石榴姐");
            message.put("age", "16");
        
            rabbitTemplate.convertAndSend("object.queue", message);
        }
        
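    • Consumer side

      • Add the dependency

        <!-- brings in jackson-databind transitively, which Jackson2JsonMessageConverter needs -->
        <dependency>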
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
            <version>2.11.4</version>
        </dependency>
        
      • Declare the converter bean

        package com.iyyxx.mq;
        
        import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
        import org.springframework.amqp.support.converter.MessageConverter;
        import org.springframework.boot.SpringApplication;
        import org.springframework.boot.autoconfigure.SpringBootApplication;
        import org.springframework.context.annotation.Bean;
        
        @SpringBootApplication
        public class ConsumerApplication {
            @Bean
            public MessageConverter jsonMessageConverter(){
                return new Jackson2JsonMessageConverter();
            }
            public static void main(String[] args) {
                SpringApplication.run(ConsumerApplication.class, args);
            }
        }
        
      • Listener

        @RabbitListener(queues = "object.queue")
        public void listenObjectQueueMessage(Map<String, String> message){
            System.out.println("object message [ "+message.get("name")+" ]");
            System.out.println("object message [ "+message.get("age")+" ]");
        }
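
The difference between the wrong and the correct flow can be reproduced without a broker. The sketch below, in plain Java, serializes the same Map with JDK serialization (the kind of payload the wrong flow puts on the queue) and compares it with a JSON string. The class PayloadComparison and the naiveJson helper are hypothetical: naiveJson is a hand-written stand-in for what Jackson would produce for this simple map, does no escaping, and is not meant for real use.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical comparison of JDK serialization and JSON for the same message.
class PayloadComparison {

    /** JDK serialization: binary, Java-only, and carries class metadata. */
    static byte[] javaSerialized(Map<String, String> message) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(message);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Illustration only: builds JSON by hand with no escaping (a stand-in for Jackson). */
    static String naiveJson(Map<String, String> message) {
        StringBuilder json = new StringBuilder("{");
        for (Map.Entry<String, String> entry : message.entrySet()) {
            if (json.length() > 1) {
                json.append(',');
            }
            json.append('"').append(entry.getKey()).append("\":\"")
                .append(entry.getValue()).append('"');
        }
        return json.append('}').toString();
    }

    public static void main(String[] args) {
        Map<String, String> message = new LinkedHashMap<>();
        message.put("name", "石榴姐");
        message.put("age", "16");

        byte[] binary = javaSerialized(message);
        String json = naiveJson(message);

        System.out.println("JDK serialization: " + binary.length + " bytes");
        System.out.println("JSON: " + json.getBytes(StandardCharsets.UTF_8).length
                + " bytes -> " + json);
    }
}
```

Running it shows the JSON form is both readable and considerably smaller, which is why a JSON converter is declared on both the publisher and the consumer side.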
        
