IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 2021-07-26 -> 正文阅读

[大数据]2021-07-26

kakfa安装及使用

1.下载安装包

在 https://kafka.apache.org/downloads下载最新的包,安装解压
在这里插入图片描述

解压后,我一般会在~/.bashrc配置文件中加入以下两行代码:

export KAFKA_HOME=/home/zyb/kafka/kafka_2.13-2.8.0
export PATH=$KAFKA_HOME/bin:$PATH

进入kafka_2.13-2.8.0解压目录,修改kafka-server 的配置文件

vim config/server.properties

修改配置文件中21、31、36和60行
在这里插入图片描述

2.功能验证

1、启动Zookeeper

? Zookeeper部署的是单点的。(以守护进程进行)

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
2、启动Kafka服务

使用 kafka-server-start.sh 启动 kafka 服务

# 同样,加-daemon选项可实现后台运行守护进程
bin/kafka-server-start.sh config/server.properties
#挂后台
nohup bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

关闭kafka服务

bin/kafka-server-stop.sh config/server.properties
3、创建topic

首先创建一个名为test的topic,只使用单个分区和一个复本

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 或者
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

查看topic列表

bin/kafka-topics.sh --list --zookeeper localhost:2181
# 或者
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
4、产生消息,创建消息生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
5、消费消息,创建消息消费者
 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

3.生产者消费者模型

基本概念:

“生产者消费者模型”: 某个模块(函数等)负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、协程、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。
单单抽象出生产者和消费者,还够不上是生产者/消费者模型。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。

生产者: 发送数据端

消费者: 接收数据端

缓冲区:    
    1. 解耦 ( 降低生产者 和 消费者之间 耦合度 )

    2. 并发 (生产者消费者数量不对等时,能保持正常通信)

    3. 缓存 (生产者和消费者 数据处理速度不一致时, 暂存数据)

4.使用Go使用kafka实现生产者消费者模型

同步生产者

package test01import (   "fmt"   "github.com/Shopify/sarama"   "log"   "os"   "time")var Address = []string{"192.168.137.121:9092"}	//虚拟机上的Kafka服务ip和端口func SyncProducer(address []string) {   config := sarama.NewConfig()   config.Producer.Return.Successes = true   config.Producer.Timeout = 5 * time.Second   //同步消费者   p, err := sarama.NewSyncProducer(address, config)   if err != nil {      log.Printf("sarama.NewSyncProducer err, message=%s \n", err)      return   }   defer p.Close()   topic := "test"   srcValue := "sync, this is a message, index=%d"   for i := 0; i < 10; i++ {      value := fmt.Sprintf(srcValue, i)      msg := &sarama.ProducerMessage{         Topic:     topic,         Value:     sarama.ByteEncoder(value),      }      part, offset, err := p.SendMessage(msg)      if err != nil {         log.Printf("send messages(%s) err=%s \n", value, err)      } else {         _, _ = fmt.Fprintf(os.Stdout, value+" 发送成功, partition=%d, offset=%d\n", part, offset)      }      time.Sleep(2 * time.Second)   }}

异步生产者

package test01import (   "fmt"   "github.com/Shopify/sarama"   "log"   "os"   "os/signal"   "time")func Test01()  {   config := sarama.NewConfig()   // 如果打开了Return.Successes配置,而又没有producer.Successes()提取,那么Successes()这个chan消息会被写满。   config.Producer.Return.Successes = true	   producer, err := sarama.NewAsyncProducer([]string{"192.168.137.121:9092"}, config)   if err != nil {      panic(err.Error())   }   defer func() {      if err := producer.Close(); err != nil {         log.Fatal(err)      }   }()   //Trap SIGINT to trigger a shutdown   signals := make(chan os.Signal, 1)   signal.Notify(signals, os.Interrupt)   index := 0   var srcValue string   var enqueued, produceErrors int   ProducerLoop:      for {         select {         case producer.Input() <- &sarama.ProducerMessage{            Topic: "test",            Key: nil,            Value: sarama.ByteEncoder(srcValue),         }:            enqueued++            index++            srcValue = fmt.Sprintf("test %d", index)         case err := <- producer.Errors():            log.Printf("Failed to produce messages: %s, err: %v\n", srcValue, err)            produceErrors++         case <- signals:            break ProducerLoop         }         time.Sleep(2 * time.Second)      }   log.Printf("Enqueued: %d; errors: %d\n", enqueued, produceErrors)}

消费者

package mainimport (	"fmt"	"github.com/Shopify/sarama")var Address = []string{"192.168.137.121:9092"}func main()  {	//配置	config := sarama.NewConfig()	//接收失败通知	config.Consumer.Return.Errors = true	//设置kafka版本号	config.Version = sarama.V2_8_0_0	//新建一个消费者	consumer, err := sarama.NewConsumer(Address, config)	if err != nil {		panic("create comsumer failed")	}	defer consumer.Close()	//特定分区消费者,需要设置主题,分区和偏移量,sarama.OffsetNewest表示每次从最新的消息开始消费	partitionConsumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetNewest)	if err != nil {		fmt.Println("error get partition sonsumer")	}	defer partitionConsumer.Close()	for {		select {		case msg := <- partitionConsumer.Messages():			fmt.Println("msg offset: ", msg.Offset, " partition: ", msg.Partition, " timestrap: ",				msg.Timestamp.Format("2006-01-02 15:04"), " value: ", string(msg.Value))		case err := <- partitionConsumer.Errors():			fmt.Println(err.Err)		}	}}

使用过程中遇到的问题

1.虚拟机的网络不通,虚拟机可以ping通主机,主机ping不通虚拟机

原因在于主机的VMware Network Adapter VMnet8网络适配器IP设置不对,与虚拟机IP没在同一个网段。所以需要去本机的网络与共享中心,点击更改适配器设置,点击VMware Network Adapter VMnet8,点击属性,点击Internet协议版本4(TCP/IPv4)修改属性,将ip改到和虚拟机的同一个网段即可,可能需要先禁用后重启生效。

2.启动kafka失败,报如下错误:

在这里插入图片描述

解决方法:需修改日志文件加夹下的meta.properties

其中内容如下:在这里插入图片描述

将cluster.id修改为SWmOEPMLTn-1NE0xQ53umg即可,如下图所示
在这里插入图片描述

3.运行生产者程序时报错kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

解决方法:如果zookeeper和kafka服务正常启动,并且config/server.properties配置如下图所示,都没有问题的话,那就是可能是虚拟机的防火墙没有关闭,使得端口没有暴露出来,这时可以运行systemctl disable firewalld.service关闭防火墙,重新运行程序就没问题了。
在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-28 00:16:32  更:2021-07-28 00:16:55 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/21 1:25:22-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码