Hello Bruno, I've pasted my suggestions on the SO thread: https://stackoverflow.com/questions/53555783/kafka-streams-dead-letter-queue-quarantined-topic/53599311#53599311
The general idea for a one-off Streams app that shuts down automatically once it has likely exhausted all the records of a topic is to use a shutdown latch. Admittedly this is not the cleanest way to do it, and there have been some discussions around this issue (https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams), but it is not yet supported in Streams.

Guozhang

On Fri, Nov 30, 2018 at 4:23 AM Bruno Bieth <[email protected]> wrote:

> We are building a kafka-streams application as part of a large
> microservices architecture. We want to be resilient to backward
> incompatible format changes and have introduced a quarantined topic. We
> couldn't find anything provided by the library, so we sort of rolled our
> own by simply "manually" trying to deserialize a record and forwarding it
> to the quarantined topic upon failure.
>
> Easy peasy.
>
> Now comes the replay of the quarantined events. This should be triggered
> externally (say, a REST call) and move the events to the next topic if
> deserialization succeeds. Can we leverage kafka-streams to perform such an
> on-demand operation? Intuitively it should be as simple as
> builder.stream(quarantined).to(nextTopic).
>
> Looking at the processor API, it doesn't seem possible to halt
> processing. Bluntly blocking isn't an option, as that would affect the
> other tasks running in the same StreamThread, and having another
> KafkaStreams app seems overkill. I would like to avoid hand-coding a
> consumer -> producer loop, so I'm also considering akka-stream kafka, but
> that sounds a bit like overkill too...
>
> Thanks
>
> This question was also asked on SO:
>
> https://stackoverflow.com/questions/53555783/kafka-streams-dead-letter-queue-quarantined-topic

--
-- Guozhang
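Stripped of the Kafka specifics, the shutdown-latch idea is: the processing thread counts down a latch once it has seen no new records for some idle timeout, and the main thread blocks on the latch and then closes the (hypothetical) KafkaStreams instance. A minimal JDK-only sketch, with all names and the simulated record source being illustrative assumptions:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ShutdownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch shutdownLatch = new CountDownLatch(1);

        // Simulated "stream thread": in a real app this would be a
        // Punctuator or a transform that tracks the time of the last
        // record seen. Here we pretend the topic holds three records.
        Thread worker = new Thread(() -> {
            int recordsLeft = 3;
            long idleSince = System.currentTimeMillis();
            try {
                while (true) {
                    if (recordsLeft > 0) {
                        recordsLeft--;                       // "process" one record
                        idleSince = System.currentTimeMillis();
                    } else if (System.currentTimeMillis() - idleSince > 100) {
                        shutdownLatch.countDown();           // topic likely drained
                        return;
                    }
                    Thread.sleep(10);                        // poll interval
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        // Main thread blocks until the worker signals exhaustion, then
        // would call streams.close() on the real KafkaStreams instance.
        boolean drained = shutdownLatch.await(5, TimeUnit.SECONDS);
        System.out.println(drained ? "drained; closing streams" : "timed out");
        worker.join();
    }
}
```

In an actual Streams app the idle check would typically live in a scheduled punctuation, and `streams.close()` must be called from a thread other than the stream thread itself, which is exactly what the latch arranges.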
