一、使用场景
官方文档:https://www.rabbitmq.com/networking.htmll 
1.1 异步处理

1.2 应用解耦

1.3 流量控制

二、RabbitMQ概念
RabbitMQ简介: RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。 核心概念 Message 消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成, 这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可 能需要持久性存储)等。 Publisher 消息的生产者,也是一个向交换器发布消息的客户端应用程序。 Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直 在队列里面,等待消费者连接到这个队列将其取走。 Binding 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交 换器理解成一个由绑定构成的路由表。 Exchange 和Queue的绑定可以是多对多的关系。 Connection 网络连接,比如一个TCP连接。 Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道 发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都 是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。 Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加 密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥 有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时 指定,RabbitMQ 默认的 vhost 是 / 。

消息队列主要有两种形式的目的地
- 队列(queue):点对点消息通信(point-to-point)
消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获 取消息内容,消息读取后被移出队列 消息只有唯一的发送者和接受者,但并不是说只能有一个接收者 - 主题(topic):发布(publish)/订阅(subscribe)消息通信
发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个 主题,那么就会在消息到达时同时收到消息
三、RabbitMAQ的安装
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p
25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
4369, 25672 (Erlang发现&集群端口)
5672, 5671 (AMQP端口)
15672 (web管理后台端口)
61613, 61614 (STOMP协议端口)
1883, 8883 (MQTT协议端口)
官方文档:https://www.rabbitmq.com/networking.html
 web管理页面:http://192.168.56.10:15672/#/ 默认的用户名:guest 默认的密 码:guest
     
四:Exchange 交换机类型类型
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、 fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接 看另外三种类型: direct : 定向交换机也叫点对点交换机 topic : 主题交换机 fanout:广播交换机
direct 定向接收  fanout 这种交换机所有的对列都会接收  ★ topic 队列通过条件接收 
4.1 新建一个交换机

4.2 新建一个队列

4.3 交换机绑定队列
 
五、测试
 分析:创建出3个交换机,一个为direct类型,一个为fanout类型,一个为topic类型 发消息给交换机,查看队列中的数据。
5.1 发消息给指定的队列


5.2 接收队列

 对于fanout的交换机:填写或者不填写Routing key 所有绑定的队列都会接收。  *对于topic 的交换机,可以设置不同的规则匹配,符合Routing key条件的队列会接收,#匹配0个或多个单词,匹配一 个单词 *特别注意:#匹配0个或多个单词,匹配一个单词
 
六、springboot整合RabbitMQ
1. 引入 spring-boot-starter-amqp
2. application.yml配置
3. 测试RabbitMQ
1. AmqpAdmin:管理组件
2. RabbitTemplate:消息发送处理组件
3. @RabbitListener 监听消息的方法可以有三种参数(不分数量,顺序)
? Object content, Message message, Channel
第一步:引入依赖
<!-- 引入RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
第二步:添加配置 1.配置文件配置
spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.listener.simple.acknowledge-mode=manual
2.在GulimallOrderApplication上开启注解 @EnableRabbit
3.配置 把RabbitMQ接收和发送的数据转为json
@Configuration
public class MyRabbitConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
第三步:测试发送消息 组件: 1. AmqpAdmin:管理组件 2. RabbitTemplate:消息发送处理组件 3. @RabbitListener 监听消息的方法可以有三种参数(不分数量,顺序) ? Object content, Message message, Channel
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class GulimallOrderApplicationTests {
// 可以创建交换机,队列,并管理
@Autowired
private AmqpAdmin amqpAdmin;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 第一步:创建交换机
* 1、如何创建Exchange、Queue、Binding
* 1)、使用AmqpAdmin进行创建
* 2、如何收发消息
*/
@Test
public void createExchange() {
/**
* name 名称
* durable 是否持久化
* autoDelete 是否可自动删除,没有绑定任何队列时删除
*/
Exchange directExchange = new DirectExchange("hello-java-exchange",true,false);
amqpAdmin.declareExchange(directExchange);
log.info("Exchange[{}]创建成功:","hello-java-exchange");
}
/**
* 第二步:创建队列
*/
@Test
public void testCreateQueue() {
/**
* name 名称
* durable 是否持久化
* exclusive 是否排它, 如果有一个连接了其他就无法连接它,这个最好设置为false
* autoDelete 是否可自动删除,没有绑定任何队列时删除
*/
Queue queue = new Queue("hello-java-queue",true,false,false);
amqpAdmin.declareQueue(queue);
log.info("Queue[{}]创建成功:","hello-java-queue");
}
/**
*第三步:交换机与队列进行绑定
*/
@Test
public void createBinding() {
/**
* String destination, 目的地,队列名称
* Binding.DestinationType destinationType, 类型,可以绑定队列或者交换机
* String exchange, 交换机名称
* String routingKey, 路由键
* Map<String, Object> arguments 参数
*/
Binding binding = new Binding("hello-java-queue",
Binding.DestinationType.QUEUE,
"hello-java-exchange",
"hello.java",
null);
amqpAdmin.declareBinding(binding);
log.info("Binding[{}]创建成功:","hello-java-binding");
}
/**
* 第四步:交换机发送消息
*/
@Test
public void sendMessageTest() {
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1L);
reasonEntity.setCreateTime(new Date());
reasonEntity.setName("reason");
reasonEntity.setStatus(1);
reasonEntity.setSort(2);
String msg = "Hello World";
//1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口
//2、发送的对象类型的消息,可以是一个json
/**
* String exchange, 交换机
* String routingKey, 路由键,要与第四步绑定时设置的路由键有关系才能接收
* Object object, 发送的对象,如果时对象那么这个对象必须是序列化的
* @Nullable CorrelationData correlationData
*/
rabbitTemplate.convertAndSend("hello-java-exchange","hello2.java",
reasonEntity,new CorrelationData(UUID.randomUUID().toString()));
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",
reasonEntity,new CorrelationData(UUID.randomUUID().toString()));
log.info("消息发送完成:{}",reasonEntity);
}
// @Test
// public void create() {
// HashMap<String, Object> arguments = new HashMap<>();
// arguments.put("x-dead-letter-exchange", "order-event-exchange");
// arguments.put("x-dead-letter-routing-key", "order.release.order");
// arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
// Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
// amqpAdmin.declareQueue(queue);
// log.info("Queue[{}]创建成功:","order.delay.queue");
// }
}

 第四步:测试接收消息
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
@Override
public PageUtils queryPage(Map<String, Object> params) {
IPage<OrderItemEntity> page = this.page(
new Query<OrderItemEntity>().getPage(params),
new QueryWrapper<OrderItemEntity>()
);
return new PageUtils(page);
}
/**
* queues:声明需要监听的队列
* message : 如果我们不知道发送的对象类型,可以统一用org.springframework.amqp.core.Message 来接收
* OrderReturnReasonEntity:接收消息的对象,因为我们发的时这个对象,所以接收也可以用这个对象
* Message message,OrderReturnReasonEntity 这两个参数写一个就可以
* channel:当前传输数据的通道
*/
@RabbitListener(queues = {"hello-java-queue"})
public void revieveMessage(Message message,
OrderReturnReasonEntity content) {
//拿到主体内容
byte[] body = message.getBody();
//拿到的消息头属性信息
MessageProperties messageProperties = message.getMessageProperties();
System.out.println("接受到的消息...内容" + message + "===内容:" + content.getClass());
}
}

七、最佳实战@RabbitListener + @RabbitHandler
@RabbitListener 可以在类或者方法上 @RabbitHandler 只能加在方法上 第一步:发送消息
@Slf4j
@RestController
@RequestMapping("/rabbit")
public class RabbitController {
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
public R sendMessage(){
for (int i=0;i<10;i++){
if(i%2==0){
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId((long) i);
reasonEntity.setCreateTime(new Date());
reasonEntity.setName("reason");
reasonEntity.setStatus(1);
reasonEntity.setSort(2);
//1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口
//2、发送的对象类型的消息,可以是一个json
/**
* String exchange, 交换机
* String routingKey, 路由键,要与第四步绑定时设置的路由键有关系才能接收
* Object object, 发送的对象,如果时对象那么这个对象必须是序列化的
* @Nullable CorrelationData correlationData
*/
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",
reasonEntity,new CorrelationData(UUID.randomUUID().toString()));
// log.info("消息发送完成OrderReturnReasonEntity:{}",i);
}else{
OrderEntity orderEntity = new OrderEntity();
orderEntity.setId((long) i);
// orderEntity.setCreateTime(new Date());
orderEntity.setOrderSn(String.valueOf(UUID.randomUUID()));
//1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口
//2、发送的对象类型的消息,可以是一个json
/**
* String exchange, 交换机
* String routingKey, 路由键,要与第四步绑定时设置的路由键有关系才能接收
* Object object, 发送的对象,如果时对象那么这个对象必须是序列化的
* @Nullable CorrelationData correlationData
*/
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",
orderEntity,new CorrelationData(UUID.randomUUID().toString()));
// log.info("消息发送完成OrderEntity:{}",i);
}
System.out.println("maruis---消息发送完成--->" + i);
}
return R.ok();
}
}
第二步:接收消息
@Service("orderItemService")
@RabbitListener(queues = {"hello-java-queue"})
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
@RabbitHandler
public void revieveMessage( OrderReturnReasonEntity content) {
System.out.println("接受到的消息OrderReturnReasonEntity...内容" + content.getId() + "===类型:" + content.getClass().getName());
}
@RabbitHandler
public void revieveMessage( OrderEntity content) {
System.out.println("接受到的消息OrderEntity...内容" + content.getId() + "===类型:" + content.getClass().getName());
}
@Override
public PageUtils queryPage(Map<String, Object> params) {
IPage<OrderItemEntity> page = this.page(
new Query<OrderItemEntity>().getPage(params),
new QueryWrapper<OrderItemEntity>()
);
return new PageUtils(page);
}
/**
* queues:声明需要监听的队列
* message : 如果我们不知道发送的对象类型,可以统一用org.springframework.amqp.core.Message 来接收
* OrderReturnReasonEntity:接收消息的对象,因为我们发的时这个对象,所以接收也可以用这个对象
* Message message,OrderReturnReasonEntity 这两个参数写一个就可以
* channel:当前传输数据的通道
* 场景:
* 1)、订单服务启动多个,同一个信息,只能有一个客户端接收。
* 2)、只有一个消息完全处理完,方法运行结束,我们才可以接收其他的消息。
*/
// @RabbitListener(queues = {"hello-java-queue"})
// public void revieveMessage(Message message,
// OrderReturnReasonEntity content,
// Channel channel) {
// //拿到主体内容
// byte[] body = message.getBody();
// //拿到的消息头属性信息
// MessageProperties messageProperties = message.getMessageProperties();
// System.out.println("接受到的消息...内容" + message + "===内容:" + content.getClass());
//
// }
}

八、RabbitMQ消息确认机制-可靠抵达
 ? publisher confirmCallback 确认模式,发送信息mq服务器,并且服务器接收以后回调。 ? publisher returnCallback 未投递到 queue 退回模式,queue接收到信息后回调 ? consumer ack机制 消费者接收到信息后回调
前两个时发送端确认,最后一个时消费端确认
8.1 发送端的消息确认confirmCallback+returnCallback
第一步:yml配置
spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.listener.simple.acknowledge-mode=manual
第二步:设置配置类
@Configuration
public class MyRabbitConfig {
private RabbitTemplate rabbitTemplate;
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setMessageConverter(messageConverter());
initRabbitTemplate();
return rabbitTemplate;
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 定制RabbitTemplate
* 1、服务收到消息就会回调
* 1、spring.rabbitmq.publisher-confirms: true
* 2、设置确认回调
* 2、消息正确抵达队列就会进行回调
* 1、spring.rabbitmq.publisher-returns: true
* spring.rabbitmq.template.mandatory: true
* 2、设置确认回调ReturnCallback
*
* 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
*
*/
// @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法
public void initRabbitTemplate() {
/**
* 1、只要消息抵达Broker就ack=true
* correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
* ack:消息是否成功收到
* cause:失败的原因
*/
//设置确认回调
rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
});
/**
* 注意:这个方法时消息没有抵达才会调用
* 只要消息没有投递给指定的队列,就触发这个失败回调
* message:投递失败的消息详细信息
* replyCode:回复的状态码
* replyText:回复的文本内容
* exchange:当时这个消息发给哪个交换机
* routingKey:当时这个消息用哪个路邮键
*/
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
});
}
}

8.2 消费端消息确认
消费端默认的确认方式是自动确认的,但是这种会出现一个问题就是在还未确认之前发生宕机时,数据就会从RabbitMQ中丢失,为了让数据准确抵达,我们需要手动确认,手动确认的结果有两种:一种时接收;一种是拒绝(拒绝后数据可以丢失,也可以重新让如队列)
第一步: 配置手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
第二步:发送小心
@Slf4j
@RestController
@RequestMapping("/rabbit")
public class RabbitController {
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
public R sendMessage(){
for (int i=0;i<10;i++){
if(i%2==0){
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId((long) i);
reasonEntity.setCreateTime(new Date());
reasonEntity.setName("reason");
reasonEntity.setStatus(1);
reasonEntity.setSort(2);
//1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口
//2、发送的对象类型的消息,可以是一个json
/**
* String exchange, 交换机
* String routingKey, 路由键,要与第四步绑定时设置的路由键有关系才能接收
* Object object, 发送的对象,如果时对象那么这个对象必须是序列化的
* @Nullable CorrelationData correlationData 消息的唯一ID,为了实现消息的可靠投递
*/
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",
reasonEntity,new CorrelationData(UUID.randomUUID().toString()));
// log.info("消息发送完成OrderReturnReasonEntity:{}",i);
}else{
OrderEntity orderEntity = new OrderEntity();
orderEntity.setId((long) i);
// orderEntity.setCreateTime(new Date());
orderEntity.setOrderSn(String.valueOf(UUID.randomUUID()));
//1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口
//2、发送的对象类型的消息,可以是一个json
/**
* String exchange, 交换机
* String routingKey, 路由键,要与第四步绑定时设置的路由键有关系才能接收
* Object object, 发送的对象,如果时对象那么这个对象必须是序列化的
* @Nullable CorrelationData correlationData 消息的唯一ID,为了实现消息的可靠投递
*/
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",
orderEntity,new CorrelationData(UUID.randomUUID().toString()));
// log.info("消息发送完成OrderEntity:{}",i);
}
System.out.println("maruis---消息发送完成--->" + i);
}
return R.ok();
}
}
第三步:消息确认
/**
* queues:声明需要监听的队列
* message : 如果我们不知道发送的对象类型,可以统一用org.springframework.amqp.core.Message 来接收
* OrderReturnReasonEntity:接收消息的对象,因为我们发的时这个对象,所以接收也可以用这个对象
* Message message,OrderReturnReasonEntity 这两个参数写一个就可以
* channel:com.rabbitmq.clientChannel 当前传输数据的通道
* 场景:
* 1)、订单服务启动多个,同一个信息,只能有一个客户端接收。
* 2)、只有一个消息完全处理完,方法运行结束,我们才可以接收其他的消息。
*/
@RabbitListener(queues = {"hello-java-queue"})
// @RabbitHandler
public void revieveMessage(Message message,
Channel channel) {
//拿到主体内容
byte[] body = message.getBody();
//拿到的消息头属性信息
MessageProperties messageProperties = message.getMessageProperties();
System.out.println("接受到的消息...内容" + message + "===类型:" + message.getClass());
// 在chanel内部是顺序自增的
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("maruis----deliveryTag-->" + deliveryTag);
/**
*deliveryTag 消息id
* false 不批量
*/
// 签收消息,非批量模式
try {
if(deliveryTag%2==0){
channel.basicAck(deliveryTag,false);
System.out.println("maruis------>" + "签收成功");
}else{
// 参数: 消息id,是否重新投递给服务器
// channel.basicReject(deliveryTag,false);
// 参数: 消息id,是否批量拒绝(如果为true,那么这条信息之前的信息全部被拒),是否重新投递给服务器(如果是true,就会重新放入队列,false丢弃)
channel.basicNack(deliveryTag,false,true);
System.out.println("maruis------>" + "退货");
}
}catch (Exception e){
// 网络终端
}
}

|