Apache Kafka and Kafka Streams
Apache Kafka is a distributed event streaming platform capable of handling huge volumes of data. Kafka Streams is a client library for building applications and microservices that process data stored in Kafka. It offers APIs to perform a variety of operations, such as aggregations and joins, on the event stream.
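As a taste of the API, here is a minimal sketch of a Streams application that counts events per key, a simple aggregation. The topic names input-topic and counts-topic and the broker address localhost:9092 are placeholders, not anything from our setup.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class CountExample {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "count-example");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> events = builder.stream("input-topic");
        // Group records by key and count them -- a simple aggregation
        KTable<String, Long> counts = events.groupByKey().count();
        counts.toStream().to("counts-topic",
                Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
    }
}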
Data Storage in Kafka
Kafka relies heavily on the filesystem for storing and caching messages. Kafka stores data in ordered, append-only sequences of messages called partitions. Partitions are in turn split into smaller files called segments. Retention policy configurations determine how much data, or for how long, Kafka retains. On disk, a partition is a directory, and each segment consists of an index file and a log file.
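For example, partition 0 of a topic named topic1 might look like this on disk, where the numeric file names are the base offset of the first message in each segment:

/tmp/kafka-logs/topic1-0/
├── 00000000000000000000.index
├── 00000000000000000000.log
├── 00000000000000083570.index
└── 00000000000000083570.log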
Too many files
Kafka keeps many files open at the same time. Quite often the number of files held open by the Kafka process exceeds 1024, the default limit on the maximum number of open files on most Unix-like systems. When that happens, the Kafka process, and in turn the stream processes depending on it, stall.
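The limit in effect for a shell, and for the processes started from it, can be checked with ulimit:

$: ulimit -n
1024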
Tracking open files
We run a few Kafka Streams processes on a server. We kept hitting the
java.io.IOException: Too many open files
exception even after reducing the retention period of the Kafka topics to 30 minutes and increasing the ulimit to 4096.
The first step was to install a cron job to capture the files opened by Kafka.
*/5 * * * * ls -l /proc/<pid of kafka>/fd > <outdir>/$(date +\%s)
This job captures the list of files opened by Kafka every five minutes and writes it to a file. The output file is named after the system timestamp, which indicates the moment the snapshot was captured.
Here is a part of one captured file.
lrwx------ 1 kafka kafka 64 Mar 15 01:43 132 -> anon_inode:[eventpoll]
lrwx------ 1 kafka kafka 64 Mar 15 01:43 133 -> socket:[21350]
lrwx------ 1 kafka kafka 64 Mar 15 01:43 134 -> /tmp/kafka-logs/topic1-0/00000000000000083570.log
lrwx------ 1 kafka kafka 64 Mar 15 01:43 135 -> /tmp/kafka-logs/topic2-0/00000000000001006936.log
lrwx------ 1 kafka kafka 64 Mar 15 01:43 136 -> /tmp/kafka-logs/topic3-0/00000000000000399536.log
lrwx------ 1 kafka kafka 64 Mar 15 01:43 137 -> /tmp/kafka-logs/topic2-0/00000000000000794994.log
lr-x------ 1 kafka kafka 64 Mar 15 01:43 138 -> pipe:[21355]
l-wx------ 1 kafka kafka 64 Mar 15 01:43 139 -> pipe:[21355]
lr-x------ 1 kafka kafka 64 Mar 15 01:43 14 -> /usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/cldrdata.jar
lrwx------ 1 kafka kafka 64 Mar 15 01:43 140 -> anon_inode:[eventpoll]
Summarizing and Visualizing the trend
The next step was to summarize each captured file to track the number of open files per topic. A couple of lines of bash did it, with ${infile} standing for one of the captured files.
# Count the open .log file descriptors per topic
for topic in topic1 topic2 topic3; do
  count=$(sed 's/.*-> //' "${infile}" | grep '\.log' | grep -c "${topic}")
  printf '%-5s %s\n' "${count}" "${topic}" >> "${infile}.summary"
done
This gave the following result.
249 topic1
180 topic2
50 topic3
Checking random summary files captured over a whole day didn't give much insight. So we scribbled a Python program to collect the number of open files for each topic from the summary files and plot it against the timestamps.
The result was interesting. Despite the configured retention period, the number of files open for one internal topic was ever increasing.
Back to Kafka
Kafka Streams creates internal changelog topics to store data related to aggregation operations. It was one such topic that had too many open files.
Kafka ships a shell script, kafka-configs.sh, to handle configurations.
Checking the configuration of that changelog topic revealed that its retention period differed from the value we had configured.
$: bin/kafka-configs.sh --entity-type topics --entity-name \
result-store-changelog --zookeeper localhost:2181 --describe
Configs for topic 'result-store-changelog' are retention.ms=172800000,cleanup.policy=compact,delete
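The same information can also be fetched programmatically with Kafka's AdminClient. Here is a minimal sketch, assuming a broker at localhost:9092:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class DescribeTopicConfig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumes a broker listening on localhost:9092
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic =
                new ConfigResource(ConfigResource.Type.TOPIC, "result-store-changelog");
            Config config = admin.describeConfigs(Collections.singleton(topic))
                                 .all().get().get(topic);
            // Print each config entry, e.g. retention.ms=172800000
            config.entries().forEach(e ->
                System.out.println(e.name() + "=" + e.value()));
        }
    }
}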
Inspecting the Kafka logs gave the same result: retention.ms is 172800000, i.e. 48 hours, rather than the 30 minutes we had configured.
INFO Created log for partition result-store-changelog-0 in /tmp/kafka-logs with
properties {compression.type -> producer, message.format.version -> 1.1-IV0,
retention.bytes -> -1, delete.retention.ms -> 86400000,
segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 172800000,
message.timestamp.difference.max.ms -> 9223372036854775807,
flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
The Kafka documentation explains this behavior.
Solutions
As mentioned in the Kafka documentation above, configurations for internal topics can be overridden by passing topic configs to the KafkaStreams constructor.
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "myapp");
// A bootstrap server is required; localhost:9092 is an assumption
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// topicPrefix() prepends "topic." so the setting applies to the app's internal topics
config.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), 100);
KafkaStreams streams = new KafkaStreams(topology, config);
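The same prefix works for any topic-level setting. For instance, a sketch of capping the internal topics' retention.ms at 30 minutes, to match our source topics, could look like the line below; whether it takes precedence over the retention Kafka Streams computes for windowed changelogs may vary by version, so it is worth verifying the result with kafka-configs.sh afterwards.

// Assumption: cap internal topic retention at 30 minutes (value is in milliseconds)
config.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG), 30 * 60 * 1000);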
Or by setting the grace period of the TimeWindows used when creating aggregation windows, together with the retention of the window store backing them.
stream.groupByKey()
      .windowedBy(
          TimeWindows.of(Duration.ofSeconds(windowSizeSec))
                     .grace(Duration.ofSeconds(windowGraceSec)))
      .aggregate(...)
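The store retention, which also bounds the changelog topic's retention, can be set through Materialized. A minimal sketch, assuming Kafka Streams 2.1+ (where Materialized.withRetention is available), with hypothetical initializer, aggregator and windowRetentionSec, and imports for Materialized, WindowStore and Bytes:

stream.groupByKey()
      .windowedBy(
          TimeWindows.of(Duration.ofSeconds(windowSizeSec))
                     .grace(Duration.ofSeconds(windowGraceSec)))
      .aggregate(
          initializer,  // hypothetical Initializer
          aggregator,   // hypothetical Aggregator
          // Retention must cover at least window size + grace period
          Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("result-store")
                      .withRetention(Duration.ofSeconds(windowRetentionSec)));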