Search
Close this search box.

Kafka 0.10 is out. Here are the changes that developers need to know about. Here is the new URL to the Kafka 0.10 JavaDoc.

KafkaConsumer

The KafkaConsumer had a minor change to that allows you to specify a maximum number of messages to return. You can set this by using the max.poll.records property to a number in the KafkaConsumer. This change helps your consumer from retrieving too many messages at once.

Kafka Streams

The major, and I mean major change for this version is the addition of Kafka Streams. The KIP (Kafka Improvement Process) gives some of the design and motivation behind creating Kafka Streams. Confluent’s docs also give some give information about the design and concepts behind Kafka Streams. Jay Kreps wrote a post introducing Kafka Streams.

There are two divisions to Kafka Streams: a “DSL” and a low-level API. Both share many of the same features. I put quotes around DSL, because I don’t think it really adheres to the definition of a DSL. Think of the them more as two APIs. One is higher level and the other exposes the lower level guts.

You can find some code in Confluent’s GitHub showing how to use the DSL. There are some guides in Confluent’s docs. These show you can do real-time processing with Java 8 lambdas.

You can find a full example of the low level API in my GitHub. This example takes Spotify-esque data and processes it with Kafka Streams. You can watch a YouTube video I created showing the code at work.

If you’re used to the functions that real-time processing systems like Apache Spark, Apache Flink, or Apache Beam expose, you’ll be right at home in the DSL. If you’re not, you’ll need to spend some time understand what methods like map, flatMap, or mapValues mean.

Here is a paraphrased Word Count example that uses the DSL:

// Start building a DAG
KStreamBuilder builder = new KStreamBuilder();

// Use TextLinesTopic as the input Kafka topic
KStream<String, String> textLines = builder.stream(
    stringSerde, stringSerde, "TextLinesTopic");

KStream<String, Long> wordCounts = textLines
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    .map((key, word) -> new KeyValue<>(word, word))
    .countByKey("Counts")
    .toStream();

// Write the `KStream<String, Long>` to the output topic.
wordCounts.to(stringSerde, longSerde, "WordsWithCountsTopic");

KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

Here is a paraphrased example of the low level API:

@Override
public void process(String key, String value) {
  // Process a single message
  String mapKey = getKeyName(value);
  String oldValue = stateStore.get(mapKey);

  if (oldValue == null) {
    // Swap k/v around as eventsim key is null
    stateStore.put(mapKey, value);
  } else {
    // TODO: Handle when k/v already there
    // this.kvStore.put(key, oldValue + newValue);
  }

  context.commit();
}

@Override
public void punctuate(long streamTime) {
  // Process a window
  currentGeneration.incrementAndGet();

  KeyValueIterator<String, String> iter = stateStore.all();

  double totalDuration = 0;

  long totalEntries = 0;
  // ...
}

These examples should give you a good idea of the differences between the API approaches. The base class for these is KeyValueStore. You can see my usage of the KeyValueStore, MeteredKeyValueStore, in the low level example.