[jira] [Commented] (BEAM-958) desiredNumWorkers in Dataflow is too low
[ https://issues.apache.org/jira/browse/BEAM-958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15655020#comment-15655020 ] Raghu Angadi commented on BEAM-958: --- Depending on the source, a change to this policy can break Dataflow job updates, because an update requires the number of sources to remain the same across the update. The native Pubsub source is not affected. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (BEAM-958) desiredNumWorkers in Dataflow is too low
[ https://issues.apache.org/jira/browse/BEAM-958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Raghu Angadi updated BEAM-958: -- Labels: breaking_change (was: )
[jira] [Created] (BEAM-958) desiredNumWorkers in Dataflow is too low
Raghu Angadi created BEAM-958: - Summary: desiredNumWorkers in Dataflow is too low Key: BEAM-958 URL: https://issues.apache.org/jira/browse/BEAM-958 Project: Beam Issue Type: Improvement Components: runner-dataflow Affects Versions: 0.3.0-incubating Reporter: Raghu Angadi Assignee: Davor Bonaci {{desiredNumWorkers}} in [UnboundedSource API|https://github.com/apache/incubator-beam/blob/v0.3.0-incubating-RC1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L69] is a suggestion to a source about how many splits it should create. KafkaIO currently takes this literally and creates at most this many splits. The main drawback is that this value is very low in Dataflow. It is calculated as * {{1 * maxNumWorkers}} if {{--maxNumWorkers}} is specified, otherwise * {{3 * numWorkers}}. That implies there is only a single reader per worker (which is usually a 4-core VM), which can leave the CPU underutilized in many pipelines. Even 3x in the case of a fixed number of workers seems low to me.
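To make the numbers concrete, here is a minimal Java sketch using a hypothetical `SplitSketch` helper (not part of Beam or the Dataflow runner). It reproduces the two formulas above and distributes Kafka partitions round-robin over at most `desiredNumSplits` splits, which is roughly what taking the hint literally means. The round-robin assignment is an illustrative assumption, not necessarily KafkaIO's exact code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating BEAM-958; not Beam or Dataflow API.
final class SplitSketch {

  // Dataflow's hint as described in the issue:
  // 1 * maxNumWorkers if --maxNumWorkers is set (> 0 here), otherwise 3 * numWorkers.
  static int desiredNumWorkers(int numWorkers, int maxNumWorkers) {
    return maxNumWorkers > 0 ? maxNumWorkers : 3 * numWorkers;
  }

  // Assigns partitions 0..numPartitions-1 round-robin to at most desiredNumSplits
  // splits. Treating the hint as a hard cap limits parallelism to that value even
  // when the topic has many more partitions.
  static List<List<Integer>> assignPartitions(int numPartitions, int desiredNumSplits) {
    int numSplits = Math.max(1, Math.min(desiredNumSplits, numPartitions));
    List<List<Integer>> splits = new ArrayList<>();
    for (int i = 0; i < numSplits; i++) {
      splits.add(new ArrayList<>());
    }
    for (int p = 0; p < numPartitions; p++) {
      splits.get(p % numSplits).add(p);
    }
    return splits;
  }
}
```

For example, with `--maxNumWorkers=5` and a 20-partition topic, `assignPartitions(20, desiredNumWorkers(0, 5))` yields 5 splits of 4 partitions each, i.e. one reader per worker.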
[jira] [Commented] (BEAM-704) KafkaIO should handle "latest offset" evenly, and persist it as part of the CheckpointMark.
[ https://issues.apache.org/jira/browse/BEAM-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15647814#comment-15647814 ] Raghu Angadi commented on BEAM-704: --- For further discussion and the fix, see https://github.com/apache/incubator-beam/pull/1071. > KafkaIO should handle "latest offset" evenly, and persist it as part of the > CheckpointMark. > --- > > Key: BEAM-704 > URL: https://issues.apache.org/jira/browse/BEAM-704 > Project: Beam > Issue Type: Improvement > Components: sdk-java-extensions >Reporter: Amit Sela >Assignee: Raghu Angadi > > Currently, KafkaIO (when configured to "latest") will check the latest > offset on the worker. This means that each worker sees a "different" latest > for the time it checks for the partitions assigned to it. > This also means that if a worker fails before starting to read, and new > messages were added in between, they would be missed. > I think we should consider checking the offsets (could be the same for > "earliest") when running initialSplits (that's how Spark does it as well, > one call from the driver for all topic-partitions). > I'd also suggest we persist the latest offset as part of the CheckpointMark > so that once latest is set, it is remembered until new messages arrive and it > doesn't need to be resolved again (and if there were new messages available > they won't be missed upon failure). > For Spark, this is even more important as state is passed in-between > micro-batches and sparse partitions may skip messages until a message finally > arrives within the read time-frame.
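The proposal to resolve "latest" once, at split time, rather than separately on each worker can be sketched as below. This is a hedged illustration: `LatestOffsetAtSplit` is hypothetical, topic-partitions are plain strings, and the `latestOffsetLookup` function stands in for a broker query (with the real Kafka client this could be something like `KafkaConsumer#endOffsets` in newer client versions, which is not used here). Once resolved, the offsets are fixed values that each split, and its CheckpointMark, can carry.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

// Hypothetical sketch for BEAM-704; not KafkaIO's actual API.
final class LatestOffsetAtSplit {

  // Resolves "latest" for every topic-partition in one pass (e.g. on the
  // driver/launcher while computing initial splits). The returned offsets are
  // plain values: they do not move when new records arrive later.
  static Map<String, Long> resolveOnce(
      List<String> topicPartitions, ToLongFunction<String> latestOffsetLookup) {
    Map<String, Long> startOffsets = new LinkedHashMap<>();
    for (String tp : topicPartitions) {
      startOffsets.put(tp, latestOffsetLookup.applyAsLong(tp));
    }
    return startOffsets;
  }
}
```

A worker that starts after new records have arrived still begins at the offset captured at split time, so those records are not skipped.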
[jira] [Updated] (BEAM-591) Better handling of watermark in KafkaIO
[ https://issues.apache.org/jira/browse/BEAM-591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Raghu Angadi updated BEAM-591: -- Summary: Better handling of watermark in KafkaIO (was: Better handling watermark in KafkaIO) > Better handling of watermark in KafkaIO > --- > > Key: BEAM-591 > URL: https://issues.apache.org/jira/browse/BEAM-591 > Project: Beam > Issue Type: Bug > Components: sdk-java-extensions >Reporter: Raghu Angadi >Assignee: Raghu Angadi > > Right now default watermark in KafkaIO is same as timestamp of the record. > The main problem with this is that watermark does not change if there aren't any > new records on the topic. This can hold up many open windows. > The record timestamp by default is set to processing time (i.e. when the > runner reads a record from Kafka reader). > A user can provide functions to calculate watermark and record timestamps. > There are a few concerns with current design: > * What should happen when a kafka topic is idle: > ** in default case, I think watermark should advance to current time. > ** What should happen when user has provided a function to calculate record > timestamp? >*** Should the watermark stay same as record timestamp? >*** same when user has provided own watermark function? > * Are the current semantics of user provided watermark function correct? > ** -it is run once for each record read-. > ** -Should it instead be run inside {{getWatermark()}} called by the runner > (we could still provide the last user record, and its timestamp)-. > ** It does run inside {{getWatermark()}}. Should we pass current record > timestamp in addition to the record?
[jira] [Created] (BEAM-777) KafkaIO Test should handle reader.start() better
Raghu Angadi created BEAM-777: - Summary: KafkaIO Test should handle reader.start() better Key: BEAM-777 URL: https://issues.apache.org/jira/browse/BEAM-777 Project: Beam Issue Type: Bug Components: sdk-java-extensions Reporter: Raghu Angadi Assignee: Raghu Angadi Priority: Minor KafkaIOTest currently expects reader.start() to return true, but it can also return false (no record available yet). The test should handle that case.
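A test that tolerates `start()` returning `false` can treat `false` as "nothing available yet" and keep calling `advance()` up to a bound. The sketch below assumes a hypothetical `SimpleReader` interface that mirrors only the `start()`/`advance()`/`getCurrent()` contract of Beam's `UnboundedReader`, so it runs without Beam or Kafka on the classpath; `ReaderTestUtil` is likewise hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the start()/advance()/getCurrent() part of
// Beam's UnboundedSource.UnboundedReader contract.
interface SimpleReader<T> {
  boolean start();   // true if a record is available right away
  boolean advance(); // true if another record is now available
  T getCurrent();    // valid only after start()/advance() returned true
}

final class ReaderTestUtil {

  // Reads up to n records. A false return from start() or advance() only means
  // "no record yet", so we retry, but give up after maxEmptyPolls misses so a
  // broken reader fails the test instead of hanging it.
  static <T> List<T> readRecords(SimpleReader<T> reader, int n, int maxEmptyPolls) {
    List<T> records = new ArrayList<>();
    boolean available = reader.start();
    int emptyPolls = 0;
    while (records.size() < n) {
      if (available) {
        records.add(reader.getCurrent());
      } else if (++emptyPolls > maxEmptyPolls) {
        break;
      }
      if (records.size() < n) {
        available = reader.advance();
      }
    }
    return records;
  }
}
```

The test can then assert on the number of records returned instead of asserting that `start()` itself returned true.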
[jira] [Resolved] (BEAM-257) fix ConcurrentModificationException in KafkaIO
[ https://issues.apache.org/jira/browse/BEAM-257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Raghu Angadi resolved BEAM-257. --- Resolution: Fixed Fix Version/s: 0.1.0-incubating https://github.com/apache/incubator-beam/pull/290 > fix ConcurrentModificationException in KafkaIO > -- > > Key: BEAM-257 > URL: https://issues.apache.org/jira/browse/BEAM-257 > Project: Beam > Issue Type: Bug > Components: sdk-java-extensions >Reporter: Raghu Angadi >Assignee: Raghu Angadi > Fix For: 0.1.0-incubating > > > [~tgroh] reported the following exception with KafkaIO while using it with the > DirectRunner. Fixing it in https://github.com/apache/incubator-beam/pull/290 > {code} > java.util.ConcurrentModificationException: KafkaConsumer is not safe for > multi-threaded access > at > org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1324) > at > org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1255) > at com.google.common.io.Closeables.close(Closeables.java:79) > at > org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.close(KafkaIO.java:1050) > at > org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.finishBundle(UnboundedReadEvaluatorFactory.java:167) > at > org.apache.beam.runners.direct.TransformExecutor.finishBundle(TransformExecutor.java:161) > at > org.apache.beam.runners.direct.TransformExecutor.call(TransformExecutor.java:116) > at > org.apache.beam.runners.direct.TransformExecutor.call(TransformExecutor.java:41) > at java.util.concurrent.FutureTask.run(Unknown Source) > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) > at java.lang.Thread.run(Unknown Source) > {code}
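The trace shows `KafkaConsumer.close()` being called while another thread was still using the consumer. One common way to avoid this (a general pattern, not necessarily what PR 290 does) is to stop the background poll thread and wait for it to exit before closing the consumer from the calling thread. The sketch below runs without Kafka: `FakeConsumer` is a hypothetical stand-in that, like `KafkaConsumer`, throws `ConcurrentModificationException` when two threads use it at the same time, and `BackgroundPollingReader` is likewise hypothetical.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for KafkaConsumer's single-threaded-access check.
final class FakeConsumer {
  private final AtomicReference<Thread> inUseBy = new AtomicReference<>();
  private volatile boolean closed;

  private void acquire() {
    if (!inUseBy.compareAndSet(null, Thread.currentThread())) {
      throw new ConcurrentModificationException("consumer is not safe for multi-threaded access");
    }
  }

  private void release() {
    inUseBy.set(null);
  }

  void poll(long millis) throws InterruptedException {
    acquire();
    try {
      Thread.sleep(millis); // simulate waiting for records
    } finally {
      release();
    }
  }

  void close() {
    acquire();
    try {
      closed = true;
    } finally {
      release();
    }
  }

  boolean isClosed() {
    return closed;
  }
}

// Hypothetical reader that polls on a background thread.
final class BackgroundPollingReader implements AutoCloseable {
  private final FakeConsumer consumer = new FakeConsumer();
  private final Thread pollThread;
  private volatile boolean stopRequested;

  BackgroundPollingReader() {
    pollThread = new Thread(this::pollLoop, "consumer-poll");
    pollThread.setDaemon(true);
  }

  void start() {
    pollThread.start();
  }

  private void pollLoop() {
    try {
      while (!stopRequested) {
        consumer.poll(5);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // Closing from the caller's thread is safe only after the poll thread has exited.
  // With a real KafkaConsumer, calling wakeup() here (its one method documented as
  // safe to call from another thread) would also break out of a long blocking poll().
  @Override
  public void close() {
    stopRequested = true;
    boolean interrupted = false;
    while (pollThread.isAlive()) {
      try {
        pollThread.join();
      } catch (InterruptedException e) {
        interrupted = true;
      }
    }
    consumer.close();
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  boolean isClosed() {
    return consumer.isClosed();
  }
}
```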
[jira] [Commented] (BEAM-591) Better handling watermark in KafkaIO
[ https://issues.apache.org/jira/browse/BEAM-591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15572540#comment-15572540 ] Raghu Angadi commented on BEAM-591: --- PubSubIO does advance the watermark to the current time if there haven't been any records recently ([line 996|https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java#L996]). It tracks the last minute of timestamps because Pubsub might deliver records out of order. Kafka does not have that issue. In addition, KafkaIO knows whether it has caught up with the latest records. For the default watermark case (i.e. KafkaIO processing time), I propose that KafkaIO advance the watermark to the current time when the backlog is zero (the backlog is updated every 5 seconds). This will cover most use cases. This policy would be fine for custom timestamps too (as in PubSubIO). If users want more control, we could invoke the watermark function without the Kafka record so that the user can return the current timestamp.
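The proposed policy can be sketched as follows, using a hypothetical `IdleAwareWatermark` class rather than KafkaIO's actual reader. While there is a backlog, the watermark follows the latest record timestamp; once the backlog estimate (refreshed periodically, every 5 seconds per the comment) reaches zero, it advances to the current time. A monotonic guard is an added assumption so the watermark never moves backwards when records show up again after an idle period, and `Instant.EPOCH` stands in for Beam's minimum timestamp.

```java
import java.time.Instant;

// Hypothetical sketch of the BEAM-591 proposal; not KafkaIO's API.
final class IdleAwareWatermark {
  private Instant maxRecordTimestamp = Instant.EPOCH;
  private Instant lastWatermark = Instant.EPOCH;
  private long backlog = -1; // -1 means "unknown" until the first estimate

  void onRecord(Instant recordTimestamp) {
    if (recordTimestamp.isAfter(maxRecordTimestamp)) {
      maxRecordTimestamp = recordTimestamp;
    }
  }

  // Called whenever the reader refreshes its backlog estimate.
  void onBacklogEstimate(long messagesBehind) {
    backlog = messagesBehind;
  }

  Instant getWatermark(Instant now) {
    // Caught up: nothing is waiting to be read, so (as proposed) advance to now.
    Instant candidate = backlog == 0 ? max(now, maxRecordTimestamp) : maxRecordTimestamp;
    lastWatermark = max(lastWatermark, candidate); // never move backwards
    return lastWatermark;
  }

  private static Instant max(Instant a, Instant b) {
    return a.isAfter(b) ? a : b;
  }
}
```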
[jira] [Commented] (BEAM-744) A runner should be able to override KafkaIO max wait properties.
[ https://issues.apache.org/jira/browse/BEAM-744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15569856#comment-15569856 ] Raghu Angadi commented on BEAM-744: --- > KAFKA_POLL_TIMEOUT - consumer poll timeout, default: 1 second. This timeout is an internal implementation detail of KafkaIO and should be ignored here. It does not impose any limitations on the reader (i.e. the reader can be closed before this timeout and everything is cleaned up properly). > A runner should be able to override KafkaIO max wait properties. > > > Key: BEAM-744 > URL: https://issues.apache.org/jira/browse/BEAM-744 > Project: Beam > Issue Type: Bug > Components: sdk-java-extensions >Reporter: Amit Sela > > KafkaIO has three "wait" properties: > {{KAFKA_POLL_TIMEOUT}} - consumer poll timeout, default: 1 second. > {{START_NEW_RECORDS_POLL_TIMEOUT}} - wait for new records inside {{start()}}, > default: 5 seconds. > {{NEW_RECORDS_POLL_TIMEOUT}} - wait for new records inside {{start()}}, > default: 10 msec. > [~rangadi] mentioned some of these were set due to limitations of the > DirectRunner, and I can add that they are now limiting the Spark runner > (which reads in defined time frames, which may be smaller than the wait time > and so never actually read). > It seems the defaults should be set for optimal reads from Kafka, while a > runner may override them if it needs to. > [~rangadi] also mentioned that this could be set in {{PipelineOptions}}, which > may be passed when creating the reader.
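Given that `KAFKA_POLL_TIMEOUT` is internal, a runner-facing override would only need to cover the two waits for new records. A minimal sketch, assuming a hypothetical immutable `KafkaWaitSettings` value whose defaults are the ones listed in the issue; how a runner would actually pass the override (e.g. via `PipelineOptions`) is left open.

```java
import java.time.Duration;

// Hypothetical settings holder for BEAM-744; not KafkaIO's API.
final class KafkaWaitSettings {
  // Defaults as listed in the issue.
  static final KafkaWaitSettings DEFAULTS =
      new KafkaWaitSettings(Duration.ofSeconds(1), Duration.ofSeconds(5), Duration.ofMillis(10));

  final Duration kafkaPollTimeout;           // KAFKA_POLL_TIMEOUT: internal, not overridden here
  final Duration startNewRecordsPollTimeout; // START_NEW_RECORDS_POLL_TIMEOUT (default 5 s)
  final Duration newRecordsPollTimeout;      // NEW_RECORDS_POLL_TIMEOUT (default 10 ms)

  private KafkaWaitSettings(Duration kafkaPoll, Duration startNewRecords, Duration newRecords) {
    this.kafkaPollTimeout = kafkaPoll;
    this.startNewRecordsPollTimeout = startNewRecords;
    this.newRecordsPollTimeout = newRecords;
  }

  // Lets a runner that reads in fixed time frames (e.g. Spark micro-batches)
  // cap both waits so that a single wait never exceeds its frame.
  KafkaWaitSettings withMaxWait(Duration maxWait) {
    return new KafkaWaitSettings(
        kafkaPollTimeout,
        min(startNewRecordsPollTimeout, maxWait),
        min(newRecordsPollTimeout, maxWait));
  }

  private static Duration min(Duration a, Duration b) {
    return a.compareTo(b) <= 0 ? a : b;
  }
}
```

Keeping the value immutable means the defaults stay tuned for optimal Kafka reads, while each runner derives its own capped copy.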
[jira] [Commented] (BEAM-704) KafkaIO should handle "latest offset" evenly, and persist it as part of the CheckpointMark.
[ https://issues.apache.org/jira/browse/BEAM-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15556812#comment-15556812 ] Raghu Angadi commented on BEAM-704: --- > Beam does not ask the reader for checkpoint. I meant to say 'does not ask the Source'.
[jira] [Commented] (BEAM-704) KafkaIO should handle "latest offset" evenly, and persist it as part of the CheckpointMark.
[ https://issues.apache.org/jira/browse/BEAM-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15556749#comment-15556749 ] Raghu Angadi commented on BEAM-704: --- https://github.com/apache/incubator-beam/pull/1071 : sets offset on the reader. {quote} A read from Kafka requires to specify topic/s and either specific partitions or "earliest/latest". {quote} That is not true. It does not require either 'earliest' or 'latest'; 'latest' is the default. You can also set a consumer group id, in which case it defaults to the offset committed for that group id. {quote} If we were to handle that on splitting, all Kafka reads would have a "starting" CheckpointMark {quote} That is not correct. AFAIK, Beam does not ask the reader for checkpoint (at least Google Dataflow does not). getCheckpointMark() is only called on the reader on the worker.
[jira] [Commented] (BEAM-704) KafkaIO should handle "latest offset" evenly, and persist it as part of the CheckpointMark.
[ https://issues.apache.org/jira/browse/BEAM-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1737#comment-1737 ] Raghu Angadi commented on BEAM-704: --- [~dhalp...@google.com] expanded a bit more on this. The issue you might be pointing to is that no offset is stored in the checkpoint for a partition if the reader has never read a record from it. That should be easy to fix: the reader can set the current offset in reader.start(): https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L935
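A minimal sketch of that fix, assuming a hypothetical `OffsetTrackingReader` and representing topic-partitions as strings: `start()` records each partition's current position (with the real client this would come from something like `KafkaConsumer#position`), so the checkpoint carries an offset even for partitions that have not produced a record yet.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

// Hypothetical sketch for BEAM-704; not KafkaIO's actual reader.
final class OffsetTrackingReader {
  static final long UNKNOWN = -1L;

  // Next offset to read per partition; this is what a checkpoint would persist.
  private final Map<String, Long> nextOffsets = new LinkedHashMap<>();

  OffsetTrackingReader(Iterable<String> partitions) {
    for (String p : partitions) {
      nextOffsets.put(p, UNKNOWN);
    }
  }

  // In start(): fill in the consumer's current position for every partition
  // that has no offset yet, so idle partitions are not left empty in the checkpoint.
  void start(ToLongFunction<String> currentPosition) {
    for (Map.Entry<String, Long> e : nextOffsets.entrySet()) {
      if (e.getValue() == UNKNOWN) {
        e.setValue(currentPosition.applyAsLong(e.getKey()));
      }
    }
  }

  void onRecord(String partition, long offset) {
    nextOffsets.put(partition, offset + 1);
  }

  Map<String, Long> checkpoint() {
    return new LinkedHashMap<>(nextOffsets);
  }
}
```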
[jira] [Assigned] (BEAM-704) KafkaIO should handle "latest offset" evenly, and persist it as part of the CheckpointMark.
[ https://issues.apache.org/jira/browse/BEAM-704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Raghu Angadi reassigned BEAM-704: - Assignee: Raghu Angadi
[jira] [Comment Edited] (BEAM-704) KafkaIO should handle "latest offset" evenly, and persist it as part of the CheckpointMark.
[ https://issues.apache.org/jira/browse/BEAM-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1572#comment-1572 ] Raghu Angadi edited comment on BEAM-704 at 10/7/16 4:41 PM: {quote} I'd also suggest we persist the latest offset as part of the CheckpointMark so that once latest is set, it is remembered until new messages arrive and it doesn't need to be resolved again (and if there were new messages available they won't be missed upon failure). {quote} The offset is the primary info stored in CheckpointMark. Can you elaborate on what is missing now? {quote} I think we should consider checking the offsets (could be the same for "earliest") when running initialSplits (that's how Spark does that as well, one call from the driver for all topic-partitions). {quote} Can you expand on this with an example? The offset is part of the checkpoint, so the reader resumes either from the checkpointed offset or, when it is restarted from scratch, from the 'default offset'. The 'default offset' is 'latest' in the default config. Users can customize this (e.g. by setting a group id that preserves the latest consumed offset in Kafka, though this is not the same as a Beam checkpoint).
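The resume logic described in this comment fits in a few lines. This is a hedged sketch with a hypothetical `StartOffsets.choose` helper: a checkpointed offset wins; otherwise an offset committed for a configured group id is used; otherwise the default, 'latest'. (In the real Kafka consumer, the fallback when nothing is committed is governed by `auto.offset.reset`, whose default is `latest`.)

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch of how a reader picks its starting offset per partition.
final class StartOffsets {
  static long choose(
      String partition,
      Map<String, Long> checkpoint,      // null when starting from scratch
      OptionalLong committedGroupOffset, // present only if a group id is set and has a commit
      long latestOffset) {
    Long checkpointed = checkpoint == null ? null : checkpoint.get(partition);
    if (checkpointed != null) {
      return checkpointed; // resume from the Beam checkpoint
    }
    if (committedGroupOffset.isPresent()) {
      return committedGroupOffset.getAsLong(); // Kafka-side commit, not a Beam checkpoint
    }
    return latestOffset; // default config: start from 'latest'
  }
}
```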
[jira] [Commented] (BEAM-704) KafkaIO should handle "latest offset" evenly, and persist it as part of the CheckpointMark.
[ https://issues.apache.org/jira/browse/BEAM-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1572#comment-1572 ] Raghu Angadi commented on BEAM-704: --- .q I'd also suggest we persist the latest offset as part of the CheckpointMark so that once latest is set, it is remembered until new messages arrive and it doesn't need to be resolved again (and if there were new messages available they won't be missed upon failure). The offset is the primary info stored in CheckpointMark. Can you elaborate on what is missing now? .q I think we should consider checking the offsets (could be the same for "earliest") when running initialSplits (that's how Spark does that as well, one call from the driver for all topic-partitions). Can you expand on this with an example? offset is part of checkpoint. So the reader either resumes from checkpointed offset or the 'default offset' when it is restarted from scratch. 'Default offset' is 'latest' in default config. Users can customize this (e.g. by setting a group-id that preserves latest consumed offset on the Kafka, though this is not the same as Beam checkpoint).
[jira] [Updated] (BEAM-591) Better handling watermark in KafkaIO
[ https://issues.apache.org/jira/browse/BEAM-591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Raghu Angadi updated BEAM-591: -- Description: Right now default watermark in KafkaIO is same as timestamp of the record. The main problem with this is that watermark does not change if there aren't any new records on the topic. This can hold up many open windows. The record timestamp by default is set to processing time (i.e. when the runner reads a record from Kafka reader). A user can provide functions to calculate watermark and record timestamps. There are a few concerns with current design: * What should happen when a kafka topic is idle: ** in default case, I think watermark should advance to current time. ** What should happen when user has provided a function to calculate record timestamp? *** Should the watermark stay same as record timestamp? *** same when user has provided own watermark function? * Are the current semantics of user provided watermark function correct? ** it is run once for each record read. ** Should it instead be run inside {{getWatermark()}} called by the runner (we could still provide the last user record, and its timestamp). was: Right now default watermark in KafkaIO is same as timestamp of the record. The main problem with this is that watermark does not change if there aren't any new records on the topic. This can hold up many open windows. The record timestamp by default is set to processing time (i.e. when the runner reads a record from Kafka reader). A user provide functions to calculate watermark and record timestamps. There are a few concerns: * What should happen when a kafka topic is idle: ** in default case, I think watermark should advance to current time. ** What should happen when user has provided a function to calculate record timestamp? *** Should the watermark stay same as record timestamp? *** same when user has provided own watermark function?
* Are the current semantics of user provided watermark function correct? ** it is run once for each record read. ** Should it instead be run inside {{getWatermark()}} called by the runner (we could still provide the last user record, and its timestamp).
[jira] [Created] (BEAM-591) Better handling watermark in KafkaIO
Raghu Angadi created BEAM-591: - Summary: Better handling watermark in KafkaIO Key: BEAM-591 URL: https://issues.apache.org/jira/browse/BEAM-591 Project: Beam Issue Type: Bug Components: sdk-java-extensions Reporter: Raghu Angadi Assignee: Raghu Angadi Right now default watermark in KafkaIO is same as timestamp of the record. The main problem with this is that watermark does not change if there aren't any new records on the topic. This can hold up many open windows. The record timestamp by default is set to processing time (i.e. when the runner reads a record from Kafka reader). A user provide functions to calculate watermark and record timestamps. There are a few concerns: * What should happen when a kafka topic is idle: ** in default case, I think watermark should advance to current time. ** What should happen when user has provided a function to calculate record timestamp? *** Should the watermark stay same as record timestamp? *** same when user has provided own watermark function? * Are the current semantics of user provided watermark function correct? ** it is run once for each record read. ** Should it instead be run inside {{getWatermark()}} called by the runner (we could still provide the last user record, and its timestamp).
[jira] [Created] (BEAM-429) minor: remove an obsolete comment in KafakIOTest.java
Raghu Angadi created BEAM-429: - Summary: minor: remove an obsolete comment in KafakIOTest.java Key: BEAM-429 URL: https://issues.apache.org/jira/browse/BEAM-429 Project: Beam Issue Type: Improvement Reporter: Raghu Angadi Assignee: Raghu Angadi Priority: Minor see https://github.com/apache/incubator-beam/pull/606
[jira] [Created] (BEAM-395) KafkaIO tests should be marked 'NeedsRunner'
Raghu Angadi created BEAM-395: - Summary: KafkaIO tests should be marked 'NeedsRunner' Key: BEAM-395 URL: https://issues.apache.org/jira/browse/BEAM-395 Project: Beam Issue Type: Bug Components: sdk-java-extensions, testing Affects Versions: 0.1.0-incubating Reporter: Raghu Angadi Assignee: Raghu Angadi KafkaIO tests are currently marked as 'RunnableOnService', which implies the tests can run on any Beam runner. This is not correct. Though the tests are not runner-dependent per se, they depend on mocks for the Kafka consumer and Kafka producer, which requires the tests to run within a single JVM. These tests can only run on single-JVM runners like the DirectRunner, so they should be marked 'NeedsRunner' for now.
[jira] [Resolved] (BEAM-340) Use Avro coder for KafkaIO
[ https://issues.apache.org/jira/browse/BEAM-340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Raghu Angadi resolved BEAM-340. --- Resolution: Duplicate > Use Avro coder for KafkaIO > --- > > Key: BEAM-340 > URL: https://issues.apache.org/jira/browse/BEAM-340 > Project: Beam > Issue Type: Improvement >Reporter: Raghu Angadi
[jira] [Created] (BEAM-340) Use Avro coder for KafkaIO
Raghu Angadi created BEAM-340: - Summary: Use Avro coder for KafkaIO Key: BEAM-340 URL: https://issues.apache.org/jira/browse/BEAM-340 Project: Beam Issue Type: Improvement Reporter: Raghu Angadi
[jira] [Created] (BEAM-326) A minor update to KafkaIO javadoc
Raghu Angadi created BEAM-326: - Summary: A minor update to KafkaIO javadoc Key: BEAM-326 URL: https://issues.apache.org/jira/browse/BEAM-326 Project: Beam Issue Type: Bug Components: sdk-java-extensions Reporter: Raghu Angadi Assignee: Raghu Angadi Priority: Trivial Update the code sample for KafkaIO.write() to be consistent with another code sample below it.
[jira] [Commented] (BEAM-52) KafkaIO - bounded/unbounded, source/sink
[ https://issues.apache.org/jira/browse/BEAM-52?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15316724#comment-15316724 ] Raghu Angadi commented on BEAM-52: -- Yep. Closing now. > KafkaIO - bounded/unbounded, source/sink > > > Key: BEAM-52 > URL: https://issues.apache.org/jira/browse/BEAM-52 > Project: Beam > Issue Type: New Feature > Components: sdk-java-extensions >Reporter: Daniel Halperin >Assignee: Raghu Angadi > > We should support Apache Kafka. The priority list is probably: > * UnboundedSource > * unbounded Sink > * BoundedSource > * bounded Sink > The connector should be well-tested, especially around UnboundedSource > checkpointing and resuming, and data duplication.
[jira] [Created] (BEAM-271) Option to configure remote Dataflow windmill service endpoint
Raghu Angadi created BEAM-271: - Summary: Option to configure remote Dataflow windmill service endpoint Key: BEAM-271 URL: https://issues.apache.org/jira/browse/BEAM-271 Project: Beam Issue Type: Improvement Components: runner-dataflow Reporter: Raghu Angadi Assignee: Davor Bonaci Priority: Minor Add two options to DataflowPipelineDebugOptions to configure the remote Dataflow windmill service. This lets Dataflow users configure streaming pipelines to point to a remote windmill service. https://github.com/apache/incubator-beam/pull/314
[jira] [Commented] (BEAM-52) KafkaIO - bounded/unbounded, source/sink
[ https://issues.apache.org/jira/browse/BEAM-52?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15272649#comment-15272649 ] Raghu Angadi commented on BEAM-52: -- Kafka Sink PR : https://github.com/apache/incubator-beam/pull/271
[jira] [Created] (BEAM-257) fix ConcurrentModificationException in KafkaIO
Raghu Angadi created BEAM-257: - Summary: fix ConcurrentModificationException in KafkaIO Key: BEAM-257 URL: https://issues.apache.org/jira/browse/BEAM-257 Project: Beam Issue Type: Bug Components: sdk-java-extensions Reporter: Raghu Angadi Assignee: James Malone [~tgroh] reported the following exception with KafkaIO while using it with the DirectRunner. Fixing it in https://github.com/apache/incubator-beam/pull/290
{code}
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1324)
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1255)
    at com.google.common.io.Closeables.close(Closeables.java:79)
    at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.close(KafkaIO.java:1050)
    at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.finishBundle(UnboundedReadEvaluatorFactory.java:167)
    at org.apache.beam.runners.direct.TransformExecutor.finishBundle(TransformExecutor.java:161)
    at org.apache.beam.runners.direct.TransformExecutor.call(TransformExecutor.java:116)
    at org.apache.beam.runners.direct.TransformExecutor.call(TransformExecutor.java:41)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
{code}
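The trace shows `UnboundedKafkaReader.close()` reaching `KafkaConsumer.acquire()` from a DirectRunner executor thread, which suggests the consumer was closed while another thread still held it. Below is a minimal, dependency-free sketch of one general way to avoid this, assuming a reader that polls on its own background thread: stop that thread and wait for it before closing the client. `SingleThreadedClient` and `BackgroundPollingReader` are hypothetical names, the client only imitates KafkaConsumer's single-owner check, and the actual fix in the PR above may differ. With a real KafkaConsumer, `wakeup()` is the method documented as safe to call from another thread, and it can be used to break a blocked `poll()` before waiting.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Stand-in for a client that, like KafkaConsumer, rejects use from two threads at once. */
class SingleThreadedClient {
  private final AtomicReference<Thread> owner = new AtomicReference<>();
  private final AtomicInteger polls = new AtomicInteger();

  private void acquire() {
    if (!owner.compareAndSet(null, Thread.currentThread())) {
      throw new ConcurrentModificationException("client is not safe for multi-threaded access");
    }
  }

  private void release() {
    owner.set(null);
  }

  void poll() {
    acquire();
    try {
      polls.incrementAndGet();
      Thread.sleep(1); // simulate a short blocking fetch
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      release();
    }
  }

  void close() {
    acquire(); // throws if the poll thread is still inside poll()
    release();
  }

  int pollCount() {
    return polls.get();
  }
}

/** Hypothetical reader: a background thread polls; close() stops it before closing the client. */
class BackgroundPollingReader {
  private final SingleThreadedClient client = new SingleThreadedClient();
  private final ExecutorService pollExecutor = Executors.newSingleThreadExecutor();
  private volatile boolean closed = false;

  void start() {
    pollExecutor.submit(() -> {
      while (!closed) {
        client.poll();
      }
    });
  }

  void close() {
    closed = true; // 1. ask the poll loop to exit
    pollExecutor.shutdown();
    try {
      // 2. wait until the poll thread has really left poll()
      if (!pollExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("poll thread did not stop");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while stopping the poll thread", e);
    }
    client.close(); // 3. only now is this the sole thread touching the client
  }

  int pollCount() {
    return client.pollCount();
  }
}
```

The ordering is the design choice that matters: signal, wait for the poll thread to leave `poll()`, then close. Calling `client.close()` first would race with an in-flight `poll()` and can throw the same ConcurrentModificationException.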
[jira] [Commented] (BEAM-52) KafkaIO - bounded/unbounded, source/sink
[ https://issues.apache.org/jira/browse/BEAM-52?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15264971#comment-15264971 ] Raghu Angadi commented on BEAM-52: -- > Kafka producer uses the key for selecting a partition. Correction: the default partitioner in Kafka actually uses the serialized bytes to pick a partition, but the Partitioner interface includes both the key and value objects, so some custom partitioners might use the key.
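A dependency-free sketch of the difference described in the correction: a byte-based partitioner sees only the serialized key, while a partitioner that receives the key object can place records by part of it. `UserKey` and both methods are hypothetical; `Arrays.hashCode` stands in for Kafka's own hash (the default partitioner uses murmur2 over the key bytes), so partition numbers here will not match Kafka's.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class PartitioningSketch {
  /** Hypothetical key type: only userId should decide placement. */
  static final class UserKey {
    final int userId;
    final String sessionId;

    UserKey(int userId, String sessionId) {
      this.userId = userId;
      this.sessionId = sessionId;
    }

    /** Stand-in for a coder: the whole key becomes bytes. */
    byte[] encode() {
      return (userId + "/" + sessionId).getBytes(StandardCharsets.UTF_8);
    }
  }

  /** Byte-based (like a default partitioner): depends only on the serialized key. */
  static int partitionByKeyBytes(byte[] keyBytes, int numPartitions) {
    // Arrays.hashCode is a stand-in hash, not Kafka's murmur2.
    return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
  }

  /** Object-based: possible in a custom partitioner that receives the key object. */
  static int partitionByUserId(UserKey key, int numPartitions) {
    return Math.floorMod(key.userId, numPartitions);
  }
}
```

Two keys with the same `userId` but different `sessionId` always land together under `partitionByUserId`, whereas `partitionByKeyBytes` may separate them because their encodings differ.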
[jira] [Commented] (BEAM-52) KafkaIO - bounded/unbounded, source/sink
[ https://issues.apache.org/jira/browse/BEAM-52?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15264860#comment-15264860 ] Raghu Angadi commented on BEAM-52: -- Ah, thanks. I just skimmed through it. It looks pretty much along the same lines as what I have. A couple of differences: - The Kafka producer uses the key for selecting a partition. I wanted to retain that functionality for users, so I apply our coders inside custom Kafka serializers. Otherwise Kafka will hash on the serialized byte array. - I noticed you are catching the exceptions in a callback and reporting them back; maybe I should do that too. I will get Dan's opinion as well in the PR. I will ping you to review my pull request.
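The second difference, catching send failures in a callback and reporting them back, matters because the producer completes sends asynchronously, with callbacks generally running on its background I/O thread. A minimal sketch, assuming a writer that checks for failures before each write and when finishing a bundle: the callback only records the first exception, and the writer rethrows it on its own thread. In the Kafka client the callback is `Callback.onCompletion(RecordMetadata, Exception)`; this sketch drops the metadata argument to stay dependency-free, and `AsyncSendErrorTracker` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical sketch: the send callback records failures, and the writer
 * thread surfaces them at its next check.
 */
class AsyncSendErrorTracker {
  private final AtomicReference<Exception> firstFailure = new AtomicReference<>();

  /** Mirrors the shape of a completion callback: exception is null on success. */
  void onCompletion(Exception exception) {
    if (exception != null) {
      firstFailure.compareAndSet(null, exception); // keep only the first error
    }
  }

  /** Called from the writer thread, e.g. before each write and when finishing a bundle. */
  void checkForFailures() {
    Exception e = firstFailure.get();
    if (e != null) {
      throw new IllegalStateException("asynchronous send failed", e);
    }
  }
}
```

Keeping only the first failure avoids the root cause being masked by follow-on errors, and `AtomicReference` makes the hand-off between the callback thread and the writer thread safe without locks.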
[jira] [Commented] (BEAM-52) KafkaIO - bounded/unbounded, source/sink
[ https://issues.apache.org/jira/browse/BEAM-52?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15264663#comment-15264663 ] Raghu Angadi commented on BEAM-52: -- [~aljoscha], I am working on the Sink and will send a PR very soon (hopefully today).
[jira] [Commented] (BEAM-220) KafkaIO test is flaky
[ https://issues.apache.org/jira/browse/BEAM-220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254754#comment-15254754 ] Raghu Angadi commented on BEAM-220: --- I have seen this failure in Travis builds for PRs too. Not sure why it fails only on Travis; nothing suspicious in the log yet. > KafkaIO test is flaky > - > > Key: BEAM-220 > URL: https://issues.apache.org/jira/browse/BEAM-220 > Project: Beam > Issue Type: Bug > Components: sdk-java-extensions >Reporter: Daniel Halperin >Assignee: Raghu Angadi > > A test of checkpointing had an assertion failure: > https://builds.apache.org/job/beam_PreCommit_MavenVerify/713/org.apache.beam$kafka/testReport/junit/org.apache.beam.sdk.io.kafka/KafkaIOTest/testUnboundedSourceCheckpointMark/
> java.lang.AssertionError
>     at org.junit.Assert.fail(Assert.java:86)
>     at org.junit.Assert.assertTrue(Assert.java:41)
>     at org.junit.Assert.assertTrue(Assert.java:52)
>     at org.apache.beam.sdk.io.kafka.KafkaIOTest.testUnboundedSourceCheckpointMark(KafkaIOTest.java:354)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>     at org.junit.runners.Suite.runChild(Suite.java:127)
>     at org.junit.runners.Suite.runChild(Suite.java:26)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:138)
>     at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:108)
>     at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:78)
>     at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:54)
>     at org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:144)
>     at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>     at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
>     at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
[jira] [Commented] (BEAM-52) KafkaIO - bounded/unbounded, source/sink
[ https://issues.apache.org/jira/browse/BEAM-52?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15172622#comment-15172622 ] Raghu Angadi commented on BEAM-52: -- [~dhalp...@google.com], please assign the jira to me. > KafkaIO - bounded/unbounded, source/sink > > > Key: BEAM-52 > URL: https://issues.apache.org/jira/browse/BEAM-52 > Project: Beam > Issue Type: New Feature > Components: sdk-java-extensions >Reporter: Daniel Halperin > > We should support Apache Kafka. The priority list is probably: > * UnboundedSource > * unbounded Sink > * BoundedSource > * bounded Sink > The connector should be well-tested, especially around UnboundedSource > checkpointing and resuming, and data duplication.