rabbitmq的虚拟主机就相当于是数据库中的库,一个项目应只访问一个虚拟主机。创建虚拟主机的时候应该以’/'开头。
引入的pom文件的依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
</dependencies>
使用的rabbitmq的版本是3.8.19。
1、第一种模式(直连)
生产者
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("主机ip地址");
factory.setPort(5672);
factory.setVirtualHost("虚拟主机名");
factory.setUsername("用户名");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello",false,false,false,null);
channel.basicPublish("","hello",null,"hello".getBytes());
channel.close();
connection.close();
如果设置持久化为true消费完成后删除也为true,重启rabbitmq后队列并没有被保存下来。 消费者
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("主机ip");
factory.setPort(5672);
factory.setVirtualHost("虚拟主机名");
factory.setUsername("用户名");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello",false,false,false,null);
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
channel.close();
connection.close();
消费者测试的时候应写一个main方法,不能使用@test的方式测试。
第二种模型(work quene)
当消费者速度慢的时候会造成消息队列堆积消息导致消息无法及时处理,使用任务模型让多个消费者绑定到一个队列,共同消费队列中的消息,队列中的消息一旦消费,就会消失,因此任务不会被重复执行。 在两个消费者同时消费一个队列时,rabbitmq默认会按顺序将每一个消息发给下一个消费者,每个消费者都会收到相同数量的消息,这种分发消息的方式叫做循环。如图: 平均分配的时候是把消费者1的消息一次性给消费者1,把消费者2的消息一次性给消费者2,如果一个消费者处理的速度慢的话不影响另一个消费者的速度,另一个消费者只消费自己的消息,如图: 但是当一个消费者在执行第二个消息的时候宕机了,那么分配给这个消费者的后面的消息就会丢失,这显然不是我们想要的。所以采用下面的方式解决这个问题:设置消费者一次只能消费一个消息,把消费者的自动确认关闭,当消息完成后手动确认。
消费者1
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("118.31.55.218");
factory.setPort(5672);
factory.setVirtualHost("/ems");
factory.setUsername("adm");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);
channel.queueDeclare("hello",true,false,false,null);
channel.basicConsume("hello",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
消费者2
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("118.31.55.218");
factory.setPort(5672);
factory.setVirtualHost("/ems");
factory.setUsername("adm");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);
channel.queueDeclare("hello",true,false,false,null);
channel.basicConsume("hello",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
第三种模型(fanout)
fanout也称为广播,fanout模型会将消息发送给所有与交换机绑定的队列。 生产者
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("主机ip");
factory.setPort(5672);
factory.setVirtualHost("虚拟主机名");
factory.setUsername("用户名");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs","fanout");
channel.basicPublish("logs","",null,"fanout".getBytes());
channel.close();
connection.close();
}
消费者
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("主机IP");
factory.setPort(5672);
factory.setVirtualHost("虚拟主机名");
factory.setUsername("用户名");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("交换机名","fanout");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"交换机名","");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
消费者2和消费者1代码差不多。
第四种模型(Routing)
Routing模型可以让不同的消息被不同的队列消费.
Routing之订阅模型-Direct(直连):
1.队列与交换机的绑定需要指定一个Routingkey(路由key) 2.消息的发送方在向交换机发送消息时,也必须指定Routingkey 3.交换机不再把消息发送给每一个绑定的队列,而是根据消息的Routingkey进行判断,只有队列的Routingkey与消息的Routingkey完全一致队列才会收到消息
生产者
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("主机IP");
factory.setPort(5672);
factory.setVirtualHost("虚拟主机名");
factory.setUsername("用户名");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct","direct");
String Routingkey="error";
channel.basicPublish("direct",Routingkey,null,"direct类型".getBytes());
channel.close();
connection.close();
消费者1
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("主机IP");
factory.setPort(5672);
factory.setVirtualHost("虚拟主机名");
factory.setUsername("用户名");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct","direct");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"direct","error");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+new String(body));
}
});
消费者2
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("主机IP");
factory.setPort(5672);
factory.setVirtualHost("虚拟主机名");
factory.setUsername("用户名");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct","direct");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"direct","error");
channel.queueBind(queue,"direct","info");
channel.queueBind(queue,"direct","warning");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:"+new String(body));
}
});
此时消费者1和消费者2都能收到消息,当把生产者的的Routingkey改为info的时候,只有消费者2能收到消息。
Routing之订阅模型-Topic
directl类型的交换机在设置接受队列的路由key时只能设置固定Routingkey,当我们要设置很多Routingkey时会很麻烦,而topic类型的交换机可以解决这个问题,topic类型的交换机可以让队列绑定Routingkey的时候使用通配符!这种类型的Routingkey一般都是由多个单词组成的,多个单词之间使用"."分开。如:item.insert 注意:只用topic类型的交换机才能使用通配符 通配符:*:只能匹配一个单词(不是字母) #:匹配0个或多个单词
生产者
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("主机IP");
factory.setPort(5672);
factory.setVirtualHost("虚拟主机名");
factory.setUsername("用户名");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics","topic");
String Routingkey="user";
channel.basicPublish("topics",Routingkey,null,"topic类型".getBytes());
channel.close();
connection.close();
消费者1
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("主机IP");
factory.setPort(5672);
factory.setVirtualHost("虚拟主机名");
factory.setUsername("用户名");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics","topic");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"topics","user.*");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+new String(body));
}
});
消费者2
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("主机IP");
factory.setPort(5672);
factory.setVirtualHost("虚拟主机名");
factory.setUsername("用户名");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics","topic");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"topics","user.#");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:"+new String(body));
}
});
运行结果:消费者2能收到消息,消费者1不能收到消息。如果把生产者的Routingkey改成user.info那么消费者1和消费者2都能收到消息。
SpringBoot中使用RabbitMQ
pom文件
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
yml文件
spring:
rabbitmq:
host: 主机名
port: 5672
username: 用户名
password: 密码
virtual-host: 虚拟主机
使用RabbitTemplate用来简化操作,只要配置完之后,这个对象就实例化好了,使用时直接在项目中注入。
第一种模型
生产者:
@org.junit.jupiter.api.Test
public void fir(){
rabbitTemplate.convertAndSend("hello","First Hello World");
}
消费者:
@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class TestRabbitmq {
@RabbitHandler
public void receive(String msg){
System.out.println(msg);
}
}
第二种模型
生产者
@org.junit.jupiter.api.Test
public void sec(){
for(int i=0;i<10;i++){
rabbitTemplate.convertAndSend("work","第二种模型"+i);
}
}
消费者
@Component
public class Two {
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive1(String msg){
System.out.println("msg1"+msg);
}
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive2(String msg){
System.out.println("msg2"+msg);
}
}
但是这种消费是轮询的,如图
第三种模型
生产者
@org.junit.jupiter.api.Test
public void fanout(){
rabbitTemplate.convertAndSend("logs","","第三种模型");
}
消费者
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "logs",type = "fanout")
)
})
public void receive1(String msg){
System.out.println("msg1"+msg);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "logs",type = "fanout")
)
})
public void receive2(String msg){
System.out.println("msg2"+msg);
}
第四种模型
生产者
@org.junit.jupiter.api.Test
public void route(){
rabbitTemplate.convertAndSend("direct","info","第四种模型");
}
消费者
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "direct",type = "direct"),
key = {"info","error","warn"}
)
})
public void receive1(String msg){
System.out.println("msg1"+msg);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "direct",type = "direct"),
key = {"info"}
)
})
public void receive2(String msg){
System.out.println("msg2"+msg);
}
第五种模型
生产者
@org.junit.jupiter.api.Test
public void topic(){
rabbitTemplate.convertAndSend("topic","user.save","第五种模型");
}
消费者
@Component
public class Topic {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(type = "topic",name = "topic"),
key = {"user.*","user.save"}
)
})
public void receive1(String msg){
System.out.println("msg1"+msg);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(type = "topic",name = "topic"),
key = {"order.#","produce.#","user.*"}
)
})
public void receive2(String msg){
System.out.println("msg2"+msg);
}
}
|