[jira] [Commented] (BEAM-704) KafkaIO should handle "latest offset" evenly, and persist it as part of the CheckpointMark.

2016-11-08 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15647814#comment-15647814
 ] 

Raghu Angadi commented on BEAM-704:
---

For further discussion and the fix, see
https://github.com/apache/incubator-beam/pull/1071.

> KafkaIO should handle "latest offset" evenly, and persist it as part of the 
> CheckpointMark.
> ---
>
> Key: BEAM-704
> URL: https://issues.apache.org/jira/browse/BEAM-704
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
> Reporter: Amit Sela
> Assignee: Raghu Angadi
>
> Currently, KafkaIO (when configured to "latest") will check the latest
> offset on the worker. This means that each worker sees a "different"
> latest, depending on when it checks the partitions assigned to it.
> This also means that if a worker fails before starting to read, and new
> messages were added in between, they would be missed.
> I think we should consider checking the offsets (could be the same for
> "earliest") when running initialSplits (that's how Spark does it as well,
> one call from the driver for all topic-partitions).
> I'd also suggest we persist the latest offset as part of the CheckpointMark
> so that once latest is set, it is remembered until new messages arrive and it
> doesn't need to be resolved again (and if there were new messages available
> they won't be missed upon failure).
> For Spark this is even more important, as state is passed in between
> micro-batches and sparse partitions may skip messages until a message
> finally arrives within the read time-frame.
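
For illustration, a minimal sketch of the checkpoint shape this proposal
implies, with one persisted offset per partition (class and field names are
hypothetical, not necessarily the actual KafkaIO types):

{code:java}
import java.io.Serializable;
import java.util.List;

// Hypothetical per-partition mark: the next offset to read is persisted
// even if no record has been read yet, so a "latest" resolved once
// survives worker failure and restart.
class PartitionMark implements Serializable {
  final String topic;
  final int partition;
  final long nextOffset;

  PartitionMark(String topic, int partition, long nextOffset) {
    this.topic = topic;
    this.partition = partition;
    this.nextOffset = nextOffset;
  }
}

// Hypothetical checkpoint mark: just the list of partition marks.
class KafkaCheckpointMark implements Serializable {
  final List<PartitionMark> partitions;

  KafkaCheckpointMark(List<PartitionMark> partitions) {
    this.partitions = partitions;
  }
}
{code}

Storing the next offset to read, rather than the last offset read, keeps
the mark meaningful before the first record ever arrives.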




[jira] [Commented] (BEAM-704) KafkaIO should handle "latest offset" evenly, and persist it as part of the CheckpointMark.

2016-10-09 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15559473#comment-15559473
 ] 

Amit Sela commented on BEAM-704:
---

I forgot about the consumer-group...
I didn't mean that Beam currently asks UnboundedSources for a
"start-checkpoint"; I suggested that it would be possible. Instead of
assigning topic/partitions and asking for offsets on the worker, the runner
would have all the information (topic-partition-offset) prior to splitting,
and the offsets would be a property of the Source instead of the Reader (if
there is a previous CheckpointMark, the Reader would ignore them). I do
understand how this might be problematic if the driver runs in an
environment that can't connect to the Kafka cluster (Spark can run the
driver on the cluster as well).
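
For illustration, a minimal sketch of resolving the offsets once, before
splitting (a hypothetical helper, not a Beam API; it assumes a Kafka
0.10.1+ client for endOffsets() and a consumer config that already carries
bootstrap.servers and byte-array deserializers):

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

class StartOffsets {
  // One call from the driver for all topic-partitions. Each split would
  // then be created with its (topic, partition, offset) baked into the
  // Source; the Reader ignores it when a previous CheckpointMark exists.
  static Map<TopicPartition, Long> resolveLatest(Properties config,
                                                 String topic) {
    try (KafkaConsumer<byte[], byte[]> consumer =
             new KafkaConsumer<>(config)) {
      List<TopicPartition> partitions = new ArrayList<>();
      for (PartitionInfo info : consumer.partitionsFor(topic)) {
        partitions.add(new TopicPartition(topic, info.partition()));
      }
      return consumer.endOffsets(partitions);
    }
  }
}
{code}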


[jira] [Commented] (BEAM-704) KafkaIO should handle "latest offset" evenly, and persist it as part of the CheckpointMark.

2016-10-07 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15556812#comment-15556812
 ] 

Raghu Angadi commented on BEAM-704:
---

> Beam does not ask the reader for a checkpoint
I meant to say 'does not ask the Source'.


[jira] [Commented] (BEAM-704) KafkaIO should handle "latest offset" evenly, and persist it as part of the CheckpointMark.

2016-10-07 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15556749#comment-15556749
 ] 

Raghu Angadi commented on BEAM-704:
---

https://github.com/apache/incubator-beam/pull/1071 sets the offset on the
reader.

{quote} A read from Kafka requires specifying topic(s) and either specific
partitions or "earliest/latest". {quote}

That is not true. It does not require either 'earliest' or 'latest';
'latest' is the default. You can also set a consumer-group id, in which
case it defaults to what is committed for that consumer group.

{quote} If we were to handle that on splitting, all Kafka reads would have
a "starting" CheckpointMark {quote}

That is not correct. AFAIK, Beam does not ask the reader for a checkpoint
(at least Google Dataflow does not); getCheckpointMark() is only called on
the reader, on the worker.



[jira] [Commented] (BEAM-704) KafkaIO should handle "latest offset" evenly, and persist it as part of the CheckpointMark.

2016-10-07 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15556360#comment-15556360
 ] 

Amit Sela commented on BEAM-704:
---

A read from Kafka requires specifying topic(s) and either specific
partitions or "earliest/latest".
If we were to handle that on splitting, all Kafka reads would have a
"starting" CheckpointMark - either the one specified or the one fetched by
"earliest/latest". That should work, no?


[jira] [Commented] (BEAM-704) KafkaIO should handle "latest offset" evenly, and persist it as part of the CheckpointMark.

2016-10-07 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1737#comment-1737
 ] 

Raghu Angadi commented on BEAM-704:
---

[~dhalp...@google.com] expanded a bit more on this. The issue you might be
pointing to is that there is no offset stored in the checkpoint for a
partition if the reader hasn't ever read a record. That should be easily
fixable: the reader can set the current offset in reader.start():
https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L935
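
For illustration, a minimal sketch of that fix (a hypothetical reader
fragment, not the exact KafkaIO code; it assumes the reader holds its
consumer, its assigned partitions, and a map of next offsets to read):

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class ReaderSketch {
  final KafkaConsumer<byte[], byte[]> consumer;
  final List<TopicPartition> assigned;
  final Map<TopicPartition, Long> nextOffsets = new HashMap<>();

  ReaderSketch(KafkaConsumer<byte[], byte[]> consumer,
               List<TopicPartition> assigned) {
    this.consumer = consumer;
    this.assigned = assigned;
  }

  boolean start() {
    consumer.assign(assigned);
    for (TopicPartition tp : assigned) {
      // position() forces "latest" (or whatever auto.offset.reset says)
      // to resolve to a concrete offset right away, so the very first
      // getCheckpointMark() has an offset to persist even before any
      // record is read.
      nextOffsets.put(tp, consumer.position(tp));
    }
    return false; // the real reader would attempt its first advance() here
  }
}
{code}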


[jira] [Commented] (BEAM-704) KafkaIO should handle "latest offset" evenly, and persist it as part of the CheckpointMark.

2016-10-07 Thread Raghu Angadi (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1572#comment-1572
 ] 

Raghu Angadi commented on BEAM-704:
---


{quote} I'd also suggest we persist the latest offset as part of the
CheckpointMark so that once latest is set, it is remembered until new
messages arrive and it doesn't need to be resolved again (and if there were
new messages available they won't be missed upon failure). {quote}

The offset is the primary piece of information stored in the
CheckpointMark. Can you elaborate on what is missing now?

{quote} I think we should consider checking the offsets (could be the same
for "earliest") when running initialSplits (that's how Spark does it as
well, one call from the driver for all topic-partitions). {quote}

Can you expand on this with an example? The offset is part of the
checkpoint, so the reader either resumes from the checkpointed offset or,
when it is restarted from scratch, from the 'default offset'. The 'default
offset' is 'latest' in the default config. Users can customize this (e.g.
by setting a group-id that preserves the latest consumed offset on the
Kafka cluster, though that is not the same as a Beam checkpoint).
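
For illustration, a minimal sketch of that per-partition resume decision (a
hypothetical helper; -1 stands for "no checkpointed offset"):

{code:java}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class ResumeSketch {
  static void seekToStart(KafkaConsumer<byte[], byte[]> consumer,
                          TopicPartition tp, long checkpointedOffset) {
    if (checkpointedOffset >= 0) {
      // Restarted from a Beam checkpoint: resume exactly where we left off.
      consumer.seek(tp, checkpointedOffset);
    }
    // Otherwise do nothing: the consumer falls back to the 'default
    // offset', i.e. the committed offset for its group.id if one exists,
    // else auto.offset.reset ('latest' in the default config).
  }
}
{code}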





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)