Hello Tomasz,

Thanks for the question!

Streams should always call init() before passing any records to transform(...).

When we talk about "reprocessing", we just mean that some record was processed, but then there was a failure before its offset was committed, and therefore we have to process it again after recovery. There's no difference in code paths between the first and second time the record is processed. In all cases, Streams will initialize the Transformer (and other Processors) before using them.

I think the last part of the answer is that, if we wind up observing the same record twice in transform(...), it must be because there was a failure and then a recovery, in which case we will close and re-create the processor (and call init()).

One thing to be aware of, though, is that under at-least-once, Streams does not guarantee that dirty state is cleared out of a state store. Therefore, even though we re-initialize the processors (including Transformers), if you're using a persistent state store, we'll just re-open it, and it may still contain writes from the failed attempt. One of the key things that exactly-once mode does differently is to clear out all dirty writes from the state store during failure recovery.

I hope this helps!

Thanks,
John

On Fri, Oct 7, 2022, at 15:57, xardaso wrote:
> Hi Everyone,
>
> I have a question related to automatic message reprocessing in Kafka
> Streams and the Transformer/Processor init()/close() methods.
>
> Is it possible that in some scenarios (failures, rebalance, etc.) a message
> is processed twice by Transformer.transform() without
> Transformer.init() being called between the first and the second processing?
>
> To illustrate my question with an example: let's say I have a Kafka
> Streams application with the default at_least_once semantics setting. The
> topology is the following - read messages from one topic, apply a
> Transformer and produce messages to another topic. Transformer:
>
> new TransformerSupplier() {
>     Transformer get() {
>         return new Transformer() {
>             private ProcessorContext context;
>
>             void init(ProcessorContext context) {
>                 logger.info("init() called.");
>
>                 this.context = context;
>             }
>
>             KeyValue transform(K key, V value) {
>                 logger.info("Transform offset: {}, partition: {}",
>                     context.offset(), context.partition());
>                 return new KeyValue(key, value);
>             }
>
>             void close() {
>                 logger.info("close() called.");
>             }
>         };
>     }
> }
>
> Is it possible that the app will log "Transform offset: x, partition:
> y" twice with the same x and y values each time without logging "init()
> called." in between (chronologically)?
>
> I tried to simulate some failure scenarios (for example when
> CommitFailedException is thrown), but in my simple test cases init() was
> always called between the first and the second processing. However, I
> could easily have missed a scenario where this is not true,
> which is why I'm asking this question here.
>
> Sorry if my question is not clear - please let me know in that case
> and I'll try to clarify.
>
> Thank you for any help you can provide.
>
> Regards,
> Tomasz
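
For reference, here is a minimal sketch of switching an application from the default at_least_once setting to exactly-once mode, which the reply above describes as the mode that clears dirty state-store writes during failure recovery. It uses plain java.util.Properties with the literal config keys rather than the StreamsConfig constants, so it compiles without the Kafka jars. The class name, the application id, and the bootstrap address are placeholders, and the "exactly_once_v2" value assumes Kafka Streams 3.0 or newer running against brokers 2.5 or newer.

```java
import java.util.Properties;

class ExactlyOnceConfigSketch {

    // Builds a Streams configuration with exactly-once processing enabled.
    // applicationId and bootstrapServers are placeholders supplied by the caller.
    public static Properties buildConfig(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        // Same key as StreamsConfig.APPLICATION_ID_CONFIG.
        props.put("application.id", applicationId);
        // Same key as StreamsConfig.BOOTSTRAP_SERVERS_CONFIG.
        props.put("bootstrap.servers", bootstrapServers);
        // Same key as StreamsConfig.PROCESSING_GUARANTEE_CONFIG.
        // If this is left unset, Streams uses "at_least_once".
        props.put("processing.guarantee", "exactly_once_v2");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConfig("transformer-demo", "localhost:9092");
        System.out.println("processing.guarantee = "
            + props.getProperty("processing.guarantee"));
    }
}
```

The resulting Properties object would be passed to the KafkaStreams constructor together with the topology. Under at_least_once, an alternative is to keep the Transformer's state-store updates idempotent, so that re-applying a record after recovery does no harm.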