Work Queue
- Work Queue
工作队列,入门时候的queue仅是一个消费者消费生产者提供的信息,如下图  但当生产者开始大量推送消息,而消费者的处理速度跟不上的时候,我们就需要多个消费者来进行消费了。因此出现了Work Queue。  - 生产者代码
package com.mqTest.WorrkQueue;
import com.mqTest.utils.MqUtils;
import com.rabbitmq.client.Channel;
import org.apache.log4j.Logger;
public class Producer {
private static final String QUEUE_NAME = "hello_work";
private static Logger log = Logger.getLogger(Producer.class);
public static void main(String[] args) {
try {
Channel channel = MqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
for (int i = 0; i < 10; i++) {
String message ="第" + i + "条消息推送";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
log.info("message to other ok");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.消费者1/2 1和2的代码相同,只是消费者名字不一样,在此只提供消费者1代码。
package com.mqTest.WorrkQueue;
import com.mqTest.utils.MqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import org.apache.log4j.Logger;
public class Worker {
private static final String QUEUE_NAME = "hello_work";
private static Logger log = Logger.getLogger(Worker.class);
public static void main(String[] args) {
Channel client = MqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> {
log.info("queue:" + consumerTag +", message:" + new String(message.getBody()));
};
CancelCallback cancelCallback = (consumerTag) -> {
log.info(consumerTag + "is delete");
};
System.out.println("this is Worker1");
try {
client.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 测试结果
启动生产者,和2个消费者。预期结果是轮流获取消息。

 
成功!
|