一、消息队列
1.概述
消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
2.应用场景
以下介绍消息队列在实际应用中常用的异步处理、应用解耦、流量削峰、日志处理等场景。
2.1 异步处理
场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种
- 串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。
 - 并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
 假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。 因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100) 小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?
引入消息队列,将不是必须的业务逻辑进行异步处理。改造后的架构如下: 
按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。
2.2 应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图: 
传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合
如何解决以上问题呢?引入应用消息队列后的方案,如下图: 
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦
2.3 流量削锋
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。 应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。 a、可以控制活动的人数 b、可以缓解短时间内高流量压垮应用  用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。 秒杀业务根据消息队列中的请求信息,再做后续处理
2.4 日志处理
2.4日志处理 日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下  日志采集客户端,负责日志数据采集,定时写受写入Kafka队列 Kafka消息队列,负责日志数据的接收,存储和转发 日志处理应用:订阅并消费kafka队列中的日志数据
3.JMS消息服务
JMS(JAVA Message Service,java消息服务)API是一个消息服务的标准/规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
3.1 消息模型
在JMS标准中,有两种消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。
P2P模式
-
P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。 -
每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中) -
发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列 -
接收者在成功接收消息之后需向队列应答成功 -
如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式。
Pub/Sub模式
-
包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。 -
每个消息可以有多个消费者 -
发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息 -
为了消费消息,订阅者必须保持运行的状态 -
为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。 -
如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。
3.2 消息消费
在JMS中,消息的产生和消费都是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。 (1)同步 订阅者或接收者通过receive方法来接收消息,receive方法在接收到消息之前(或超时之前)将一直阻塞;
(2)异步 订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。
二、Kafka
1.概述
Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:
- 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。(文件追加的方式写入数据,过期的数据定期删除)
- 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息
- 支持通过Kafka服务器和消费机集群来分区消息
- 支持Hadoop并行数据加载
- 一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用。
2.Kafka相关概念
- Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker - Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处) - Partition
Parition是物理上的概念,每个Topic包含一个或多个Partition. - Producer
负责发布消息到Kafka broker - Consumer
消息消费者,向Kafka broker读取消息的客户端。 - Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
具体流程如下图: 
三、Docker环境下Zookeeper集群环境搭建
1.概述
ZooKeeper 是一个开源的分布式协调服务,是 Hadoop,HBase 和其他分布式框架使用的有组织服务的标准。
分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能。
2.Docker 配置
在/mydata/zookeeper路径下创建docker-compose-zookeeper-cluster.yml文件
version: '3.3'
networks:
docker_net:
external: true
services:
zoo1:
image: zookeeper
restart: unless-stopped
hostname: zoo1
container_name: zoo1
ports:
- 2182:2181
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
volumes:
- ./zoo1/data:/data
- ./zoo1/datalog:/datalog
networks:
- docker_net
zoo2:
image: zookeeper
restart: unless-stopped
hostname: zoo2
container_name: zoo2
ports:
- 2183:2181
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181
volumes:
- ./zoo2/data:/data
- ./zoo2/datalog:/datalog
networks:
- docker_net
zoo3:
image: zookeeper
restart: unless-stopped
hostname: zoo3
container_name: zoo3
ports:
- 2184:2181
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
volumes:
- ./zoo3/data:/data
- ./zoo3/datalog:/datalog
networks:
- docker_net
3.启动集群
docker-compose -f docker-compose-zookeeper-cluster.yml up -d

4.查看信息
1.分别查看各个节点的角色  由上结果可知,zoo2 是 leader。负责集群的读写。
2.查看 zoo2 选举数据 
5.选举演练
1.模拟 Leader 掉线,发现zoo3已选举为Leader。  2. zoo2 节点重新上线,恢复为follow节点 
6.常用操作
- 查看文件目录
ls / - 创建顺序节点
create -s /zk-test 123 - 创建临时节点
create -e /zk-temp 123 当会话过期或客户端断开连接时,临时节点将被自动删除 - 创建永久节点
create /zk-permanent 123 - 读取节点
get / - 更新节点
set /zk-permanent 456 - 检查节点状态
stat /zk-permanent - 删除节点
rmr /zk-permanent
四、Docker环境下Kafka集群环境搭建
1.在/mydata/kafka目录下创建docker-compose-kafka-cluster.yml文件
version: '3.3'
networks:
docker_net:
external: true
services:
kafka1:
image: wurstmeister/kafka
restart: unless-stopped
container_name: kafka1
ports:
- "9093:9092"
external_links:
- zoo1
- zoo2
- zoo3
environment:
KAFKA_BROKER_ID: 1
KAFKA_ADVERTISED_HOST_NAME: 47.94.93.93 ## 修改:宿主机IP
KAFKA_ADVERTISED_PORT: 9093 ## 修改:宿主机映射port
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://47.94.93.93:9093 ## 绑定发布订阅的端口。修改:宿主机IP
KAFKA_ZOOKEEPER_CONNECT: "47.94.93.93:2182,47.94.93.93:2183,47.94.93.93:2184"
volumes:
- "./kafka1/docker.sock:/var/run/docker.sock"
- "./kafka1/data/:/kafka"
networks:
- docker_net
kafka2:
image: wurstmeister/kafka
restart: unless-stopped
container_name: kafka2
ports:
- "9094:9092"
external_links:
- zoo1
- zoo2
- zoo3
environment:
KAFKA_BROKER_ID: 2
KAFKA_ADVERTISED_HOST_NAME: 47.94.93.93 ## 修改:宿主机IP
KAFKA_ADVERTISED_PORT: 9094 ## 修改:宿主机映射port
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://47.94.93.93:9094 ## 修改:宿主机IP
KAFKA_ZOOKEEPER_CONNECT: "47.94.93.93:2182,47.94.93.93:2183,47.94.93.93:2184"
volumes:
- "./kafka2/docker.sock:/var/run/docker.sock"
- "./kafka2/data/:/kafka"
networks:
- docker_net
kafka3:
image: wurstmeister/kafka
restart: unless-stopped
container_name: kafka3
ports:
- "9095:9092"
external_links:
- zoo1
- zoo2
- zoo3
environment:
KAFKA_BROKER_ID: 3
KAFKA_ADVERTISED_HOST_NAME: 47.94.93.93 ## 修改:宿主机IP
KAFKA_ADVERTISED_PORT: 9095 ## 修改:宿主机映射port
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://47.94.93.93:9095 ## 修改:宿主机IP
KAFKA_ZOOKEEPER_CONNECT: "47.94.93.93:2182,47.94.93.93:2183,47.94.93.93:2184"
volumes:
- "./kafka3/docker.sock:/var/run/docker.sock"
- "./kafka3/data/:/kafka"
networks:
- docker_net
kafka-manager:
image: sheepkiller/kafka-manager:latest
restart: unless-stopped
container_name: kafka-manager
hostname: kafka-manager
ports:
- "9000:9000"
links: # 连接本compose文件创建的container
- kafka1
- kafka2
- kafka3
external_links: # 连接本compose文件以外的container
- zoo1
- zoo2
- zoo3
environment:
ZK_HOSTS: 47.94.93.93:2182,47.94.93.93:2183,47.94.93.93:2184 ## 修改:宿主机IP
TZ: CST-8
networks:
- docker_net
启动kafka集群
docker-compose -f docker-compose-kafka-cluster.yml up -d

2.在上面启动时,也启动了可视化管理模块kafka-manager,因为 kafka 的元数据、配置信息由 Zookeeper 管理,这里我们在 UI 页面做下相关配置。
- 访问 http:47.94.93.93:9000,按图示添加相关配置
 2.配置后我们可以看到默认有一个 topic(__consumer_offsets),3 个 brokers。该 topic 分 50 个 partition,用于记录 kafka 的消费偏移量。(其他topic都为后续生成的) 
|