[ 
https://issues.apache.org/jira/browse/BEAM-591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572540#comment-15572540
 ] 

Raghu Angadi commented on BEAM-591:
-----------------------------------


PubSubIO does advance the watermark to current time if there haven't been any 
records recently ([line 
996|https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java#L996]).
 

PubSubIO tracks the last minute of timestamps because Pub/Sub might deliver 
records out of order. Kafka does not have that issue. In addition, KafkaIO 
knows whether it has caught up with the latest records. 

For the default watermark case (i.e. KafkaIO processing time), I propose that 
KafkaIO advance the watermark to the current time when the backlog is zero (the 
backlog is updated every 5 seconds). This will cover most use cases.
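
A minimal sketch of the proposed default policy, written as a standalone class 
rather than as KafkaIO code. The class name {{IdleAwareWatermark}} and its 
methods are hypothetical and only illustrate the idea: report the current time 
when the last backlog refresh saw zero records, otherwise report the timestamp 
of the last record. Keeping the watermark from moving backwards is an added 
assumption of the sketch, not something stated above.

```java
import java.time.Instant;

/** Hypothetical illustration only; this is not KafkaIO's actual reader code. */
final class IdleAwareWatermark {
    private Instant lastRecordTimestamp = Instant.EPOCH;
    private Instant lastWatermark = Instant.EPOCH;
    private long backlog = -1; // -1 means "not measured yet"

    /** Called for each record read; by default the timestamp is processing time. */
    void onRecord(Instant timestamp) {
        lastRecordTimestamp = timestamp;
    }

    /** Called by the periodic backlog check (every 5 seconds in the proposal). */
    void onBacklogRefresh(long recordsBehind) {
        backlog = recordsBehind;
    }

    /** Returns the current time if caught up, else the last record timestamp. */
    Instant getWatermark(Instant now) {
        Instant candidate = (backlog == 0) ? now : lastRecordTimestamp;
        // Assumption of this sketch: never let the watermark move backwards.
        if (candidate.isAfter(lastWatermark)) {
            lastWatermark = candidate;
        }
        return lastWatermark;
    }
}
```

With custom record timestamps, a record that arrives after an idle period may 
carry a timestamp older than the watermark already reported, so it would be 
treated as late data. That is the trade-off of advancing on idle.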

This policy would be fine for custom timestamps too (as in PubSubIO). If 
users want more control, we could invoke the watermark function without the 
Kafka record so that the user can return the current timestamp. 
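
One way this could look, as a rough sketch only. The types here are 
hypothetical stand-ins, not the KafkaIO API: {{Rec}} represents a Kafka 
record with an event time, and the user function receives an empty 
{{Optional}} when the reader is caught up, so it can decide what to return for 
an idle topic.

```java
import java.time.Instant;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical illustration only; names and signatures are not KafkaIO's. */
final class UserWatermarkSketch {
    /** Stand-in for a Kafka record that carries a user-defined event time. */
    static final class Rec {
        final Instant eventTime;

        Rec(Instant eventTime) {
            this.eventTime = eventTime;
        }
    }

    /**
     * Example user function: use the record's event time when a record is
     * supplied, and fall back to the clock when none is supplied (idle topic).
     */
    static Function<Optional<Rec>, Instant> userFn(Supplier<Instant> clock) {
        return rec -> rec.map(r -> r.eventTime).orElseGet(clock);
    }

    /**
     * What the reader might do inside getWatermark(): pass no record when
     * caught up, otherwise pass the latest record (must be non-null then).
     */
    static Instant getWatermark(
            Function<Optional<Rec>, Instant> fn, Rec latest, boolean caughtUp) {
        return fn.apply(caughtUp ? Optional.empty() : Optional.of(latest));
    }
}
```

A user who prefers to hold the watermark on an idle topic could instead 
return a fixed or remembered timestamp in the empty case.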

> Better handling watermark in KafkaIO
> ------------------------------------
>
>                 Key: BEAM-591
>                 URL: https://issues.apache.org/jira/browse/BEAM-591
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-extensions
>            Reporter: Raghu Angadi
>            Assignee: Raghu Angadi
>
> Right now the default watermark in KafkaIO is the same as the timestamp of 
> the record. The main problem with this is that the watermark does not change 
> if there aren't any new records on the topic. This can hold up many open 
> windows. 
> The record timestamp by default is set to processing time (i.e. when the 
> runner reads a record from the Kafka reader).
> A user can provide functions to calculate watermark and record timestamps. 
> There are a few concerns with the current design:
> * What should happen when a Kafka topic is idle:
>   ** In the default case, I think the watermark should advance to the current 
> time.
>   ** What should happen when the user has provided a function to calculate the 
> record timestamp? 
>    *** Should the watermark stay the same as the record timestamp?
>    *** Should it stay the same when the user has provided their own watermark 
> function? 
> * Are the current semantics of the user-provided watermark function correct?
>   ** -It is run once for each record read-.
>   ** -Should it instead be run inside {{getWatermark()}} called by the runner 
> (we could still provide the last user record, and its timestamp)-.
>   ** It does run inside {{getWatermark()}}. Should we pass the current record 
> timestamp in addition to the record?
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
