IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> kafka的基本使用 -> 正文阅读

[大数据]kafka的基本使用

## 安装

官网下载最新的 包 https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka_2.13-2.8.0.tgz

```shell
# 解压
tar -xzf kafka_2.13-2.8.0.tgz
# 进入安装目录
cd kafka_2.13-2.8.0
# 修改 kafka 的配置文件
vim config/service.properties

host.name=192.168.126.66

# Start the ZooKeeper service
# 注,得提前安装好 java 环境,不然这一步会报错 zookeeper 依赖 java
bin/zookeeper-server-start.sh config/zookeeper.properties
# Start the Kafka broker service
# 另外一个窗口哈
bin/kafka-server-start.sh config/server.properties

# 再开一个窗口?
# 创建一个 topic quickstart-events
# 注:这里用的是 localhost 如果不配置 hostname 是无法外网访问的
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092?
# 查看 topic 信息
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
# 生产者 在 quickstart-events 这个 topic 下创建事件 交互式的哈
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
# 消费者 读取这个 topic 下的事件
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
```

## java连接的案例

创建一个 maven 工程?

注意这里的 kafka-clients 的版本 和我们的 kafka_2.13-2.8.0 ?版本一致?

```xml
<!--kafka-->
<dependency>
? ? <groupId>org.apache.kafka</groupId>
? ? <artifactId>kafka-clients</artifactId>
? ? <version>2.8.0</version>
</dependency>
```

测试代码

生产者

```java
package com.example.kafka.utils;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Random;

public class Producer {

? ? public static String topic = "duanjt_test";//定义主题
? ? public static void main(String[] args) throws InterruptedException {
? ? ? ? Properties p = new Properties();
? ? ? ? p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.66:9092");//kafka地址,多个地址用逗号分割
? ? ? ? p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
? ? ? ? p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
? ? ? ? KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);
? ? ? ? try {
? ? ? ? ? ? while (true) {
? ? ? ? ? ? ? ? String msg = "Hello," + new Random().nextInt(100);
? ? ? ? ? ? ? ? ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
? ? ? ? ? ? ? ? kafkaProducer.send(record);
? ? ? ? ? ? ? ? System.out.println("消息发送成功:" + msg);
? ? ? ? ? ? ? ? Thread.sleep(500);
? ? ? ? ? ? }
? ? ? ? } finally {
? ? ? ? ? ? kafkaProducer.close();
? ? ? ? }

? ? }
}
```

消费者

```java
package com.example.kafka.utils;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class Consumer {
? ? public static void main(String[] args) {
? ? ? ? Properties p = new Properties();
? ? ? ? p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.66:9092");
? ? ? ? p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
? ? ? ? p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
? ? ? ? p.put(ConsumerConfig.GROUP_ID_CONFIG, "duanjt_test");
? ? ? ? KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(p);
? ? ? ? kafkaConsumer.subscribe(Collections.singletonList(Producer.topic));// 订阅消息
? ? ? ? while (true) {
? ? ? ? ? ? ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
? ? ? ? ? ? for (ConsumerRecord<String, String> record : records) {
? ? ? ? ? ? ? ? System.out.println(String.format("topic:%s,offset:%d,消息:%s", //
? ? ? ? ? ? ? ? ? ? ? ? record.topic(), record.offset(), record.value()));
? ? ? ? ? ? }
? ? ? ? }
? ? }
}
```

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-24 11:33:51  更:2021-07-24 11:34:46 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/6 20:57:35-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码