[ 
https://issues.apache.org/jira/browse/KAFKA-7035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511063#comment-16511063
 ] 

Antony Stubbs commented on KAFKA-7035:
--------------------------------------

Call your punctuate processor from the close() method?

> Kafka Processor's init() method sometimes is not called
> -------------------------------------------------------
>
>                 Key: KAFKA-7035
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7035
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Oleksandr Konopko
>            Priority: Critical
>         Attachments: TransformProcessor.java
>
>
> Scenario:
> 1. We have processing of Kafka Topic which is implemented with Processor API
> 2. We want to collect metrics (lets say just count number of processed 
> entities for simplicity)
> 3. How we tried to organize this
>  * process data with process() method and send it down the stream with context
>  * on each call of process() method update the counter
>  * schedule puctuate function which will send metric to special topic. Metric 
> is build with counter
> You can find the code (we removed all business sensitive code out of it, so 
> it should be easy to read) in attachment
>  
> Problematic Kafka Streams behaviour that i can see by logging every step:
> 1. We have 800000 messages in the input topic
> 2. Kafka Streams creates 4 Processor instances. Lets name them: ProcessorA, 
> ProcessorB, ProcessorC and ProcessorD
> 3. ProcessorA and ProcessorB receive 1-5% of data. Data is processed 
> correctly, results are sent down the stream. Counter is upated
> 4. init() method was not called for ProcessorA and ProcessorB
> 5. ProcessorC and ProcessorD are created and they start to receive all the 
> rest of data. 95-99%
> 6. init() method is called for both ProcessorC and ProcessorD. It initiates 
> punctuation, which causes Metrics message be created and sent down the metric 
> stream periodically
> 7. ProcessorA and ProcessorB are closed. init() was never called for them. So 
> Metric entity was not sent to metrics topic
> 8. Processing is finished.
>  
> In the end:
> Expected:
>  * 800000 entities were processed and sent to the Sink
>  * Metrics entities contain counters which sum up to 800000
> Actual results:
>  * 800000 entities were processed and sent to the Sink
>  * Metrics entities contain counters which sum up to some number 3-6% less 
> than 800000, for example 786543
>  
> Problem:
>  * init() method call is not guaranteed
>  * there is no way to guarantee that all work was done by punctuate method 
> before close()
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to