微服务-RabbitMQ&SpringAMQP
Day04- RabbitMQ&SpringAMQP
本章小结:
同步通讯时效性强,缺点在于
- 耦合度高
- 性能吞吐能力弱
- 级联失败问题
- 资源消耗(等待)问题;
异步通讯基于事件驱动出发,一般都有个broker,解决同步通讯的四大问题,还做到了==流量削峰==的效果
- 主流broker,RabbitMQ,稳定性高、吞吐能力稍弱、基于erlang开发,是==强事务==首选
- kafka,超大吞吐量,有少量丢包,适合==大数据通讯允许极微数据丢失==的场景使用
- rocketMQ,比肩rabbitmq,基于java和scala开发,适合==大团队有二开需求==的场景;
RabbitMQ使用规范
- 消费端做好规划,然后发送端就方便指定rountingKey,消息可以落到具体监听器上;
- 交换机和队列都能配置多work,所以一定要确定好用那种交换机,便于扩展;
- 多环境并行,建议用fanout,比如生产、预生产同步,集团与子公司同步处理等场景;
- 超大环境可以考虑用主题模式,切细了配置,方便后期扩展;
其他扩展需求
参考配置
spring: rabbitmq: host: 192.168.20.165 port: 5672 virtual-host: / username: iyyxx password: 123321 listener: simple: concurrency: 5 # 多线程 prefetch: 10 # 预取数量 retry: enabled: true # 允许消息消费失败的重试 max-attempts: 3 # 消息最多消费次数3次 initial-interval: 2000 # 消息多次消费的间隔2秒
交换机 | 队列 | 策略 | 说明 |
---|---|---|---|
无 | basic | 一发一收,先进先出 | |
无 | work | 一发多收,平均分配或能力分配 | 只是多客户端,程序本身与basic无差别 |
fanout | basic/work | 处理同work,只是发送端往交换机发 | |
direct | basick/work | 同fannout,只是多了routingkey | 多了个rk筛子,但是可以分离也==可以交叉!== |
topic | basick/work | 等同direct,只是多了通配符 | 更符合业务思维,比如 china.# #.news |
[TOC]
No.01-RabbitMQ
1、同步通讯与异步通讯的差异对比
2、RabbitMQ 基于Docker的安装
安装实战
docker pull rabbitmq:3-management docker run \ -e RABBITMQ_DEFAULT_USER=iyyxx \ -e RABBITMQ_DEFAULT_PASS=123321 \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management
界面管理(登录、添加用户、设置多租户的虚拟主机, test/123)
3、RabbitMQ&SpringBoot
传统spring操作过程:
- 创建父工程和pom、引入springboot父工程、日志和test依赖
- 创建发布、订阅两个model,publisher、consumer
- publisher创建测试程序,创建连接、开启通道、创建队列、发送消息、结束;
- consumer创建测试程序(psvm,进程不断),创建连接、开启通道、创建队列(无视是否存在)、接收消息(循环);
小结:操作很繁琐、一点都不优雅
父工程pom
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>mq-demo</artifactId> <packaging>pom</packaging> <version>1.0-SNAPSHOT</version> <modules> <module>consumer</module> <module>publisher</module> </modules> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.9.RELEASE</version> <relativePath/> </parent> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> </project>
publisher/pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>mq-demo</artifactId> <groupId>org.example</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>publisher</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> </project>
publisher/com.iyyxx.mq.hellwork.PublisherTest
package com.iyyxx.mq.hellwork; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; public class PublisherTest { @Test public void testSendMessage() throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.20.165"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("iyyxx"); factory.setPassword("123321"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.发送消息 String message = "hello, rabbitmq!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("发送消息成功:【" + message + "】"); // 5.关闭通道和连接 channel.close(); connection.close(); } }
consumer/pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>mq-demo</artifactId> <groupId>org.example</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>consumer</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> </project>
consumer/com.iyyxx.mq.helloworld.ConsumerTest
package com.iyyxx.mq.helloworld; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.20.165"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("iyyxx"); factory.setPassword("123321"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.订阅消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 5.处理消息 String message = new String(body); System.out.println("接收到消息:【" + message + "】"); } }); System.out.println("等待接收消息。。。。"); } }
No.02-SpringAMQP的应用
1、SpringAMQP 介绍
2、SpringAMQP Basic Queue简单队列的发送接收
小结:
- 程序结构:父pom引入amqp依赖,两个model一个发布一个订阅,分别配置applicaiton,一个用test发,一个写监听器
- 流程结构:一发一接,收到就销毁,监听器是长期驻留的!消息没有回溯
父pom
<!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
发布端
publisher/application.yml
spring: rabbitmq: host: 192.168.20.165 port: 5672 virtual-host: / username: iyyxx password: 123321 listener: simple: concurrency: 5 prefetch: 10 retry: enabled: true # 允许消息消费失败的重试 max-attempts: 3 # 消息最多消费次数3次 initial-interval: 2000 # 消息多次消费的间隔2秒
publisher/com.iyyxx.mq.spring.SpringAmqpTest
package com.iyyxx.mq.spring; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { String queueName = "simple.queue"; String message = "hello, spring amqp!"; rabbitTemplate.convertAndSend(queueName, message); } }
订阅端
consumer/application.yml
spring: rabbitmq: host: 192.168.20.165 port: 5672 virtual-host: / username: iyyxx password: 123321
consumer/com.iyyxx.mq.listener.SpringRabbitListener
package com.iyyxx.mq.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String message){ System.out.println("spring 消费者受到消息【 "+message+" 】"); } }
3、SpringAMQP Work Queue工作队列的发送接收
小结:workQueue与BasicQueue并没有太大不同,只是多了个监听器,但面对不同处理能力的客户端为了高效处理,还是要配置==预取设置==!
consumer/application.yml 预取设置为1,让处理能力弱的少拿,实际应该还是要根据压测来
spring: rabbitmq: host: 192.168.20.165 port: 5672 virtual-host: / username: iyyxx password: 123321 listener: simple: prefetch: 1
publisher/com.iyyxx.mq.spring.SpringAmqpTest 模拟一秒50条
@Test public void testSendMessage2WorkQueue() throws InterruptedException { String queueName = "simple.queue"; String message = "hello, message__"; for (int i = 1; i <= 50; i++) { rabbitTemplate.convertAndSend(queueName, message+i); Thread.sleep(20); } }
consumer/com.iyyxx.mq.listener.SpringRabbitListener 模拟两个客户端高低性能处理实验!
@RabbitListener(queues = "simple.queue") public void listenWorkQueueMessage1(String message) throws InterruptedException { System.out.println("消费者1……收到消息【 "+message+" 】\t"+ LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = "simple.queue") public void listenWorkQueueMessage2(String message) throws InterruptedException { System.err.println("消费者2……收到消息【 "+message+" 】\t"+ LocalTime.now()); Thread.sleep(100); }
4、SpringAMQP 发布订阅模型1-fanoutExchange的发送接收
路由绑定,consumer#com.iyyxx.mq.config.FanoutConfig
package com.iyyxx.mq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutConfig { @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("iyyxx.fanout"); } @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); } @Bean public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } @Bean public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
监听器改造,com.iyyxx.mq.listener.SpringRabbitListener
@RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1Message1(String message) throws InterruptedException { System.out.println("fanout1消费者1……收到消息【 "+message+" 】\t"+ LocalTime.now()); Thread.sleep(1000); } // 这里做单队列多路并行测试 @RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1Message2(String message) throws InterruptedException { System.err.println("fanout1消费者2……收到消息【 "+message+" 】\t"+ LocalTime.now()); Thread.sleep(2000); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2Message1(String message) throws InterruptedException { System.err.println("fanout2消费者1……收到消息【 "+message+" 】\t"+ LocalTime.now()); Thread.sleep(1000); }
发送端改造,com.iyyxx.mq.spring.SpringAmqpTest
@Test public void testSendMessage2FanoutExchange() throws InterruptedException { String exchangeName = "iyyxx.fanout"; String message = "hello, message__"; for (int i = 1; i <= 500; i++) { rabbitTemplate.convertAndSend(exchangeName, "",message+i); Thread.sleep(50); } }
==纯声明式开发==,不用fanoutConfig
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "fanout.queue1"), exchange = @Exchange(name = "iyyxx.fanout", type = ExchangeTypes.FANOUT))) public void listenFanoutQueue1Message1(String message) throws InterruptedException { System.out.println("fanout1消费者1……收到消息【 " + message + " 】\t" + LocalTime.now()); } @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "fanout.queue2"), exchange = @Exchange(name = "iyyxx.fanout", type = ExchangeTypes.FANOUT))) public void listenFanoutQueue1Message2(String message) throws InterruptedException { System.err.println("fanout1消费者2……收到消息【 " + message + " 】\t" + LocalTime.now()); }
5、SpringAMQP 发布订阅模型2-directExchange的发送接收
接收器改造
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "iyyxx.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"})) public void listenDirectQueue1Message1(String message) throws InterruptedException { System.err.println("direct1消费者1……收到消息【 " + message + " 】\t" + LocalTime.now()); } @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "iyyxx.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"})) public void listenDirectQueue2Message1(String message) throws InterruptedException { System.out.println("direct2消费者1……收到消息【 " + message + " 】\t" + LocalTime.now()); }
发送测试改造
@Test public void testSendMessage2DirectExchange() throws InterruptedException { String exchangeName = "iyyxx.direct"; String message = "hello, message__"; rabbitTemplate.convertAndSend(exchangeName, "yellow",message); // 1 收 rabbitTemplate.convertAndSend(exchangeName, "blue",message); // 2 收 rabbitTemplate.convertAndSend(exchangeName, "red",message); // 1,2 都收到 }
6、SpringAMQP 发布订阅模型3-topicExchange的发送接收
接收器改造
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "iyyxx.topic", type = ExchangeTypes.TOPIC), key = {"china.#"})) public void listenTopicQueue1Message1(String message) throws InterruptedException { System.err.println("topic1消费者1……收到消息【 " + message + " 】\t" + LocalTime.now()); Thread.sleep(1000); } @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "iyyxx.topic", type = ExchangeTypes.TOPIC), key = {"china.#"})) public void listenTopicQueue1Message2(String message) throws InterruptedException { System.err.println("topic1消费者2……收到消息【 " + message + " 】\t" + LocalTime.now()); Thread.sleep(2000); } @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "iyyxx.topic", type = ExchangeTypes.TOPIC), key = {"#.news"})) public void listenTopicQueue2Message1(String message) throws InterruptedException { System.out.println("topic2消费者1……收到消息【 " + message + " 】\t" + LocalTime.now()); }
发送测试改造
@Test public void testSendMessage2TopicExchange() throws InterruptedException { String exchangeName = "iyyxx.topic"; String message = "hello, message__"; for (int i = 1; i <= 50; i++) { rabbitTemplate.convertAndSend(exchangeName, "newyork.usa.news", message+i); } }
7、SpringAMQP 消息转换器的实战
错误的示范
配置消息队列(对下面正确流程也有用)
package com.iyyxx.mq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class QueueInitConfig { @Bean public Queue objectQueue(){ return new Queue("object.queue"); } }
配置发送端程序
@Test public void testSendObjectMessage2OjbectQueue(){ Map<String,String> message = new HashMap(); message.put("name", "石榴姐"); message.put("age", "16"); rabbitTemplate.convertAndSend("object.queue", message); }
查看网页端消息
正确的流程
发送端
依赖修改
<!-- MessageConvert Json 依赖--> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
声明转换类
package com.iyyxx.mq; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @SpringBootApplication public class PublisherApplication { public static void main(String[] args) { SpringApplication.run(PublisherApplication.class); } @Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); } }
测试程序
@Test public void testSendObjectMessage2OjbectQueue(){ Map<String,String> message = new HashMap(); message.put("name", "石榴姐"); message.put("age", "16"); rabbitTemplate.convertAndSend("object.queue", message); }
接收端
依赖修改
<dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.11.4</version> </dependency>
声明转换类
package com.iyyxx.mq; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @SpringBootApplication public class ConsumerApplication { @Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); } public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } }
配置监听器
@RabbitListener(queues = "object.queue") public void listenObjectQueueMessage(Map<String, String> message){ System.out.println("object消息【 "+message.get("name")+" 】"); System.out.println("object消息【 "+message.get("age")+" 】"); }
评论系统未开启,无法评论!