[ 
https://issues.apache.org/jira/browse/KAFKA-6092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-6092:
------------------------------------

> Time passed in punctuate call is currentTime, not punctuate schedule time. 
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-6092
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6092
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.0
>            Reporter: Stephane Maarek
>            Priority: Major
>
> The java doc specifies that for a Transformer, calling context.schedule calls 
> punctuate every 1000ms. This is not entirely accurate, as if no data is 
> received for a while, punctuate won't be called.
> {code}
>      *             void init(ProcessorContext context) {
>      *                 this.context = context;
>      *                 this.state = context.getStateStore("myTransformState");
>      *                 context.schedule(1000); // call #punctuate() each 
> 1000ms
>      *             }
> {code}
> When you receive new data say after 20 seconds, punctuate will play catch up 
> and will be called 20 times at reception of the new data. 
> the signature of punctuate is
> {code}
> *             KeyValue punctuate(long timestamp) {
>      *                 // can access this.state
>      *                 // can emit as many new KeyValue pairs as required via 
> this.context#forward()
>      *                 return null; // don't return result -- can also be 
> "new KeyValue()"
>      *             }
> {code}
> but the timestamp being passed is currentTimestamp at the time of the call to 
> punctuate, not at the time the punctuate was scheduled. It is very confusing 
> and I think the timestamp should represent the one at which the punctuate 
> should have been scheduled. Getting the current timestamp is not adding much 
> information as it can easily obtained using  System.currentTimeMillis();



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to