Unit testing your Kafka code is incredibly important because Kafka is transporting your most important data. This is especially true for your Consumers. They are the end point for using the data, and there are often many different Consumers reading the same data. You’ll want to unit test all of them.

In a previous post, I showed you how to unit test Producers.

Refactoring Your Consumers

First of all, you’ll need to be able to change your Consumer at runtime. Instead of using the KafkaConsumer object directly, you’ll use the Consumer interface.

public Consumer<String, String> consumer;

You can use whichever dependency injection method you prefer, but I’m making the Consumer public so I can change it from the unit test.

Next, you’ll want to refactor the code for creating your KafkaConsumer. The creation of the KafkaConsumer should be in a separate method that your consumption code doesn’t call, so a unit test can skip it and inject a mock instead.

You’ll also need to refactor the code that consumes the data from the Consumer object. This code will need to be callable from the unit test. Also, the Consumer object often consumes in an infinite loop (while (true)). You need to refactor the actual consumption code so it doesn’t get stuck in an infinite loop.
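Here is a minimal sketch of what the refactored class could look like. Only MyTestConsumer, the public consumer field, consume(), and the my_topic name come from this post. The createConsumer(), run(), and process() methods, the processedCount field, the broker address, and the group id are hypothetical names and values chosen for illustration, so adapt them to your own code.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class MyTestConsumer {
    // Typed as the Consumer interface and public so a unit test can swap in a mock.
    public Consumer<String, String> consumer;

    // Hypothetical counter, here only so the sketch has an observable effect.
    public int processedCount = 0;

    // Builds the real KafkaConsumer. Startup code calls this; unit tests skip it.
    public void createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "my_group");                // assumed consumer group
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my_topic"));
    }

    // One poll-and-process pass. It returns, so a test can call it directly.
    public void consume() {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            process(record);
        }
    }

    // The infinite loop lives here, outside the code under test.
    public void run() {
        createConsumer();
        while (true) {
            consume();
        }
    }

    // Placeholder for the real per-record logic.
    private void process(ConsumerRecord<String, String> record) {
        processedCount++;
    }
}
```

With this split, production code calls run(), while a unit test assigns its own Consumer to the public field and calls consume() once.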

Unit Testing Your Consumer

Kafka unit tests of the Consumer code use the MockConsumer object. The @Before method initializes a fresh MockConsumer before each test.

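import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.junit.Before;

MockConsumer<String, String> consumer;

// Runs before every test so each one starts with a fresh MockConsumer
@Before
public void setUp() {
    consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
}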

Once we’ve set the objects up, we can start testing.

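// Further imports used by this test (place them at the top of the file)
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;

@Test
public void testConsumer() throws IOException {
    // Inject the MockConsumer into the class under test
    MyTestConsumer myTestConsumer = new MyTestConsumer();
    myTestConsumer.consumer = consumer;

    // Records can only be added for partitions the MockConsumer is assigned to
    consumer.assign(Arrays.asList(new TopicPartition("my_topic", 0)));

    // With OffsetResetStrategy.EARLIEST, consumption starts at the beginning offset
    HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
    beginningOffsets.put(new TopicPartition("my_topic", 0), 0L);
    consumer.updateBeginningOffsets(beginningOffsets);

    // Arguments are topic, partition, offset, key, value
    consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 0L, "mykey", "myvalue0"));
    consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 1L, "mykey", "myvalue1"));
    consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 2L, "mykey", "myvalue2"));
    consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 3L, "mykey", "myvalue3"));
    consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 4L, "mykey", "myvalue4"));

    myTestConsumer.consume();

    // As written, this only checks that consume() does not throw.
    // Add assertions on the results of consume() here.
}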

We start off by instantiating the Consumer we want to test and injecting our MockConsumer into it. Then, the MockConsumer’s topic, partition, and beginning offsets need to be set up. Next, we add some data to the MockConsumer, calling the addRecord() method once for every ConsumerRecord we want the Consumer to see. All of the data added to the MockConsumer will be consumed by the Consumer. Finally, we consume the data.

A quick note: this test only validates that the Consumer doesn’t throw an exception while processing this data. To verify the actual processing or output, you may need to mock another object or gather the output in a list and run your assertions against it.
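As a rough sketch of the gather-and-assert approach, you could pull the per-record logic into its own class and hand it an output object that the test replaces with a list-backed version. Everything named here (OutputSink, RecordHandler, ListSink, and the key=value output format) is hypothetical and not part of Kafka or of the code above. The assumption is that consume() would call handle() with record.key() and record.value() for each ConsumerRecord.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical output abstraction; production code might write to a file or database.
interface OutputSink {
    void emit(String line);
}

// Hypothetical per-record logic, kept free of Kafka types so it is easy to assert on.
class RecordHandler {
    private final OutputSink sink;

    RecordHandler(OutputSink sink) {
        this.sink = sink;
    }

    // Formats one record as key=value and sends it to the sink.
    void handle(String key, String value) {
        sink.emit(key + "=" + value);
    }
}

// Test double that gathers everything emitted into a list.
class ListSink implements OutputSink {
    final List<String> lines = new ArrayList<>();

    @Override
    public void emit(String line) {
        lines.add(line);
    }
}
```

In a test, you would build a RecordHandler around a ListSink, run consume() against the MockConsumer, and then compare the contents of lines with the values you expect.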