Hello Tomasz,

Thanks for the question!

Streams should always call init() before passing any records to transform(...).

When we talk about "reprocessing", we just mean that some record was processed, 
but then there was a failure before its offset was committed, and therefore we 
have to process it again after recovery. There's no difference in code paths 
between the first and second time the record is processed. In all cases, 
Streams will initialize the Transformer (and other Processors) before using 
them.

I think the last part of the answer is that, if we wind up observing the same 
record twice in transform(...), it must be because there was a failure and then 
a recovery, in which case, we will close and re-create the processor (and call 
init() ).

One thing to be aware of, though, is that (under at-least-once), Streams does 
not guarantee to clear out any dirty state from a state store. Therefore, even 
though we re-initialize the processors (including Transformers), if you're 
using a persistent state store, we'll just re-open it. One of the key things 
that exactly-once mode does differently is to clear out all dirty writes from 
the state store during failure recovery.

I hope this helps!

Thanks,
John

On Fri, Oct 7, 2022, at 15:57, xardaso wrote:
> Hi Everyone,
>
> I have a question related to messages auto reprocessing in kafka 
> streams and Transformer/Processor init()/close() methods.
>
> Is possible that in some scenarios (failures, rebalance etc.) a message 
> is processed twice by Transformer.transform() without calling 
> Transformer.init() between the first and the second processing?
>
> To illustrate my question with an example. Let's say I have kafka 
> streams application with a default at_least_once semantics setting. The 
> topology is the following - read messages from one topic, apply 
> Transformer and produce messages to another topic. Transformer:
>
> new TransformerSupplier() {
>      Transformer get() {
>          return new Transformer() {
>              private ProcessorContext context;
>
>              void init(ProcessorContext context) {
>                  logger.info("init called());
>
> this.context = context;
>              }
>
>              KeyValue transform(K key, V value) {
>                  logger.info("Transform offset: {}, partition: {}, 
> context.offset(), context.partition());
>                  return new KeyValue(key, value);
>              }
>
>              void close() {
>                  logger.info("close() called.")
>              }
>          }
>      }
>  }
>
> Is it possible that the app will log "Transform offset: x, partition: 
> y" twice with the same x and y values each time without logging "init() 
> called." between (chronologically)?
>
> I tried to simulate some failure scenarios (for example when 
> CommitFailedException is thrown) but in my simple test cases init() was 
> always called between the first and the second reprocessing. Although I 
> could have easily missed something or a scenario where this is not true 
> which is why I'm asking this question here.
>
> Sorry, if my question is not clear - please let me know in this case 
> and I'll try to clarify.
>
> Thank you for any help you can provide.
>
> Regards,
> Tomasz

Reply via email to