RabbitMQ详解
RabbitMq发送接收消息方式
RabbitMq不使用交换机
发送方
public static void main(String[] args) {
String messageData = "test message, hello!";
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("47.110.157.82");
factory.setPort(5672);
factory.setUsername("liu");
factory.setPassword("123456");
Connection connection=null;
Channel channel=null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
/**
* 参数1:队列名
* 参数2:持久化
* 参数3:是否排外,排外只允许一个消费者监听
* 参数4:是否自动删除,队列中没有消息并且没有消费者连接时会自动删除
* 参数4:为队列设置属性,通常为null
*/
channel.queueDeclare("myqueue",true,false,false,null);
/**
* 参数1:交换机名
* 参数2:队列名或者routingkey,指定了交换机这里就是routingkey
* 参数3:设置消息属性,通常为null
* 参数4:消息数据
*/
channel.basicPublish("","myqueue",null,messageData.getBytes());
System.out.println("消息发送成功");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
接收方
public static void main(String[] args) {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("47.110.157.82");
factory.setPort(5672);
factory.setUsername("liu");
factory.setPassword("123456");
Connection connection=null;
Channel channel=null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("myqueue",true,false,false,null);
/**
* 参数1 监听队列名
* 参数2 自动确认
* 参数3 接受者标签
* 参数4 消息接收回调方法
*/
channel.basicConsume("myqueue",true,"",new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message=new String(body,"utf-8");
System.out.println(message);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
}
}
RabbitMq-direct
发送方
public static void main(String[] args) {
String messageData = "direct message, hello!";
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("47.110.157.82");
factory.setPort(5672);
factory.setUsername("liu");
factory.setPassword("123456");
Connection connection=null;
Channel channel=null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("myqueue001",true,false,false,null);
channel.exchangeDeclare("exchange001","direct",true);
channel.queueBind("myqueue001","exchange001","routingkey001");
channel.basicPublish("exchange001","routingkey001",null,messageData.getBytes());
System.out.println("消息发送成功");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
接收方
public static void main(String[] args) {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("47.110.157.82");
factory.setPort(5672);
factory.setUsername("liu");
factory.setPassword("123456");
Connection connection=null;
Channel channel=null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("myqueue001",true,false,false,null);
channel.exchangeDeclare("exchange001","direct",true);
channel.queueBind("myqueue001","exchange001","routingkey001");
channel.basicConsume("myqueue001",true,"",new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message=new String(body,"utf-8");
System.out.println(message);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
}
}
RabbitMq-fanout
发送方
public static void main(String[] args) {
String messageData = "fanout message, hello!";
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("47.110.157.82");
factory.setPort(5672);
factory.setUsername("liu");
factory.setPassword("123456");
Connection connection=null;
Channel channel=null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare("exchange002","fanout",true);
channel.basicPublish("exchange002","",null,messageData.getBytes());
System.out.println("消息发送成功");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
接收方01
public static void main(String[] args) {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("47.110.157.82");
factory.setPort(5672);
factory.setUsername("liu");
factory.setPassword("123456");
Connection connection=null;
Channel channel=null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
String queue = channel.queueDeclare().getQueue();
channel.exchangeDeclare("exchange002","fanout",true);
channel.queueBind(queue,"exchange002","");
channel.basicConsume(queue,true,"",new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message=new String(body,"utf-8");
System.out.println(“消费者01”+message);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
}
}
接收方02
public static void main(String[] args) {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("47.110.157.82");
factory.setPort(5672);
factory.setUsername("liu");
factory.setPassword("123456");
Connection connection=null;
Channel channel=null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
String queue = channel.queueDeclare().getQueue();
channel.exchangeDeclare("exchange002","fanout",true);
channel.queueBind(queue,"exchange002","");
channel.basicConsume(queue,true,"",new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message=new String(body,"utf-8");
System.out.println(“消费者02”+message);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
}
}
RabbitMq-topic
消费者001
public static void main(String[] args) {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("47.110.157.82");
factory.setPort(5672);
factory.setUsername("liu");
factory.setPassword("123456");
Connection connection=null;
Channel channel=null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("topicmyqueue001",true,false,false,null);
channel.exchangeDeclare("topicexchange001","topic",true);
channel.queueBind("topicmyqueue001","topicexchange001","aa");
channel.basicConsume("topicmyqueue001",true,"",new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message=new String(body,"utf-8");
System.out.println("消费者001"+message);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
}
}
消费者002
public static void main(String[] args) {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("47.110.157.82");
factory.setPort(5672);
factory.setUsername("liu");
factory.setPassword("123456");
Connection connection=null;
Channel channel=null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("topicmyqueue002",true,false,false,null);
channel.exchangeDeclare("topicexchange001","topic",true);
channel.queueBind("topicmyqueue002","topicexchange001","aa.*");
channel.basicConsume("topicmyqueue002",true,"",new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message=new String(body,"utf-8");
System.out.println("消费者002"+message);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
}
}
消费者003
public static void main(String[] args) {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("47.110.157.82");
factory.setPort(5672);
factory.setUsername("liu");
factory.setPassword("123456");
Connection connection=null;
Channel channel=null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("topicmyqueue003",true,false,false,null);
channel.exchangeDeclare("topicexchange001","topic",true);
channel.queueBind("topicmyqueue003","topicexchange001","aa.#");
channel.basicConsume("topicmyqueue003",true,"",new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message=new String(body,"utf-8");
System.out.println("消费者003"+message);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
}
}
发送者
public static void main(String[] args) {
String messageData = "topic message, hello!";
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("47.110.157.82");
factory.setPort(5672);
factory.setUsername("liu");
factory.setPassword("123456");
Connection connection=null;
Channel channel=null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare("topicexchange001","topic",true);
channel.basicPublish("topicexchange001","aa.bb.cc",null,messageData.getBytes());
System.out.println("消息发送成功");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
事务
1.RabbitMq的事务只对发送方生效,接收者不论是否声明事务都会将队列中的消息移除。 2.事务声明后消息不会保存到队列,只有事务提交以后消息才会进入队列中。 txSelect()声明事务 txCommit();提交事务 txRollback();回滚事务
public static void main(String[] args) {
String messageData = "direct message, hello!";
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("47.110.157.82");
factory.setPort(5672);
factory.setUsername("liu");
factory.setPassword("123456");
Connection connection=null;
Channel channel=null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("myqueue001",true,false,false,null);
channel.exchangeDeclare("exchange001","direct",true);
channel.queueBind("myqueue001","exchange001","routingkey001");
channel.txSelect();
channel.basicPublish("exchange001","routingkey001",null,messageData.getBytes());
channel.txCommit();
System.out.println("消息发送成功");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
if(channel!=null){
try {
channel.txRollback();
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
发送者确认模式
普通确认
public static void main(String[] args) {
String messageData = "direct message, hello!";
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("47.110.157.82");
factory.setPort(5672);
factory.setUsername("liu");
factory.setPassword("123456");
Connection connection=null;
Channel channel=null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("myqueue001",true,false,false,null);
channel.exchangeDeclare("exchange001","direct",true);
channel.queueBind("myqueue001","exchange001","routingkey001");
channel.confirmSelect();
channel.basicPublish("exchange001","routingkey001",null,messageData.getBytes());
boolean b = channel.waitForConfirms();
System.out.println("消息发送成功");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
批量确认
public static void main(String[] args) {
String messageData = "direct message, hello!";
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("47.110.157.82");
factory.setPort(5672);
factory.setUsername("liu");
factory.setPassword("123456");
Connection connection=null;
Channel channel=null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("myqueue001",true,false,false,null);
channel.exchangeDeclare("exchange001","direct",true);
channel.queueBind("myqueue001","exchange001","routingkey001");
channel.confirmSelect();
channel.basicPublish("exchange001","routingkey001",null,messageData.getBytes());
channel.waitForConfirmsOrDie();
System.out.println("消息发送成功");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
异步确认
public static void main(String[] args) {
String messageData = "direct message, hello!";
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("47.110.157.82");
factory.setPort(5672);
factory.setUsername("liu");
factory.setPassword("123456");
Connection connection=null;
Channel channel=null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("myqueue001",true,false,false,null);
channel.exchangeDeclare("exchange001","direct",true);
channel.queueBind("myqueue001","exchange001","routingkey001");
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long l, boolean b) throws IOException {
System.out.println("消息被确认-----当前消息编号: "+l+" 是否确认多个:"+b);
}
@Override
public void handleNack(long l, boolean b) throws IOException {
System.out.println("消息没有被确认-----当前消息编号: "+l+" 是否没有确认多个:"+b);
}
});
for (int i=0; i<10000;i++){
channel.basicPublish("exchange001","routingkey001",null,messageData.getBytes());
}
System.out.println("消息发送成功");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}

接收方消息确认
1.如果消费者声明了事务,必须提交事务消息才会被移除队列 2.isRedeliver()消息是否被接收过
public static void main(String[] args) {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("47.110.157.82");
factory.setPort(5672);
factory.setUsername("liu");
factory.setPassword("123456");
Connection connection=null;
Channel channel=null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("myqueue001",true,false,false,null);
channel.exchangeDeclare("exchange001","direct",true);
channel.queueBind("myqueue001","exchange001","routingkey001");
channel.basicConsume("myqueue001",false,"",new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
boolean redeliver = envelope.isRedeliver();
if(!redeliver){
long deliveryTag = envelope.getDeliveryTag();
Channel channel1 = this.getChannel();
String message=new String(body,"utf-8");
channel1.basicAck(deliveryTag,true);
System.out.println(message);
}else{
}
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
springboot整合RabbitMq
direct方式
发送端 1.添加pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.application.yml配置
server:
port: 8091
spring:
application:
name: rabbitmq-provider
rabbitmq:
host: 47.110.157.82
port: 5672
username: liu
password: 123456
3.RabbitMQ配置类DirectRabbitConfig
@Configuration
public class DirectRabbitConfig {
@Bean
public Queue directQueue() {
return new Queue("directQueue",true);
}
@Bean
DirectExchange directExchange() {
return new DirectExchange("directExchange");
}
@Bean
Binding bindingDirect(Queue directQueue,DirectExchange directExchange) {
return BindingBuilder.bind(directQueue).to(directExchange).with("routingkey001");
}
}
4.发送消息
@Controller
public class SendController {
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("sendMessage")
@ResponseBody
public String sendMessage(String message){
String messageId = String.valueOf(UUID.randomUUID());
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",message);
map.put("createTime",createTime);
rabbitTemplate.convertAndSend("directExchange", "routingkey001", map);
return "ok";
}
}
接收端 1.添加pom.xml依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.application.yml配置
server:
port: 8092
spring:
application:
name: rabbitmq-consumer
rabbitmq:
host: 47.110.157.82
port: 5672
username: liu
password: 123456
3.为了防止队列不存在报错,这里我们也添加RabbitMQ配置类DirectRabbitConfig
@Configuration
public class DirectRabbitConfig {
@Bean
public Queue directQueue() {
return new Queue("directQueue",true);
}
@Bean
DirectExchange directExchange() {
return new DirectExchange("directExchange");
}
@Bean
Binding bindingDirect(Queue directQueue,DirectExchange directExchange) {
return BindingBuilder.bind(directQueue).to(directExchange).with("routingkey001");
}
}
4.接收消息
@Component
@RabbitListener(queues = "directQueue")
public class ReceiveMessage {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("准备接收消息了");
System.out.println("directReceiver消费者收到消息 : " + testMessage.toString());
}
}
fanout方式
发送端 1.添加pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.application.yml配置
server:
port: 8091
spring:
application:
name: rabbitmq-provider
rabbitmq:
host: 47.110.157.82
port: 5672
username: liu
password: 123456
3.RabbitMQ配置类FanoutRabbitConfig
@Configuration
public class FanoutRabbitConfig {
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
}
4.发送消息
@Controller
public class SendController {
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("sendMessage")
@ResponseBody
public String sendMessage(String message){
String messageId = String.valueOf(UUID.randomUUID());
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",message);
map.put("createTime",createTime);
rabbitTemplate.convertAndSend("fanoutExchange", "", map);
return "ok";
}
}
接收端 1.添加pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.application.yml配置
server:
port: 8092
spring:
application:
name: rabbitmq-consumer
rabbitmq:
host: 47.110.157.82
port: 5672
username: liu
password: 123456
3.RabbitMQ配置类FanoutRabbitConfig
@Configuration
public class FanoutRabbitConfig {
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
}
4.接收消息
@Component
public class FanoutReceive {
@RabbitListener(bindings = {
@QueueBinding(value =@Queue(),
exchange = @Exchange(name = "fanoutExchange",type = "fanout"))
})
public void fanoutReceive(Map messgae){
System.out.println("fanoutReceiver01消费者收到消息"+messgae.toString());
}
@RabbitListener(bindings = {
@QueueBinding(value =@Queue(),
exchange = @Exchange(name = "fanoutExchange",type = "fanout"))
})
public void fanoutReceive02(Map messgae){
System.out.println("fanoutReceiver02消费者收到消息"+messgae.toString());
}
}
topic方式
发送端 1.添加pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.application.yml配置
server:
port: 8091
spring:
application:
name: rabbitmq-provider
rabbitmq:
host: 47.110.157.82
port: 5672
username: liu
password: 123456
3.RabbitMQ配置类FanoutRabbitConfig
@Configuration
public class TopicRabbitConfig {
@Bean
TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}
}
4.发送消息
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("sendMessage")
@ResponseBody
public String sendMessage(String message){
String messageId = String.valueOf(UUID.randomUUID());
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",message);
map.put("createTime",createTime);
rabbitTemplate.convertAndSend("topicExchange", "aa.bb", map);
return "ok";
}
}
接收端 1.添加pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.application.yml配置
server:
port: 8092
spring:
application:
name: rabbitmq-consumer
rabbitmq:
host: 47.110.157.82
port: 5672
username: liu
password: 123456
3.接收消息
@Component
public class TopicReceive {
@RabbitListener(bindings = {
@QueueBinding(value =@Queue("topicQueue001"),
key = {"aa"},
exchange = @Exchange(name = "topicExchange",type = "topic"))
})
public void topicReceive01(Map messgae){
System.out.println("topicReceive01消费者收到消息"+messgae.toString());
}
@RabbitListener(bindings = {
@QueueBinding(value =@Queue("topicQueue002"),
key = {"aa.*"},
exchange = @Exchange(name = "topicExchange",type = "topic"))
})
public void topicReceive02(Map messgae){
System.out.println("topicReceive02消费者收到消息"+messgae.toString());
}
@RabbitListener(bindings = {
@QueueBinding(value =@Queue("topicQueue003"),
key = {"aa.#"},
exchange = @Exchange(name = "topicExchange",type = "topic"))
})
public void topicReceive03(Map messgae){
System.out.println("topicReceive03消费者收到消息"+messgae.toString());
}
}

RabbitMQ集群搭建
我的服务器一:192.168.164.134服务器二:192.168.164.137 1.修改host文件 修改192.168.164.134 hosts文件
vim /etc/hosts
 修改192.168.164.137 hosts文件  重启服务器,确保两个服务器之间互通  
2.准备两个服务器分别安装RabbitMQ RabbitMQ下载安装 保证两个RabbitMQ成功安装   3.将两个服务器.erlang.cookie保持一直 查看192.168.164.134的.erlang.cookie,将内容复制
cat /var/lib/rabbitmq/.erlang.cookie
 将192.168.164.134的.erlang.cookie复制到192.168.164.137
scp /var/lib/rabbitmq/.erlang.cookie 192.168.164.137:/var/lib/rabbitmq/
重启两个服务器的RabbitMQ
/sbin/service rabbitmq-server restart
4.集群配置 192.168.164.134
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbitmq2
rabbitmqctl start_app
192.168.164.137
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbitmq1
rabbitmqctl start_app
  5.镜像集群配置 添加规则 
|