Unit testing your Kafka code is incredibly important. It’s transporting your most important data. This is especially true for your Consumer
s. They are the end point for using the data. There are often many different Consumer
s using the data. You’ll want to unit test all of them.
In a previous post, I showed you how to unit test Producers
.
Refactoring Your Consumers
First of all, you’ll need to be able to change your Consumer
at runtime. Instead of using the KafkaConsumer
object directly, you’ll use the Consumer
interface.
public Consumer<String, String> consumer;
You can use whichever method for dependency injection, but I’m making the Consumer
public so I can change it from the unit test.
Next, you’ll want to refactor the code for creating your KafkaConsumer
. The creation of the KafkaConsumer
should be in separate method that won’t get called by your production Consumer
code.
You’ll also need to refactor the code that consumes the data from the Consumer
object. This code will need to be callable from the unit test. Also, the Consumer
object often consumes in an infinite loop (while (true)
). You need to refactor the actual consumption code so it doesn’t get stuck in an infinite loop.
Unit Testing Your Consumer
Kafka unit tests of the Consumer
code use MockConsumer
object. The @Before
will initialize the MockConsumer
before each test.
MockConsumer<String, String> consumer;
@Before
public void setUp() {
consumer = new MockConsumer<String, String>(OffsetResetStrategy.EARLIEST);
}
Have you been searching for the best data engineering training? You’ve found it. Sign up for my list so you can get my Professional Data Engineering course.
Once we’ve set the objects up, we can start testing.
@Test
public void testConsumer() throws IOException {
MyTestConsumer myTestConsumer = new MyTestConsumer();
myTestConsumer.consumer = consumer;
consumer.assign(Arrays.asList(new TopicPartition("my_topic", 0)));
HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(new TopicPartition("my_topic", 0), 0L);
consumer.updateBeginningOffsets(beginningOffsets);
consumer.addRecord(new ConsumerRecord<String, String>("my_topic",
0, 0L, "mykey", "myvalue0"));
consumer.addRecord(new ConsumerRecord<String, String>("my_topic", 0,
1L, "mykey", "myvalue1"));
consumer.addRecord(new ConsumerRecord<String, String>("my_topic", 0,
2L, "mykey", "myvalue2"));
consumer.addRecord(new ConsumerRecord<String, String>("my_topic", 0,
3L, "mykey", "myvalue3"));
consumer.addRecord(new ConsumerRecord<String, String>("my_topic", 0,
4L, "mykey", "myvalue4"));
myTestConsumer.consume();
// This just tests for exceptions
// Somehow test what happens with the consume()
}
We start off by instantiating the Consumer
we’re wanting to test. We inject our MockConsumer
into the Consumer
. Then, the MockConsumer
’s topic, partitions, and beginning offsets need to be set up. We send some data with the Consumer
. All of the data added by the MockConsumer
will be consumed by the Consumer
. We call the addRecord()
method for every ConsumerRecord
we want the Consumer
to see. Finally, we consume the data.
A quick note that this test only validates that the Consumer
doesn’t throw an exception while processing this data. To verify the actual processing or output, you may need to mock another object or gather the output in a last and run your assertions.