## 安装
官网下载最新的 包 https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka_2.13-2.8.0.tgz
```shell # 解压 tar -xzf kafka_2.13-2.8.0.tgz # 进入安装目录 cd kafka_2.13-2.8.0 # 修改 kafka 的配置文件 vim config/service.properties
host.name=192.168.126.66
# Start the ZooKeeper service # 注,得提前安装好 java 环境,不然这一步会报错 zookeeper 依赖 java bin/zookeeper-server-start.sh config/zookeeper.properties # Start the Kafka broker service # 另外一个窗口哈 bin/kafka-server-start.sh config/server.properties
# 再开一个窗口? # 创建一个 topic quickstart-events # 注:这里用的是 localhost 如果不配置 hostname 是无法外网访问的 bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092? # 查看 topic 信息 bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092 # 生产者 在 quickstart-events 这个 topic 下创建事件 交互式的哈 bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 # 消费者 读取这个 topic 下的事件 bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092 ```
## java连接的案例
创建一个 maven 工程?
注意这里的 kafka-clients 的版本 和我们的 kafka_2.13-2.8.0 ?版本一致?
```xml <!--kafka--> <dependency> ? ? <groupId>org.apache.kafka</groupId> ? ? <artifactId>kafka-clients</artifactId> ? ? <version>2.8.0</version> </dependency> ```
测试代码
生产者
```java package com.example.kafka.utils;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties; import java.util.Random;
public class Producer {
? ? public static String topic = "duanjt_test";//定义主题 ? ? public static void main(String[] args) throws InterruptedException { ? ? ? ? Properties p = new Properties(); ? ? ? ? p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.66:9092");//kafka地址,多个地址用逗号分割 ? ? ? ? p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); ? ? ? ? p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); ? ? ? ? KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p); ? ? ? ? try { ? ? ? ? ? ? while (true) { ? ? ? ? ? ? ? ? String msg = "Hello," + new Random().nextInt(100); ? ? ? ? ? ? ? ? ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg); ? ? ? ? ? ? ? ? kafkaProducer.send(record); ? ? ? ? ? ? ? ? System.out.println("消息发送成功:" + msg); ? ? ? ? ? ? ? ? Thread.sleep(500); ? ? ? ? ? ? } ? ? ? ? } finally { ? ? ? ? ? ? kafkaProducer.close(); ? ? ? ? }
? ? } } ```
消费者
```java package com.example.kafka.utils;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections; import java.util.Properties;
public class Consumer { ? ? public static void main(String[] args) { ? ? ? ? Properties p = new Properties(); ? ? ? ? p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.66:9092"); ? ? ? ? p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); ? ? ? ? p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); ? ? ? ? p.put(ConsumerConfig.GROUP_ID_CONFIG, "duanjt_test"); ? ? ? ? KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(p); ? ? ? ? kafkaConsumer.subscribe(Collections.singletonList(Producer.topic));// 订阅消息 ? ? ? ? while (true) { ? ? ? ? ? ? ConsumerRecords<String, String> records = kafkaConsumer.poll(100); ? ? ? ? ? ? for (ConsumerRecord<String, String> record : records) { ? ? ? ? ? ? ? ? System.out.println(String.format("topic:%s,offset:%d,消息:%s", // ? ? ? ? ? ? ? ? ? ? ? ? record.topic(), record.offset(), record.value())); ? ? ? ? ? ? } ? ? ? ? } ? ? } } ```
|