IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ使用(二) -> 正文阅读

[大数据]RabbitMQ使用(二)

RabbitMQ进阶(二)

消息确认机制

RabbitMQ提供了transaction、confirm两种消息确认机制。transaction即事务机制,手动提交和回滚;confirm机制提供了Confirmlistener和waitForConfirms两种方式。confirm机制效率明显会高于transaction,但是后者的优势在于强一致性。
1.消息的确认机制只是确认publisher发送消息到broker,由broker进行应答,不能确认消息是否有效消费。
2. 为了确认消息是否发送到queue,应该在发送消息中启用参数mandatory=true,使用ReturnListerner接受未被发送成功的消息
3. 接下来需要确认消息是否被有效消费,publisher未提供监听事件,但是提供了应答机制来保证消息被成功消费,应答方式:
Channel.basicAck(用于肯定确认)
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
Channel.basicNack(用于否定确认)
Channel.basicReject(用于否定确认)
与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了
basicRecover:消息重新进入队列,requeue,发给新的consummer,false发送给相同的consumer

同步

public class TransactionalSend {
    private final static String EXCHANGE_NAME = "publisherconfirm-exchange";

    public static void execute(String host, String userName, String password, String routingKey, int num) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setUsername(userName);
        factory.setPassword(password);
        Connection connection = null;
        Channel channel = null;

        try {
            //建立TCP连接
            connection = factory.newConnection();
            //开启信道
            channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            String message = "TransactionSend!" + System.currentTimeMillis();
            try {
                //开启事务
                channel.txSelect();
                while (num-- > 0) {
                    channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                    System.out.println("sent+[" + num + "]'" + message + "'");
                }
                channel.txCommit();
            } catch (IOException e) {
                e.printStackTrace();
                channel.txRollback();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            try {
                channel.close();
                connection.close();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }


    }

    public static void main(String[] args) {
        //测试线程池
        ExecutorService executorService= Executors.newFixedThreadPool(10);
        String rabbitmq_host="";
        String rabbitmq_user="moer";
        String rabbitmq_pwd="123456";
        String routingkey="publisher-confirm";

        executorService.submit(()->{
           TransactionalSend.execute(rabbitmq_host,rabbitmq_user,rabbitmq_pwd,routingkey,1);
        });
    }
}

异步处理

public class AsynConfirmSend {
   protected final static String EXCHANGE_NAME = "publisherconfirm-exchange";

    public static void execute(String host, String userName, String password, String routingKey, int num) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setUsername(userName);
        factory.setPassword(password);
        Connection connection = null;
        Channel channel = null;

        try {
            //建立TCP连接
            connection = factory.newConnection();
            //开启信道
            channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            String message = "AsynSend!" + System.currentTimeMillis();

            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long l, boolean b) throws IOException {
                    System.out.println("[Asyn]handleAck:deliveryTag="+l+"multiple"+b);
                }

                @Override
                public void handleNack(long l, boolean b) throws IOException {
                    System.out.println("[Asyn]handleNAck:deliveryTag="+l+"multiple"+b);

                }
            });
            channel.confirmSelect();

                while (num-- > 0) {
                    channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
                    System.out.println("sent+[" + num + "]'" + message + "'");
                }
               Thread.sleep(1*1000);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            try {
                channel.close();
                connection.close();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }


    }

    public static void main(String[] args) {
        //测试线程池
        ExecutorService executorService= Executors.newFixedThreadPool(10);
        String rabbitmq_host="";
        String rabbitmq_user="moer";
        String rabbitmq_pwd="123456";
        String routingkey="publisher-confirm2";

        executorService.submit(()->{
           AsynConfirmSend.execute(rabbitmq_host,rabbitmq_user,rabbitmq_pwd,routingkey,10);
        });
    }
}

异步核心代码标注

            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long l, boolean b) throws IOException {
                    System.out.println("[Asyn]handleAck:deliveryTag="+l+"multiple"+b);
                }

                @Override
                public void handleNack(long l, boolean b) throws IOException {
                    System.out.println("[Asyn]handleNAck:deliveryTag="+l+"multiple"+b);

                }
            });
            channel.confirmSelect();

异步消息确认

public class ConsumerConfiremRecv {
    private  final static String QUEUE_NAME="consumerconfirm";
    private final static String EXCHANGE_NAME=AsynConfirmSend.EXCHANGE_NAME;

    public static void execute(String host, String userName, String password, String routingKey) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setUsername(userName);
        factory.setPassword(password);
        Connection connection = null;

        try {
            //建立TCP连接
            connection = factory.newConnection();
            //在TCP连接的基础上创建一个信道
            final Channel channel = connection.createChannel();
            //声明一个持久话队列
            //queueDeclare(名字,是否持久化,独占的queue, 不使用时是否自动删除,其他参数)
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            //绑定路由,同一个队列可以绑定多个值
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,routingKey);
            System.out.println("[consumerconfirmRecv]waiting for messages.");
            Consumer consumer=new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message=new String(body,"UTF-8");
                    System.out.println("[consumerconfirmRecv]Received"+message);
                    //正向确认
                    channel.basicAck(envelope.getDeliveryTag(),true);
                    //消息否定确认
                    //channel.basicNack(envelope.getDeliveryTag(),true,false);
                }
            };
            //接受消息,设置非自动确认
           channel.basicConsume(QUEUE_NAME,true,consumer);
           Thread.sleep(5*1000);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            try {
                connection.close();
            }catch (IOException e) {
                e.printStackTrace();
            }
        }


    }
    public static void main(String[] args) {
        //测试线程池
        ExecutorService executorService= Executors.newFixedThreadPool(10);
        String rabbitmq_host="";
        String rabbitmq_user="moer";
        String rabbitmq_pwd="123456";
        String routingkey="publisher-confirm2";

        executorService.submit(()->{
            ConsumerConfiremRecv.execute(rabbitmq_host,rabbitmq_user,rabbitmq_pwd,routingkey);
        });
    }
}

如果你有更多的案列需要实现,可以将rabbitmq的相关参数抽离成一个工具类,直接调用即可。

public class RabbitMqUtils {
    //得到一个连接的 channel
    public static Channel getChannel() throws Exception {
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("");
        factory.setUsername("moer");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}
Channel channel = RabbitMqUtils.getChannel();

希望大家可以和我一起进步,提升自己永远没有错!

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-15 16:15:11  更:2021-07-15 16:15:24 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/21 0:36:12-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码