Hi, the behavior you describe is by design. You should increase the retention time of the re-partitioning topics manually to process old data.
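For reference, here is a minimal sketch of one way to raise the retention programmatically; it assumes a 0.11+ AdminClient is available (on 0.10.x the same change can be made with the kafka-configs command line tool), and the topic name ("xxx-repartition", taken from the broker log you quoted) and the retention value are only placeholders:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;
    import org.apache.kafka.common.config.TopicConfig;

    public class RaiseRepartitionRetention {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            AdminClient admin = AdminClient.create(props);
            // Repartition topics are typically named <application.id>-<store name>-repartition;
            // adjust the name, and pick a retention that covers your oldest data.
            ConfigResource topic =
                new ConfigResource(ConfigResource.Type.TOPIC, "xxx-repartition");
            ConfigEntry retention =
                new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG,
                                String.valueOf(365L * 24 * 60 * 60 * 1000)); // e.g. one year
            admin.alterConfigs(Collections.singletonMap(topic,
                new Config(Collections.singletonList(retention)))).all().get();
            admin.close();
        }
    }

Note that the repartition topic only exists after the application has run at least once, so the change has to be applied after the topic has been created (or the topic has to be created manually up front).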
-Matthias

On 7/25/17 7:17 AM, Gerd Behrmann wrote:
> Hi,
>
> While adding a new Streams-based microservice to an existing Kafka
> infrastructure, I have run into some issues processing older data in existing
> topics. I am uncertain of the exact cause of the problems, but I am looking
> for advice on how things are supposed to work so that I can rule out
> possibilities. The following is my hypothesis of what may be happening; maybe
> somebody can tell me why what I describe is impossible - or whether this
> might be a bug. The following relates to Kafka 0.10.2.1 (Confluent
> distribution).
>
> TL;DR: When (re)processing old data with Kafka Streams in a topology that
> causes the stream to be repartitioned, the records in the -repartition topic
> carry the timestamp of the original input records as extracted by the
> timestamp extractor. The default retention policy of the -repartition topic
> is however 7 days, allowing Kafka to delete data from the -repartition topic
> even before the Streams application has a chance to read it back in.
>
> The situation is basically that I have a topic with existing records dating
> back several months. Each record contains a timestamp and a client identifier
> (among other things). The task is quite simple: produce an output topic that
> contains the largest timestamp for each client. Distilled, this looks
> something like this:
>
>     class Record {
>         ...
>         long client;
>         long time;
>         ...
>     }
>
>     builder.stream(EARLIEST, Bytes(), Record(), "input")
>            .map((hash, record) -> new KeyValue<>(record.client, record.time))
>            .groupByKey(Long(), Long())
>            .reduce(Long::max, "store")
>            .to(Long(), Long(), "output");
>
> where Bytes(), Long(), Record() return the appropriate Serde. This takes each
> input record and throws the original key away, repartitions on the embedded
> client id, runs a reduction operation keeping the largest timestamp, and
> stores the result back into a topic. The repartitioning causes an internal
> topic to be created. This topic will have cleanup.policy=delete and the
> server default retention policy of 7 days.
>
> I am using the default FailOnInvalidTimestamp timestamp extractor. As far as
> I can determine, this causes the records in the -repartition topic to have
> the same metadata timestamps as the input records. Also, as far as I can see
> in the Kafka server side code, log segments will be deleted once the largest
> timestamp (as extracted from the records stored in the segment) is older than
> the retention limit.
>
> This is where I wonder how this is supposed to work when ingesting months-old
> data: it would appear that Kafka could start to delete segments of the
> -repartition topic aggressively, as the timestamps are several months old.
> This could happen even before Kafka Streams has had a chance to read the data
> back in for the reduce operation. The Kafka server log would seem to support
> that this happens, as I see several segments being created *and deleted*
> right after the application was started:
>
>     [2017-07-24 09:37:54,735] INFO Rolled new log segment for
>     'xxx-repartition-2' in 1 ms. (kafka.log.Log)
>     [2017-07-24 09:37:54,735] INFO Scheduling log segment 0 for log
>     xxx-repartition-2 for deletion. (kafka.log.Log)
>
> This repeats quite a number of times for the first half hour or so -
> presumably until the computation has caught up with newer data that didn't
> get deleted right away.
> Also the client side log seemed to indicate that something like this was
> happening:
>
>     2017-07-24 09:45:02,550 INFO StreamThread stream-thread [StreamThread-1]
>     no custom setting defined for topic xxx-repartition using original config
>     earliest for offset reset
>
> This message too repeated quite a number of times for the first half hour or
> so. Looking at the Kafka Streams code, this message would get logged as a
> result of the consumer failing due to an invalid offset.
>
> Assuming my theory is correct, I could probably solve this problem by using a
> WallclockTimestampExtractor (something I will test tomorrow). However, one of
> the use cases often repeated in the Kafka material is that one can reprocess
> old data - surely it must be possible to reprocess using a timestamp
> extractor that reflects the original time, not the current processing time?
>
> I tried to Google for information about retention time and internal topics.
> The closest thing I could find is that Kafka 0.11 has gained support for
> letting applications request that records before a particular offset be
> deleted; a future Kafka Streams could use this to eliminate the need for
> having a retention time on the internal topic and thus resolve the problem.
>
> Cheers,
>
> Gerd
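Regarding control over the internal topic: one pattern sometimes used is to make the repartition step explicit with through() on a manually created topic, so that its retention and partition count are chosen up front instead of falling back to the broker defaults. This is only a sketch against the 0.10.2 DSL, reusing the helper Serde methods from the snippet above; the topic name "client-times-by-id" is made up and would have to be created beforehand with the desired settings:

    // Same topology as above, but writing the re-keyed stream to a pre-created
    // topic; the stream read back from that topic is already partitioned by
    // client id, so groupByKey() should not create an internal -repartition topic.
    builder.stream(EARLIEST, Bytes(), Record(), "input")
           .map((hash, record) -> new KeyValue<>(record.client, record.time))
           .through(Long(), Long(), "client-times-by-id")
           .groupByKey(Long(), Long())
           .reduce(Long::max, "store")
           .to(Long(), Long(), "output");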
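And regarding the WallclockTimestampExtractor workaround mentioned above, it is configured globally for the application, roughly like this (config names as of 0.10.2; the application id and broker list are placeholders). Be aware that it changes the time semantics of the whole application, since records are then stamped with processing time rather than the embedded event time:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "client-max-time");    // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
    // Stamp records with wall-clock time so repartition-topic segments are not
    // considered older than the retention limit the moment they are written.
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
              WallclockTimestampExtractor.class.getName());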