RabbitMQ进阶(二)
消息确认机制
RabbitMQ提供了transaction、confirm两种消息确认机制。transaction即事务机制,需要手动提交和回滚;confirm机制提供了ConfirmListener和waitForConfirms两种方式。confirm机制的效率明显高于transaction,而transaction的优势在于强一致性。 1. 消息的确认机制只是确认publisher把消息发送到了broker,由broker进行应答,不能确认消息是否被有效消费。 2. 为了确认消息是否被路由到queue,应该在发送消息时启用参数mandatory=true,并使用ReturnListener接收未被发送成功的消息。 3. 接下来需要确认消息是否被有效消费:publisher端未提供监听事件,但是consumer端提供了应答机制来保证消息被成功消费,应答方式如下: Channel.basicAck(用于肯定确认):告知RabbitMQ该消息已被成功处理,可以将其丢弃了; Channel.basicNack(用于否定确认); Channel.basicReject(用于否定确认):与Channel.basicNack相比少一个参数(multiple),表示不处理该消息而直接拒绝,可以将其丢弃了; Channel.basicRecover:让未确认的消息重新投递,requeue为true时消息重新进入队列,可能发给新的consumer,为false时发送给相同的consumer。
同步
/**
 * Publishes messages inside an AMQP transaction: the channel is switched to
 * transactional mode, all messages are published, and the batch is committed.
 * If publishing or committing fails with an {@link IOException}, the
 * transaction is rolled back.
 */
public class TransactionalSend {
    private final static String EXCHANGE_NAME = "publisherconfirm-exchange";

    /**
     * Connects to the broker, declares the direct exchange and publishes
     * {@code num} persistent messages in a single transaction.
     *
     * @param host       broker host name
     * @param userName   broker user name
     * @param password   broker password
     * @param routingKey routing key used for every published message
     * @param num        number of messages to publish; nothing is published when {@code num <= 0}
     */
    public static void execute(String host, String userName, String password, String routingKey, int num) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setUsername(userName);
        factory.setPassword(password);
        Connection connection = null;
        Channel channel = null;
        try {
            // Open the TCP connection.
            connection = factory.newConnection();
            // Open a channel on top of the connection.
            channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            String message = "TransactionSend!" + System.currentTimeMillis();
            try {
                // Start the transaction.
                channel.txSelect();
                while (num-- > 0) {
                    channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                    System.out.println("sent+[" + num + "]'" + message + "'");
                }
                channel.txCommit();
            } catch (IOException e) {
                e.printStackTrace();
                // Discard everything published since txSelect().
                channel.txRollback();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeQuietly(channel, connection);
        }
    }

    /**
     * Closes the channel and then the connection. Each resource is closed
     * independently so that a failure (or a {@code null} left over from a failed
     * connect) on the channel never prevents the connection from being closed.
     */
    private static void closeQuietly(Channel channel, Connection connection) {
        if (channel != null && channel.isOpen()) {
            try {
                channel.close();
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Demo entry point: publishes one message from a pool thread. */
    public static void main(String[] args) {
        // Thread pool used for the test run.
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        String rabbitmq_host = "";
        String rabbitmq_user = "moer";
        String rabbitmq_pwd = "123456";
        String routingkey = "publisher-confirm";
        executorService.submit(() -> {
            TransactionalSend.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, routingkey, 1);
        });
        // Let the submitted task finish, then release the pool threads so the JVM can exit.
        executorService.shutdown();
    }
}
异步处理
/**
 * Publishes messages with publisher confirms enabled and reports every broker
 * ack/nack through a {@link ConfirmListener}.
 */
public class AsynConfirmSend {
    protected final static String EXCHANGE_NAME = "publisherconfirm-exchange";

    /** Upper bound, in milliseconds, on how long to wait for outstanding confirms. */
    private static final long CONFIRM_TIMEOUT_MS = 5000L;

    /**
     * Connects to the broker, declares the direct exchange, enables publisher
     * confirms and publishes {@code num} persistent messages.
     *
     * @param host       broker host name
     * @param userName   broker user name
     * @param password   broker password
     * @param routingKey routing key used for every published message
     * @param num        number of messages to publish; nothing is published when {@code num <= 0}
     */
    public static void execute(String host, String userName, String password, String routingKey, int num) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setUsername(userName);
        factory.setPassword(password);
        Connection connection = null;
        Channel channel = null;
        try {
            // Open the TCP connection.
            connection = factory.newConnection();
            // Open a channel on top of the connection.
            channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            String message = "AsynSend!" + System.currentTimeMillis();
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long l, boolean b) throws IOException {
                    System.out.println("[Asyn]handleAck:deliveryTag="+l+"multiple"+b);
                }
                @Override
                public void handleNack(long l, boolean b) throws IOException {
                    System.out.println("[Asyn]handleNAck:deliveryTag="+l+"multiple"+b);
                }
            });
            // Put the channel into confirm mode before publishing.
            channel.confirmSelect();
            while (num-- > 0) {
                channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                System.out.println("sent+[" + num + "]'" + message + "'");
            }
            // Keep the channel open until every published message has been acked or
            // nacked (bounded by the timeout) instead of sleeping for a fixed second,
            // which could close the channel before the confirms arrive. The listener
            // registered above still reports each individual ack/nack.
            if (!channel.waitForConfirms(CONFIRM_TIMEOUT_MS)) {
                System.err.println("[Asyn]at least one message was nacked by the broker");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning executor can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeQuietly(channel, connection);
        }
    }

    /**
     * Closes the channel and then the connection. Each resource is closed
     * independently so that a failure (or a {@code null} left over from a failed
     * connect) on the channel never prevents the connection from being closed.
     */
    private static void closeQuietly(Channel channel, Connection connection) {
        if (channel != null && channel.isOpen()) {
            try {
                channel.close();
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Demo entry point: publishes ten messages from a pool thread. */
    public static void main(String[] args) {
        // Thread pool used for the test run.
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        String rabbitmq_host = "";
        String rabbitmq_user = "moer";
        String rabbitmq_pwd = "123456";
        String routingkey = "publisher-confirm2";
        executorService.submit(() -> {
            AsynConfirmSend.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, routingkey, 10);
        });
        // Let the submitted task finish, then release the pool threads so the JVM can exit.
        executorService.shutdown();
    }
}
异步核心代码标注
// Register the callbacks that receive the broker's publisher confirms.
channel.addConfirmListener(new ConfirmListener() {
    // Called when the broker acknowledges one (or, if multiple is true, several) published messages.
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        String line = "[Asyn]handleAck:deliveryTag=" + deliveryTag + "multiple" + multiple;
        System.out.println(line);
    }

    // Called when the broker negatively acknowledges published messages.
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        String line = "[Asyn]handleNAck:deliveryTag=" + deliveryTag + "multiple" + multiple;
        System.out.println(line);
    }
});
// Switch the channel into confirm mode.
channel.confirmSelect();
异步消息确认
/**
 * Consumes messages from a durable queue bound to the publisher's exchange and
 * acknowledges each delivery manually.
 */
public class ConsumerConfiremRecv {
    private final static String QUEUE_NAME="consumerconfirm";
    private final static String EXCHANGE_NAME=AsynConfirmSend.EXCHANGE_NAME;

    /**
     * Connects to the broker, declares and binds the queue, then consumes with
     * manual acknowledgement for five seconds before closing the connection.
     *
     * @param host       broker host name
     * @param userName   broker user name
     * @param password   broker password
     * @param routingKey routing key used to bind the queue to the exchange
     */
    public static void execute(String host, String userName, String password, String routingKey) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setUsername(userName);
        factory.setPassword(password);
        Connection connection = null;
        try {
            // Open the TCP connection.
            connection = factory.newConnection();
            // Create a channel on top of the TCP connection.
            final Channel channel = connection.createChannel();
            // Declare a durable queue.
            // queueDeclare(name, durable, exclusive, autoDelete, arguments)
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            // Bind the routing key; one queue may be bound with several keys.
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,routingKey);
            System.out.println("[consumerconfirmRecv]waiting for messages.");
            Consumer consumer=new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message=new String(body,"UTF-8");
                    System.out.println("[consumerconfirmRecv]Received"+message);
                    // Positive acknowledgement of exactly this delivery (multiple=false).
                    // To reject instead, call channel.basicNack(deliveryTag, multiple, requeue).
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            // Start consuming with autoAck=false. The handler acknowledges manually;
            // with autoAck=true the broker has already settled the delivery and the
            // explicit basicAck would be a protocol error that closes the channel.
            channel.basicConsume(QUEUE_NAME,false,consumer);
            // Keep the connection open for a while so deliveries can arrive.
            Thread.sleep(5*1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning executor can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // connection stays null when newConnection() failed; guard against NPE.
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /** Demo entry point: runs the consumer once from a pool thread. */
    public static void main(String[] args) {
        // Thread pool used for the test run.
        ExecutorService executorService= Executors.newFixedThreadPool(10);
        String rabbitmq_host="";
        String rabbitmq_user="moer";
        String rabbitmq_pwd="123456";
        String routingkey="publisher-confirm2";
        executorService.submit(()->{
            ConsumerConfiremRecv.execute(rabbitmq_host,rabbitmq_user,rabbitmq_pwd,routingkey);
        });
        // Let the submitted task finish, then release the pool threads so the JVM can exit.
        executorService.shutdown();
    }
}
如果你有更多的案例需要实现,可以将rabbitmq的相关参数抽离成一个工具类,直接调用即可。
/**
 * Helper that centralises the RabbitMQ connection settings used by the examples.
 */
public class RabbitMqUtils {
    /**
     * Opens a new connection with the hard-coded settings and returns a channel on it.
     *
     * <p>The connection itself is not returned; callers that are done with the
     * channel should also close {@code channel.getConnection()}, otherwise the
     * underlying connection stays open.
     *
     * @return a newly created channel
     * @throws Exception if the connection or the channel cannot be created
     */
    public static Channel getChannel() throws Exception {
        // Create the connection factory.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("");
        factory.setUsername("moer");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            if (channel == null) {
                // createChannel() yields null when no channel number is available.
                throw new IllegalStateException("No channel available on the connection");
            }
            return channel;
        } catch (Exception e) {
            // Do not leak the freshly opened connection when no channel can be handed out.
            try {
                connection.close();
            } catch (Exception closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }
}
// Usage: obtain a channel through the helper instead of repeating the connection setup.
Channel channel = RabbitMqUtils.getChannel();
希望大家可以和我一起进步,提升自己永远没有错!
|