目录
-
快速入门案例
- 简单模式
- 工作模式
- 发布与订阅模式
-
举栗
- 生产者Producer代码
- 一些说明
- 消费者Consumer代码
- 一些说明
- windows下创建连接对象失败报错解决
一、快速入门案例
查看官网的 教程RabbitMQ Tutorials :https://www.rabbitmq.com/getstarted.html,共有4种mq的模式场景(fanout、direact、topic、headers)
1. 隐式交换机简单模式:单个消费者

2. 隐式交换机工作模式:多个消费者,分发模式有轮询、公平分发
- 轮询分发:默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的信息。这种分发消息的方式称为循环。
- 公平分发:按照权重分发
 - 消息确认:如果消费者在接受完任务之后,突然出故障,这时需要有 消息确认 机制 接着mq将未处理的信息重新放入队列分发处理

3. 显示交换机模式
- 需要一个
exchange交换机 和binding ,交换类型有: direct, topic, headers and fanout direct, topic, headers and fanout 是交换机 和 队列之间的关系binding 是队列和消费者之间的关系,消费者绑定队列 才能收到信息
1.fanout发布与订阅模式
- fanout只是将它接收的所有消息广播到它知道的所有队列
- 消费者binding 指定队列才能接收到信息

2.direct路由交换模式
direct 交换方式下设置相同的binding key 效果和 fanout 类似

3.topic模式

4.举栗
生产者Producer代码
package henu.soft.xiaosi.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection("生产者");
channel = connection.createChannel();
String queueName = "xiaosi";
channel.queueDeclare(queueName,false,false,false,null);
String msg = "RabbitMQ~~";
channel.basicPublish("",queueName,null,msg.getBytes());
System.out.println("消息发送成功!");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
try {
if(channel != null && channel.isOpen()){
channel.close();
}
if(connection != null && channel.isOpen()){
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
}
一些说明




消费者Cosumer代码
package henu.soft.xiaosi.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection("消费者");
channel = connection.createChannel();
String queueName = "xiaosi";
channel.basicConsume(
queueName,
true,
new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("消费者受到消息:" + new String(message.getBody(), "UTF-8"));
}
},
new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("消费者接受消息失败!");
}
});
System.out.println("开始接受新消息!");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
try {
if (channel != null && channel.isOpen()) {
channel.close();
}
if (connection != null && channel.isOpen()) {
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
}
一些说明

 
1.获取连接对象 factory.newConnection("生产者"); 报错

|