Running Kafka Streams
This is a summary of the lecture (Reference).
First, start ZooKeeper and then the Kafka broker, in that order. Each one runs in its own terminal.
# your version of kafka
cd kafka_2.12-2.8.0
yours@yours:~/kafka_2.12-2.8.0$ bin/zookeeper-server-start.sh config/zookeeper.properties
yours@yours:~/kafka_2.12-2.8.0$ bin/kafka-server-start.sh config/server.properties
Check that the topics have been created. The word-count example uses two topics, word-count-input and word-count-output. Then launch a Kafka console consumer as shown below.
# launch a Kafka consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic word-count-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
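The LongDeserializer setting matters because the application writes the counts with Serdes.Long(), which stores each value as 8 big-endian bytes rather than as readable text. Here is a minimal sketch of that layout using only java.nio.ByteBuffer. The LongWireFormat class and its methods are illustrative names, not part of Kafka.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative only: mimics the 8-byte big-endian layout used for Long values.
final class LongWireFormat {

    // Encode a count as 8 bytes, most significant byte first.
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Decode 8 big-endian bytes back into a long.
    static long decode(byte[] bytes) {
        if (bytes.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] raw = encode(2L);
        System.out.println("bytes: " + raw.length);
        System.out.println("decoded as long: " + decode(raw));
        // Read as text, the same bytes are control characters, not the digit "2".
        String asText = new String(raw, StandardCharsets.UTF_8);
        System.out.println("equals \"2\" when read as text: " + asText.equals("2"));
    }
}
```

If the consumer read these values with a string deserializer, the counts would show up as unprintable characters, which is why the value deserializer is set explicitly.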
Then run the code below (Source). It contains the basic settings and processing steps for a Kafka Streams pipeline. More details are in Source2.
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

// Note: KStreamBuilder is the pre-1.0 Kafka Streams API; newer client versions use StreamsBuilder instead.
// The class name is a placeholder.
public class WordCountApp {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // default serdes used to read and write keys and values
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        KStreamBuilder builder = new KStreamBuilder();
        // 1 - stream from Kafka: read the input topic through the builder
        KStream<String, String> textLines = builder.stream("word-count-input");
        KTable<String, Long> wordCounts = textLines
                // 2 - map values to lowercase
                .mapValues(textLine -> textLine.toLowerCase())
                // can be alternatively written as:
                // .mapValues(String::toLowerCase)
                // 3 - flatmap values, splitting on non-word characters
                .flatMapValues(textLine -> Arrays.asList(textLine.split("\\W+")))
                // 4 - use the word as the new key (the old key is discarded)
                .selectKey((key, word) -> word)
                // 5 - group by key before aggregation
                .groupByKey()
                // 6 - count occurrences
                .count("Counts");
        // 7 - write the results back to Kafka
        wordCounts.to(Serdes.String(), Serdes.Long(), "word-count-output");
        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
        // shutdown hook to correctly close the streams application
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        // Update:
        // print the topology every 5 seconds for learning purposes
        while (true) {
            System.out.println(streams.toString());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                break;
            }
        }
    }
}
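Steps 2 and 3 of the topology can be tried without a broker. The sketch below applies the same toLowerCase and split("\\W+") calls to plain strings. The Tokenize class and its words method are illustrative names, not part of the original code.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: the lowercase + split logic from steps 2 and 3, outside Kafka.
final class Tokenize {

    // Lowercase the line, then split it on runs of non-word characters.
    static List<String> words(String textLine) {
        return Arrays.asList(textLine.toLowerCase().split("\\W+"));
    }

    public static void main(String[] args) {
        // Punctuation and spaces both act as separators.
        System.out.println(words("Hello, Kafka Streams!"));
        // A line that starts with a separator yields an empty first token.
        System.out.println(words(" leading space"));
    }
}
```

The second call shows an edge case worth knowing about: when a line begins with a non-word character, split returns an empty string as the first element, and the topology would count that empty string as a word. Filtering out empty tokens before selectKey would be one way to avoid that.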
Once you send data to the input topic with kafka-console-producer.sh as below, the console consumer prints the resulting word counts.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic word-count-input
>hello kafka streams
>kafka streams is working
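To get a feel for what the consumer should show for these two lines, here is a broker-free sketch that replays them through the same lowercase, split, and count logic using a plain LinkedHashMap. It is only an in-memory approximation, and WordCountSketch is an illustrative name. A real KTable emits a stream of updates per key, and how many intermediate updates reach the output topic depends on caching and commit settings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: an in-memory stand-in for the word-count topology.
final class WordCountSketch {

    // Running totals per word, kept in first-seen order.
    private final Map<String, Long> counts = new LinkedHashMap<>();

    // Process one input line and return the "word<TAB>newCount" updates it causes.
    List<String> process(String textLine) {
        List<String> updates = new ArrayList<>();
        for (String word : textLine.toLowerCase().split("\\W+")) {
            long updated = counts.merge(word, 1L, Long::sum);
            updates.add(word + "\t" + updated);
        }
        return updates;
    }

    // Current totals for every word seen so far.
    Map<String, Long> totals() {
        return counts;
    }

    public static void main(String[] args) {
        WordCountSketch sketch = new WordCountSketch();
        String[] lines = {"hello kafka streams", "kafka streams is working"};
        for (String line : lines) {
            // Print every update, one per word occurrence.
            sketch.process(line).forEach(System.out::println);
        }
        System.out.println(sketch.totals());
    }
}
```

After both lines, the totals are 1 for hello, is, and working, and 2 for kafka and streams. The final value printed per key by the console consumer should match these totals, even if fewer intermediate updates appear.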