1.代码示例
public class StreamSample {
private static final String TOPIC_INPUT = "steven-stream-in";
private static final String TOPIC_OUT = "steven-stream-out";
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
foreachStream(builder);
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
static void foreachStream(final StreamsBuilder builder) {
KStream<String, String> source = builder.stream(TOPIC_INPUT);
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
.foreach((key, value) -> System.out.println(key + ":" + value));
}
}
2.代码运行结果 (1).创建生产者 打开一个cmd终端,在E:\Kafka\kafka_2.12-1.1.0\bin\windows目录下执行kafka-console-producer.bat --broker-list localhost:9092 --topic steven-stream-in命令,创建生产者。
kafka-console-producer.bat --broker-list localhost:9092 --topic steven-stream-in

(2).创建消费者 打开一个cmd终端,在E:\Kafka\kafka_2.12-1.1.0\bin\windows目录下执行kafka-console-consumer.bat --zookeeper localhost:2181 --topic steven-stream-out命令,创建消费者。
kafka-console-consumer.bat --zookeeper localhost:2181 --topic steven-stream-out
--property print.key=true --property print.value=true
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

(3).生产者命令框中输入数据
I am Chinese
I am American
I am from China

(4).控制台查看分析结果
null:I
null:am
null:Chinese
null:I
null:am
null:American
null:I
null:am
null:from
null:China
|