IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> kafka和zookeeper集群搭建 -> 正文阅读

[大数据]kafka和zookeeper集群搭建

前提

我这里用的是三台服务器搭建集群,不是一台,这个要搞清楚

zookeeper集群搭建

1. 下载安装包

可以直接到官网下载,也可以通过下面命令下载,将命令中的版本号改成自己的想要下载的版本号就可以,如果不能,那就乖乖去官网下载

wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz

接下来就是解压了

 tar -zxvd apache-zookeeper-3.7.0-bin.tar.gz 

解压完文件就行行了

2. 配置域名,在/etc/hosts中配置

这一步不是必须的,你也可以直接在后面的配置文件中配置ip就行
贴一下我的hosts;

注意:配置的这些域名信息要同步到三台服务器中,我这里搭建集群用了三台服务器,至于用一台的,就不用考虑了。我这里连kafka的域名也配置了,等下kafak环节就不讲了

至于同步的方法,你可以拷贝,每个配置,也可以用下面命令,如果中间要输入密码,就输入

scp -r /etc/host 服务器1ip:/etc/hosts
scp -r /etc/host 服务器2ip:/etc/hosts
# zookeeper配置

192.168.1.214 zook1.ligel.com
192.168.1.88 zook2.ligel.com
192.168.1.51 zook3.ligel.com

# kafka配置

192.168.1.214 kafka1.ligel.com
192.168.1.88 kafka2.ligel.com
192.168.1.51 kafka3.ligel.com

3. 编写配置文件

进入到解压好的zookeeper中,进入conf文件下。 apache-zookeeper-3.7.0-bin/conf

执行cp zoo_sample.cfg ./zoo.cfg, 然后编辑zoo.cfg

配置文件内容如下
可以直接用我的配置文件,其中该三个地方

  • dataDir
  • dataLogDir
  • server.x=[hostname]:nnnnn[:nnnnn][:observer] 这里是服务器节点配置

上面第一个是数据存储路径,挂载成你自己的路径
第二个是日志文件挂载路径,一样挂载成自己的路径

第三个就是节点信息了。有多少个节点就配置多少个
这里有几个注意的点

  • server.x 这里的x就是在上面配置的dataDir要新建一个myid文件,文件中添上当前服务的id,如果server.1 那么myid文件中就一个数字1就行。记住每个服务器都要配置。就是x和myid文件中填的数组一定要对应,不然启动不了

  • 其次就是server.x=后面的服务器地址,你可以写域名(前提是你在/etc/hosts配置了域名),也可以写ip,至于后面两个端口,固定,不用管

# The number of milliseconds of each tick
# zk使用的基本时间单位是tick,这个参数用于配置一个tick的长度,单位为毫秒,默认值3000,不建议修改.
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
# zk用于保存内存数据库的快照的目录,除非设置了 dataLogDir,否则这个目录也用来保存更新数据库的事务日志。在生产环境使用的zk集群,强烈建议设置dataLogDir,让dataDir只存放快照,因为写快# 照的开销很低,这样dataDir就可以和其他日志目录的挂载点合设.
dataDir=/mydata/zkcluster/zook/data
# the port at which the clients will connect
# zk服务器监听的端口,客户端通过该端口建立连接,每台 zk服务器也允许设置为不同的值。默认值2181
clientPort=2181

# 生产环境必须要设,注意,因为zk写事务日志是顺序地、 阻塞地,所以这个目录一定要单独划盘,禁止和其他高写操作的目录合设,否则严重影响性能。
dataLogDir=/mydata/zkcluster/zook/datalog

# 因为zk服务器是需要对请求队列化的,当客户端很多,提交的写操作很频繁时,zk服务器可能在来得及处理前就OOM了。这个参数用于限制系统中还未处理的请求数阈值,默认为1000,一般不需要修改
globalOutstandingLimit=1000

# 用于设置预分配的事务日志文件大小,单位为KB,默认值为64M,这个要改,默认的太大了
preAllocSize=10240

# 用于设置每次快照之间的事务数,默认为100000。因为写快照是消耗性能的,也没有必要频繁写,虽然该值看起来很大,一般不需要修改该值。
snapCount=100000
# 如果设置了该路径,将持续跟踪zk的操作并写入一个名为traceFile.year.month.day的跟踪日志中,该选项比较消耗性能,只在debug环境可以开启,生产环境不能开。 
# traceFile

# the maximum number of client connections.
# increase this if you need to handle more clients
# 单个客户端与单台服务器之间的连接数的限制,是ip级别的,默认是60,如果设置为0,那么表明不作任何限制。请注意这个限制的使用范围,仅仅是单台客户端机器与单台zk服务器之间的连接数限制,## 不是针对指定客户端IP,也不是zk集群的连接数限制,也不是单台zk对所有客户端的连接数限制。如果点对点连接数不高,建议改小这个值。 
#maxClientCnxns=60


# 用于配置zk的端口监听在本机哪个IP上生效,用 于服务器有多块网卡的情况,尤其是既有外网地址又有内网地址的集群, 只要开放内网地址即可。
# clientPortAddress

# 客户端和zk服务器间的最小超时时间,单位为 tickTime的倍数,默认为2
minSessionTimeout=2


# 客户端和zk服务器间的最大超时时间,单位为 tickTime的倍数,默认为20。客户端一般有自己的连接管理参数,如果客户端的参数不在minSessionTimeout--maxSessionTimeout这个范围内, 会被强制设# 为最接近的那个值。
maxSessionTimeout=20

# 用于设置触发警告的存储同步时间阈值, 单位为毫秒,默认为1000,因为只是一个警告而已,无需修改。
fsync.warningthresholdms=1000



# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
# 3.4.0及之后版本zk提供了自动清理快照 文件和事务日志文件的功能,该参数指定了保留文件的个数,默认为3, 一般无需修改。
autopurge.snapRetainCount=3

# Purge task interval in hours
# Set to "0" to disable auto purge feature
# 和上一个参数配合使用,设置自动清理的 频率,单位为小时,默认为0表示不清理,建议设为6或12之类的值。
autopurge.purgeInterval=6

# 3.5.0及以后才有,默认为true,是为了向后兼容 的目的。如果设为false,单个server可以以集群模式启动,一个集群 可以通过重新配置降回单个node,也可以从单个node升回一个集群。  
standaloneEnabled=true
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true


# 集群配置

# 群首选举算法,默认为3代表FastLeaderElection,前面章节已经说过,除了这种算法外其余3种都已经被标注为deprecated了, 不要修改。
electionAlg=3

# follower首次与leader连接并同步的超时值,因为集群启动或新leader选举完成后,follower需要从leader拉最新的数据,如果需要同步的数据很大还是可能超时的,单位为tickTime的倍数,没有默认值
# initLimit=

# 默认为yes,此时leader也会接受客户端的连接提供读 写服务。如果设置为no,则leader不会和客户端连接,只负责处理 follower转发的写操作请求以及集群协调事务。改写该参数需要小心, 有可能会# 从leader很忙变成follower很忙,如果集群台数不多,leader 的CPU和IO也不高的话,就使用默认值。 
leaderServes=yes

# 这里的x是一个数字,与myid文件中的id是一致的。接着可以配置两个端口,第一个端口 用于leader和follower之间的数据同步和其它通信,第二个端口用于leader选举过程中投票通信。另外可以在最后# 加上:observer标记,用于表示该服务器以observer模式运行。
# server.x=[hostname]:nnnnn[:nnnnn][:observer]
server.1=zook1.ligel.com:2888:3888
server.2=zook2.ligel.com:2888:3888
server.3=zook3.ligel.com:2888:3888

# follower后续与leader同步的超时值,单位为tickTime的倍数。该参数与initLimit不同之处是,与数据量大小关系不大,只取决于网络质量,因此还起到心跳作用,如果follower超时了,leader就认为## 该follower落后太多需要放弃。如果网络为高延迟环境,可适当增加该值,但不宜加得太大,否则会掩盖某些故障。
# syncLimit

# 仲裁时除了默认的按每机1票计算,还有加权的方法和分组的方法,这个参数就是用来设置分组方法的。
# group.x=nnnnn[:nnnnn]

# 和上面一样
# weight.x=nnnnn

# 用于设置群首选举时打开一个新的连接的超时值,单位为毫秒,默认值为5000,不需要修改
cnxTimeout=5000

# 3.5.0及以后才有,默认为true,是为了向后兼容 的目的。如果设为false,单个server可以以集群模式启动,一个集群 可以通过重新配置降回单个node,也可以从单个node升回一个集群。
standaloneEnabled=true

4. 同步配置文件

同步一下配置文件,到这里zookeeper的集群就搭建好了
可以现在一个服务器上配置,然后将整个zookeeper文件同步到另外两个服务器上

  • zookeeper文件夹
  • 挂载的路径
  • myid文件,每个服务器的myid值要不一样
  • /etc/hosts文件

5. 启动集群

进入到zookeeper中的bin文件夹下,输入下面代码,注意这里要三个服务器都要启动,不然会出现启动失败

1. 启动命令
./zkServer.sh start
2. 查看启动状态-这里要三台服务器都启动了,再用这个命令查看
./zkServer.sh status
3. 停止命令
./zkServer.sh stop

执行查询启动转态指令,出现下面代码表示启动成功

ZooKeeper JMX enabled by default
Using config: /home/resources/zookeeperbase/apache-zookeeper-3.7.0-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

6. 可以使用zkWeb来查看zookeeper情况

软件下载地址

启动jar包,&表示后台启动
java -jar jar包名称 &

我之前为了方便管理,自己利用jar创建了docker镜像,用docker部署,但是发现docker启动的连接不上部署成功zookeeper节点,所以为了不折腾,还是老老实实本机启动吧。

下面是软件截图
在这里插入图片描述

kafka集群搭建

1. 下载kafka安装包

我是在官网下载的,这个看自己了。

2. 配置文件

修改server.properties配置文件中的东西

主要注意下面几个点,具体看下面配置文件内容

  • listeners= 这个是本机的域名/ip

  • log.dirs= 配置日志保存目录

  • zookeeper.connect= 配置zookeeper地址,多个用逗号隔开

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker
# 每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://kafka1.ligel.com:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
# 处理网络请求的最大线程数
num.network.threads=30

# The number of threads that the server uses for processing requests, which may include disk I/O
# 处理磁盘I/O的线程数
num.io.threads=30

# The send buffer (SO_SNDBUF) used by the socket server
# 套接字服务器使用的发送缓冲区(SOYSNDBUF)
socket.send.buffer.bytes=5242880

# The receive buffer (SO_RCVBUF) used by the socket server
# 套接字服务器使用的接收缓冲区(SOYRCVBUF)
socket.receive.buffer.bytes=5242880

# The maximum size of a request that the socket server will accept (protection against OOM)
# 套接字服务器将接受的请求的最大大小(对OOM的保护)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
# 日志存放目录,多个目录使用逗号分割,如果你有多块磁盘,建议配置成多个目录,从而达到I/O的效率的提升
log.dirs=/mydata/kafcluster/kafka/log1

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
# 每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
# 在启动时恢复日志和关闭时刷盘日志时每个数据目录的线程的数量,默认1
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
# 设置副本数量为3,这样当一台消费者宕机时,其他消费者也可以进行消费
offsets.topic.replication.factor=3
# 事务主题的复制因子(设置为更高以确保可用性)。在群集大小满足此复制因子要求之前,内部主题创建将失败。默认为3
transaction.state.log.replication.factor=3
# 重写了事务主题的min.insync.replicas配置,默认为2
transaction.state.log.min.isr=2

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
# 在强制fsync一个partition的log文件之前暂存的消息数量。调低这个值会更频繁的sync数据到磁盘,影响性能。通常建议人家使用replication来确保持久性,而不是依靠单机上的fsync,但是这可以带## 来更多的可靠性,默认10000。
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
# 2次fsync调用之间最大的时间间隔,单位为ms。即使log.flush.interval.messages没有达到,只要这个时间到了也需要调用fsync。默认3000ms.
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
# 日志保存时间 (hours|minutes),默认为7天(168小时)。超过这个时间会根据policy处理数据。bytes和minutes无论哪个先达到都会触发
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
# 日志数据存储的最大字节数。超过这个时间会根据policy处理数据。
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
# 控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制)
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
# 日志片段文件的检查周期,查看它们是否达到了删除策略的设置(log.retention.hours或log.retention.bytes)
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# zookeeper集群的地址,可以是多个,多个之间用逗号分割.
zookeeper.connect=zook1.ligel.com:2181,zook2.ligel.com:2181,zook3.ligel.com:2181

# Timeout in ms for connecting to zookeeper
# ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

记住配置好自己挂载的日志路径,每个服务器都要创建该路径。

然后就是同步整个kafka文件夹到另外两个服务器上,还是用scp -r 命令来弄

scp -r kafak文件夹路径 服务器ip:/要放到的路径

启动集群

每个服务器都要启动,进入到kafak文件夹下的bin目录
这里提供了一些脚本,有启动和停止

注意: 下面的命令一定要在kafa文件夹下的bin目录执行,不然命令执行不了。有很多相对路径

1. 启动命令
sh kafka-server-start.sh -daemon ../conf/server.properties

2. 停止
sh kafka-server-stop.sh

至于启动是否成功,创建一个主题试试,这个不多讲了
看这篇文章进行测试kafka集群是否成功

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-17 11:59:21  更:2021-07-17 12:00:52 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/21 3:29:26-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码