[jira] [Updated] (FLINK-15476) Update StreamingFileSink documentation -- bulk encoded writer now supports customized checkpoint policy
[ https://issues.apache.org/jira/browse/FLINK-15476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Xu updated FLINK-15476: Priority: Minor (was: Major) > Update StreamingFileSink documentation -- bulk encoded writer now supports > customized checkpoint policy > --- > > Key: FLINK-15476 > URL: https://issues.apache.org/jira/browse/FLINK-15476 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem, Documentation >Affects Versions: 1.10.0 >Reporter: Ying Xu >Priority: Minor > Labels: auto-unassigned, stale-major > > Per FLINK-13027, {{StreamingFileSink}}'s bulk encoded writer (created with > {{forBulkFormat}}) now supports customized checkpoint policies which roll > file at the checkpoint epoch. > The {{StreamingFileSink}} documentation needs to be updated accordingly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15476) Update StreamingFileSink documentation -- bulk encoded writer now supports customized checkpoint policy
[ https://issues.apache.org/jira/browse/FLINK-15476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032198#comment-17032198 ] Ying Xu commented on FLINK-15476: - Thanks for asking [~kkl0u]. I was busy with a few other work items; I would like to take this on in the next few days. Thanks! > Update StreamingFileSink documentation -- bulk encoded writer now supports > customized checkpoint policy > --- > > Key: FLINK-15476 > URL: https://issues.apache.org/jira/browse/FLINK-15476 > Project: Flink > Issue Type: Task > Components: Connectors / FileSystem, Documentation >Affects Versions: 1.10.0 >Reporter: Ying Xu >Assignee: Ying Xu >Priority: Major > > Per FLINK-13027, {{StreamingFileSink}}'s bulk encoded writer (created with > {{forBulkFormat}}) now supports customized checkpoint policies which roll > files at the checkpoint epoch. > The {{StreamingFileSink}} documentation needs to be updated accordingly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15476) Update StreamingFileSink documentation -- bulk encoded writer now supports customized checkpoint policy
[ https://issues.apache.org/jira/browse/FLINK-15476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017522#comment-17017522 ] Ying Xu commented on FLINK-15476: - Hi [~kkl0u], is it OK to pursue this Jira based on [the comment|https://github.com/apache/flink/pull/10653#issuecomment-568616531] in FLINK-13027? Thanks! > Update StreamingFileSink documentation -- bulk encoded writer now supports > customized checkpoint policy > --- > > Key: FLINK-15476 > URL: https://issues.apache.org/jira/browse/FLINK-15476 > Project: Flink > Issue Type: Task > Components: Documentation >Reporter: Ying Xu >Priority: Major > > Per FLINK-13027, {{StreamingFileSink}}'s bulk encoded writer (created with > {{forBulkFormat}}) now supports customized checkpoint policies which roll > files at the checkpoint epoch. > The {{StreamingFileSink}} documentation needs to be updated accordingly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15476) Update StreamingFileSink documentation -- bulk encoded writer now supports customized checkpoint policy
[ https://issues.apache.org/jira/browse/FLINK-15476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Xu updated FLINK-15476: Component/s: Documentation Description: Per FLINK-13027, {{StreamingFileSink}}'s bulk encoded writer (created with {{forBulkFormat}}) now supports customized checkpoint policies which roll file at the checkpoint epoch. The {{StreamingFileSink}} documentation needs to be updated accordingly. Issue Type: Task (was: Improvement) > Update StreamingFileSink documentation -- bulk encoded writer now supports > customized checkpoint policy > --- > > Key: FLINK-15476 > URL: https://issues.apache.org/jira/browse/FLINK-15476 > Project: Flink > Issue Type: Task > Components: Documentation >Reporter: Ying Xu >Priority: Major > > Per FLINK-13027, {{StreamingFileSink}}'s bulk encoded writer (created with > {{forBulkFormat}}) now supports customized checkpoint policies which roll > file at the checkpoint epoch. > The {{StreamingFileSink}} documentation needs to be updated accordingly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15476) Update StreamingFileSink documentation -- bulk encoded writer now supports customized checkpoint policy
Ying Xu created FLINK-15476: --- Summary: Update StreamingFileSink documentation -- bulk encoded writer now supports customized checkpoint policy Key: FLINK-15476 URL: https://issues.apache.org/jira/browse/FLINK-15476 Project: Flink Issue Type: Improvement Reporter: Ying Xu -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-15301) Flink Kinesis AsyncRecordEmitter needs to handle unchecked exception gracefully
[ https://issues.apache.org/jira/browse/FLINK-15301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16998418#comment-16998418 ] Ying Xu edited comment on FLINK-15301 at 12/17/19 5:45 PM: --- Example stacktrace
{code:java}
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:772)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:91)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:272)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:260)
    at org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter.run(RecordEmitter.java:230)
    at java.lang.Thread.run(Thread.java:748)
    ...
{code}
was (Author: yxu-apache):
Example stacktrace
{code:java}
05:39:32.393 INFO o.a.f.f.s.c.w.S3Committer - Committing f/event_name=dynamic_translation_missing/ds=2019-12-15/hr=05/part-450-96064 with MPU ID y5e5QQ3kTzi7TkuEirSdr.enmM7GaIkIxvjVRqIT0kXaMSPzhVQKLRu3vQuzZ.oziFr2vuXXGXEThkpsOiCCV17UQxGm5AtgDqslw33uc1aTHyIKFQXwNRpQCZCttZ_AxcLbltEyjd8m7ea15Bhf.A--
Exception in thread "recordEmitter-Source: json-events_source -> json-events_flatten_json_to_persistermessage -> json-events_local_record_count (188/512)" org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:772)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:91)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:272)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:260)
    at org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter.run(RecordEmitter.java:230)
    at java.lang.Thread.run(Thread.java:748)
    ...
{code}
> Flink Kinesis AsyncRecordEmitter needs to handle unchecked exception > gracefully > --- > > Key: FLINK-15301 > URL: https://issues.apache.org/jira/browse/FLINK-15301 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kinesis >Reporter: Ying Xu >Priority: Major > > Currently, any runTime exception encountered inside the > `AsyncRecordEmitter.emitRecordAndUpdateState()` function could cause the > thread to exit silently. Flink job would continue to run, but the stopped
[jira] [Commented] (FLINK-15301) Flink Kinesis AsyncRecordEmitter needs to handle unchecked exception gracefully
[ https://issues.apache.org/jira/browse/FLINK-15301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16998418#comment-16998418 ] Ying Xu commented on FLINK-15301: - Example stacktrace
{code:java}
05:39:32.393 INFO o.a.f.f.s.c.w.S3Committer - Committing f/event_name=dynamic_translation_missing/ds=2019-12-15/hr=05/part-450-96064 with MPU ID y5e5QQ3kTzi7TkuEirSdr.enmM7GaIkIxvjVRqIT0kXaMSPzhVQKLRu3vQuzZ.oziFr2vuXXGXEThkpsOiCCV17UQxGm5AtgDqslw33uc1aTHyIKFQXwNRpQCZCttZ_AxcLbltEyjd8m7ea15Bhf.A--
Exception in thread "recordEmitter-Source: json-events_source -> json-events_flatten_json_to_persistermessage -> json-events_local_record_count (188/512)" org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:772)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:91)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:272)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:260)
    at org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter.run(RecordEmitter.java:230)
    at java.lang.Thread.run(Thread.java:748)
    ...
{code}
> Flink Kinesis AsyncRecordEmitter needs to handle unchecked exception > gracefully > --- > > Key: FLINK-15301 > URL: https://issues.apache.org/jira/browse/FLINK-15301 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kinesis >Reporter: Ying Xu >Priority: Major > > Currently, any runtime exception encountered inside the > `AsyncRecordEmitter.emitRecordAndUpdateState()` function could cause the > thread to exit silently. The Flink job would continue to run, but the stopped > record emitter would subsequently cause Kinesis data consumption to stall. > > The AsyncRecordEmitter needs to catch unchecked exceptions, log errors, and > perhaps trigger a job restart. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-15301) Flink Kinesis AsyncRecordEmitter needs to handle unchecked exception gracefully
Ying Xu created FLINK-15301: --- Summary: Flink Kinesis AsyncRecordEmitter needs to handle unchecked exception gracefully Key: FLINK-15301 URL: https://issues.apache.org/jira/browse/FLINK-15301 Project: Flink Issue Type: Improvement Components: Connectors / Kinesis Reporter: Ying Xu Currently, any runtime exception encountered inside the `AsyncRecordEmitter.emitRecordAndUpdateState()` function can cause the thread to exit silently. The Flink job would continue to run, but the stopped record emitter would subsequently cause Kinesis data consumption to stall. The AsyncRecordEmitter needs to catch unchecked exceptions, log them, and perhaps trigger a job restart. -- This message was sent by Atlassian Jira (v8.3.4#803005)
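The guard described above can be sketched in plain Java. This is an illustrative sketch only, not Flink's actual emitter API: the class name and the failure callback are assumptions. The idea is that the emitter loop is wrapped so any unchecked exception reaches a failure callback (which could log and fail the job) instead of silently killing the thread.

```java
import java.util.function.Consumer;

/**
 * Illustrative sketch only -- not Flink's actual API. Wraps an emitter loop so
 * that an unchecked exception is reported to a failure callback instead of
 * silently terminating the emitter thread.
 */
class GuardedEmitterRunnable implements Runnable {
    private final Runnable emitterLoop;
    private final Consumer<Throwable> failureReporter;

    GuardedEmitterRunnable(Runnable emitterLoop, Consumer<Throwable> failureReporter) {
        this.emitterLoop = emitterLoop;
        this.failureReporter = failureReporter;
    }

    @Override
    public void run() {
        try {
            emitterLoop.run();
        } catch (Throwable t) {
            // Without this catch, the emitter thread would exit silently and
            // Kinesis consumption would stall while the job keeps running.
            failureReporter.accept(t);
        }
    }
}
```

In a real fix the callback would forward the throwable to the fetcher's error handling so the job restarts, rather than merely recording it.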
[jira] [Created] (FLINK-14039) Flink Kinesis consumer: configurable per-shard consumption rate when running in adaptive mode
Ying Xu created FLINK-14039: --- Summary: Flink Kinesis consumer: configurable per-shard consumption rate when running in adaptive mode Key: FLINK-14039 URL: https://issues.apache.org/jira/browse/FLINK-14039 Project: Flink Issue Type: Improvement Components: Connectors / Kinesis Reporter: Ying Xu Currently, the Flink Kinesis connector has a fixed [2MB|https://github.com/apache/flink/blob/78748ea1aee8f9d0c0499180a2ef455490b32b24/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L59-L61] per-shard target rate when running in adaptive rate mode. In some scenarios, users may want a different target rate. For example, when two Kinesis consumers share a common stream, a user may want to de-prioritize one of them so that it runs with a target rate below 2 MB/s. It is relatively straightforward to implement this feature: simply add a per-shard target-rate consumer config with the default set to 2 MB/s. -- This message was sent by Atlassian Jira (v8.3.2#803003)
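A minimal sketch of what the change amounts to (class, method, and constant names here are hypothetical, not the connector's actual code): adaptive mode derives the sleep interval between getRecords() calls from the bytes just read and the target rate, so making the 2 MB/s constant a parameter is a small generalization.

```java
/**
 * Illustrative sketch -- names are hypothetical, not the connector's actual
 * code. Computes the interval between consecutive getRecords() calls so that
 * per-shard consumption averages out to a configurable target rate.
 */
final class AdaptiveReadRate {
    /** The connector's current hard-coded default: 2 MB per second per shard. */
    static final long DEFAULT_TARGET_BYTES_PER_SEC = 2L * 1024 * 1024;

    private AdaptiveReadRate() {}

    static long fetchIntervalMillis(long bytesInLastBatch, long targetBytesPerSec) {
        if (targetBytesPerSec <= 0) {
            throw new IllegalArgumentException("target rate must be positive");
        }
        // Reading N bytes at R bytes/sec should take N/R seconds on average.
        return (bytesInLastBatch * 1000L) / targetBytesPerSec;
    }
}
```

Under the default rate, a 2 MB batch yields a 1-second interval; halving the configured rate for a de-prioritized consumer doubles the interval between fetches.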
[jira] [Commented] (FLINK-13864) StreamingFileSink: Allow inherited classes to extend StreamingFileSink correctly
[ https://issues.apache.org/jira/browse/FLINK-13864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16919902#comment-16919902 ] Ying Xu commented on FLINK-13864: - Hi [~kkl0u], this is Ying; I have been working with [~kailashhd] on this feature. Posted a PR [https://github.com/apache/flink/pull/9581] where you can find more details. We've tested a customized StreamingFileSink built on top of the new interface, and it works fine. We'd love your comments there. > StreamingFileSink: Allow inherited classes to extend StreamingFileSink > correctly > > > Key: FLINK-13864 > URL: https://issues.apache.org/jira/browse/FLINK-13864 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Reporter: Kailash Hassan Dayanand >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently the StreamingFileSink can't be extended correctly, as there are a > few issues with the [PR|https://github.com/apache/flink/pull/8469] merged for this > [Jira|https://issues.apache.org/jira/browse/FLINK-12539] > Mailing list discussion: > [http://mail-archives.apache.org/mod_mbox/flink-dev/201908.mbox/%3CCACGLQUAxXjr2mBOf-6hbXcwmWoH5ib_0YEy-Vyjj%3DEPyQ25Qiw%40mail.gmail.com%3E] > -- This message was sent by Atlassian JIRA (v8.3.2#803003)
[jira] [Assigned] (FLINK-13027) StreamingFileSink bulk-encoded writer supports file rolling upon customized events
[ https://issues.apache.org/jira/browse/FLINK-13027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Xu reassigned FLINK-13027: --- Assignee: Ying Xu > StreamingFileSink bulk-encoded writer supports file rolling upon customized > events > -- > > Key: FLINK-13027 > URL: https://issues.apache.org/jira/browse/FLINK-13027 > Project: Flink > Issue Type: New Feature > Components: API / DataStream >Reporter: Ying Xu >Assignee: Ying Xu >Priority: Major > > When writing in a bulk-encoded format such as Parquet, StreamingFileSink only > supports OnCheckpointRollingPolicy, which rolls files at checkpoint time. > In many scenarios, it is beneficial for the sink to roll files upon certain > events, for example, when the file size reaches a limit. Such a rolling > policy can also alleviate some of the side effects of > OnCheckpointRollingPolicy, e.g., most of the heavy lifting, including file > uploading, happens at checkpoint time. > Specifically, this Jira calls for a new rolling policy that rolls files: > # whenever a customized event happens, e.g., the file size reaches a certain > limit. > # whenever a checkpoint happens. This is needed for providing exactly-once > guarantees when writing bulk-encoded files. > Users of this rolling policy need to be aware that the customized event and > the next checkpoint epoch may be close to each other, and thus may yield a tiny > file per checkpoint in the worst case. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-13027) StreamingFileSink bulk-encoded writer supports file rolling upon customized events
[ https://issues.apache.org/jira/browse/FLINK-13027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Xu reassigned FLINK-13027: --- Assignee: (was: Ying Xu) > StreamingFileSink bulk-encoded writer supports file rolling upon customized > events > -- > > Key: FLINK-13027 > URL: https://issues.apache.org/jira/browse/FLINK-13027 > Project: Flink > Issue Type: New Feature > Components: API / DataStream >Reporter: Ying Xu >Priority: Major > > When writing in a bulk-encoded format such as Parquet, StreamingFileSink only > supports OnCheckpointRollingPolicy, which rolls files at checkpoint time. > In many scenarios, it is beneficial for the sink to roll files upon certain > events, for example, when the file size reaches a limit. Such a rolling > policy can also alleviate some of the side effects of > OnCheckpointRollingPolicy, e.g., most of the heavy lifting, including file > uploading, happens at checkpoint time. > Specifically, this Jira calls for a new rolling policy that rolls files: > # whenever a customized event happens, e.g., the file size reaches a certain > limit. > # whenever a checkpoint happens. This is needed for providing exactly-once > guarantees when writing bulk-encoded files. > Users of this rolling policy need to be aware that the customized event and > the next checkpoint epoch may be close to each other, and thus may yield a tiny > file per checkpoint in the worst case. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-13027) StreamingFileSink bulk-encoded writer supports file rolling upon customized events
[ https://issues.apache.org/jira/browse/FLINK-13027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Xu reassigned FLINK-13027: --- Assignee: Ying Xu > StreamingFileSink bulk-encoded writer supports file rolling upon customized > events > -- > > Key: FLINK-13027 > URL: https://issues.apache.org/jira/browse/FLINK-13027 > Project: Flink > Issue Type: New Feature > Components: API / DataStream >Reporter: Ying Xu >Assignee: Ying Xu >Priority: Major > > When writing in a bulk-encoded format such as Parquet, StreamingFileSink only > supports OnCheckpointRollingPolicy, which rolls files at checkpoint time. > In many scenarios, it is beneficial for the sink to roll files upon certain > events, for example, when the file size reaches a limit. Such a rolling > policy can also alleviate some of the side effects of > OnCheckpointRollingPolicy, e.g., most of the heavy lifting, including file > uploading, happens at checkpoint time. > Specifically, this Jira calls for a new rolling policy that rolls files: > # whenever a customized event happens, e.g., the file size reaches a certain > limit. > # whenever a checkpoint happens. This is needed for providing exactly-once > guarantees when writing bulk-encoded files. > Users of this rolling policy need to be aware that the customized event and > the next checkpoint epoch may be close to each other, and thus may yield a tiny > file per checkpoint in the worst case. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13027) StreamingFileSink bulk-encoded writer supports file rolling upon customized events
Ying Xu created FLINK-13027: --- Summary: StreamingFileSink bulk-encoded writer supports file rolling upon customized events Key: FLINK-13027 URL: https://issues.apache.org/jira/browse/FLINK-13027 Project: Flink Issue Type: New Feature Components: API / DataStream Reporter: Ying Xu When writing in a bulk-encoded format such as Parquet, StreamingFileSink only supports OnCheckpointRollingPolicy, which rolls files at checkpoint time. In many scenarios, it is beneficial for the sink to roll files upon certain events, for example, when the file size reaches a limit. Such a rolling policy can also alleviate some of the side effects of OnCheckpointRollingPolicy, e.g., most of the heavy lifting, including file uploading, happens at checkpoint time. Specifically, this Jira calls for a new rolling policy that rolls files: # whenever a customized event happens, e.g., the file size reaches a certain limit. # whenever a checkpoint happens. This is needed for providing exactly-once guarantees when writing bulk-encoded files. Users of this rolling policy need to be aware that the customized event and the next checkpoint epoch may be close to each other, and thus may yield a tiny file per checkpoint in the worst case. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
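The two rolling conditions called for above can be sketched as a small decision helper. This is a sketch under assumed names, not Flink's actual RollingPolicy interface: roll when the in-progress part file crosses a size limit (the customized event), and always roll on checkpoint to keep the exactly-once guarantee for bulk-encoded formats.

```java
/**
 * Illustrative decision logic for the proposed rolling policy -- a sketch with
 * assumed names, not Flink's actual RollingPolicy interface.
 */
final class SizeOrCheckpointRollDecision {
    private final long maxPartSizeBytes;

    SizeOrCheckpointRollDecision(long maxPartSizeBytes) {
        this.maxPartSizeBytes = maxPartSizeBytes;
    }

    /** Customized condition: roll once the in-progress part file is large enough. */
    boolean shouldRollOnEvent(long currentPartSizeBytes) {
        return currentPartSizeBytes >= maxPartSizeBytes;
    }

    /** Bulk-encoded writers must still roll at every checkpoint for exactly-once. */
    boolean shouldRollOnCheckpoint() {
        return true;
    }
}
```

As the issue notes, a size-triggered roll landing just before a checkpoint can still leave a tiny per-checkpoint file, since the checkpoint condition always fires regardless of how recently the size condition rolled.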
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760284#comment-16760284 ] Ying Xu commented on FLINK-4582: Thanks [~tinder-dthomson]. Internally we use the *new ObjectMapper()* initialization but didn't observe a similar issue; we may, however, be running an older Flink distribution. Yes, please report this as a separate bug and perhaps attach the full stack trace. If you already have a fix, feel free to post a PR as well. > Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams > > > Key: FLINK-4582 > URL: https://issues.apache.org/jira/browse/FLINK-4582 > Project: Flink > Issue Type: New Feature > Components: Kinesis Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Ying Xu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > AWS DynamoDB is a NoSQL database service that has a CDC-like (change data > capture) feature called DynamoDB Streams > (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html), > which is a stream feed of item-level table activities. > The DynamoDB Streams shard abstraction follows that of Kinesis Streams with > only a slight difference in resharding behaviours, so it is possible to build > on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB > Streams source. > I propose an API something like this: > {code} > DataStream dynamoItemsCdc = > FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config) > {code} > The feature adds more connectivity to popular AWS services for Flink, and > combining what Flink has for exactly-once semantics, out-of-core state > backends, and queryable state with CDC can have very strong use cases. For > this feature there should only be an extra dependency to the AWS Java SDK for > DynamoDB, which has Apache License 2.0. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671216#comment-16671216 ] Ying Xu commented on FLINK-4582: [~tinder-dthomson] Posted the PR here: https://github.com/apache/flink/pull/6968 > Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams > > > Key: FLINK-4582 > URL: https://issues.apache.org/jira/browse/FLINK-4582 > Project: Flink > Issue Type: New Feature > Components: Kinesis Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Ying Xu >Priority: Major > Labels: pull-request-available > > AWS DynamoDB is a NoSQL database service that has a CDC-like (change data > capture) feature called DynamoDB Streams > (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html), > which is a stream feed of item-level table activities. > The DynamoDB Streams shard abstraction follows that of Kinesis Streams with > only a slight difference in resharding behaviours, so it is possible to build > on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB > Streams source. > I propose an API something like this: > {code} > DataStream dynamoItemsCdc = > FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config) > {code} > The feature adds more connectivity to popular AWS services for Flink, and > combining what Flink has for exactly-once semantics, out-of-core state > backends, and queryable state with CDC can have very strong use cases. For > this feature there should only be an extra dependency to the AWS Java SDK for > DynamoDB, which has Apache License 2.0. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667569#comment-16667569 ] Ying Xu commented on FLINK-4582: Thanks [~tinder-dthomson] for the detailed comments. Yes, that's exactly why I felt _efficient multi-stream_ support is somewhat lacking :). Actually, we are running Flink 1.5.2 internally. For contributing upstream, I'm currently adapting the patch to fit master (1.7-SNAPSHOT). The main difference is that the Flink 1.7 Kinesis connector uses the _listShards API_ to retrieve the shard list. For DynamoDB Streams, we must use the _describeStreams API_ to retrieve that information, since listShards is not supported. I am currently porting the related logic around _describeStreams_ from Flink 1.5 to my patch. I should be able to post a meaningful PR in 1-2 days. > Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams > > > Key: FLINK-4582 > URL: https://issues.apache.org/jira/browse/FLINK-4582 > Project: Flink > Issue Type: New Feature > Components: Kinesis Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Ying Xu >Priority: Major > > AWS DynamoDB is a NoSQL database service that has a CDC-like (change data > capture) feature called DynamoDB Streams > (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html), > which is a stream feed of item-level table activities. > The DynamoDB Streams shard abstraction follows that of Kinesis Streams with > only a slight difference in resharding behaviours, so it is possible to build > on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB > Streams source. 
> I propose an API something like this: > {code} > DataStream dynamoItemsCdc = > FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config) > {code} > The feature adds more connectivity to popular AWS services for Flink, and > combining what Flink has for exactly-once semantics, out-of-core state > backends, and queryable state with CDC can have very strong use cases. For > this feature there should only be an extra dependency to the AWS Java SDK for > DynamoDB, which has Apache License 2.0. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16665527#comment-16665527 ] Ying Xu commented on FLINK-4582: Hi [~tinder-dthomson], thanks for raising this issue. And sorry for the delay in responding to the original request. We actually implemented a version of the flink-dynamodbstreams connector on top of the existing flink-kinesis connector. The work is currently in production and was presented at a meetup event back in September. I wasn't able to contribute it back because of other work priorities – my bad! I looked at your PR. The use of _DynamodbProxy.getShardList()_ is interesting. We took a slightly different approach, which plugs a dynamodbstreams-kinesis adapter object into KinesisProxy and makes it an equivalent _DynamodbProxy_ (the approach mentioned in another thread titled *Consuming data from dynamoDB streams to flink*). We rely on the assumption that during resharding, one can retrieve all the new child shard IDs by passing the last seen shard ID. Although DynamoDB Streams does not officially claim this, we have consistently observed this behavior in production during resharding. Another benefit of directly embedding a dynamodbstreams-kinesis adapter is that it allows *ONE* source (consumer) to consume from multiple data streams (which is important for our use cases), plus the other error handling in the existing KinesisProxy. I agree that if _DynamodbProxy_ provides an _efficient multi-stream_ implementation, it is an interesting direction to look into. If you can wait a few days, I can adapt my PR on top of OSS Flink and post it by early next week. We can have more discussion then. What do you think? Thank you very much! 
> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams > > > Key: FLINK-4582 > URL: https://issues.apache.org/jira/browse/FLINK-4582 > Project: Flink > Issue Type: New Feature > Components: Kinesis Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Ying Xu >Priority: Major > > AWS DynamoDB is a NoSQL database service that has a CDC-like (change data > capture) feature called DynamoDB Streams > (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html), > which is a stream feed of item-level table activities. > The DynamoDB Streams shard abstraction follows that of Kinesis Streams with > only a slight difference in resharding behaviours, so it is possible to build > on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB > Streams source. > I propose an API something like this: > {code} > DataStream dynamoItemsCdc = > FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config) > {code} > The feature adds more connectivity to popular AWS services for Flink, and > combining what Flink has for exactly-once semantics, out-of-core state > backends, and queryable state with CDC can have very strong use cases. For > this feature there should only be an extra dependency to the AWS Java SDK for > DynamoDB, which has Apache License 2.0. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10358) Flink kinesis connector could throw NPE during getRecords() call
[ https://issues.apache.org/jira/browse/FLINK-10358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16622739#comment-16622739 ] Ying Xu commented on FLINK-10358: - [PR 6708|https://github.com/apache/flink/pull/6708] has been merged. > Flink kinesis connector could throw NPE during getRecords() call > - > > Key: FLINK-10358 > URL: https://issues.apache.org/jira/browse/FLINK-10358 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Ying Xu >Assignee: Ying Xu >Priority: Major > Labels: pull-request-available > > When extending the flink kinesis connector to consume from a dynamodb stream, > it was found NPE could be thrown at > [here|https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376] > . > This is because the [getRecords > API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html] > in dynamodb streams does not return the millisBehindLatest field and has it > set to null. Null check is probably needed here. > See FLINK-4582 for the context of building dynamodb streams connector on top > of the Kinesis connector. >
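The defensive null check called for in the issue above can be sketched as follows. The cause is that the DynamoDB Streams GetRecords response omits `millisBehindLatest`, so the field comes back as a null `Long` where the Kinesis response would carry a value, and unboxing it unconditionally throws the NPE. `GetRecordsResultStub` and `lagOrDefault` are hypothetical stand-ins for illustration, not the AWS SDK's `GetRecordsResult` or the actual ShardConsumer code:

```java
public class MillisBehindLatestSketch {

    /** Stand-in for the SDK's GetRecordsResult. */
    static class GetRecordsResultStub {
        private final Long millisBehindLatest;

        GetRecordsResultStub(Long millisBehindLatest) {
            this.millisBehindLatest = millisBehindLatest;
        }

        Long getMillisBehindLatest() {
            return millisBehindLatest; // null for DynamoDB Streams
        }
    }

    /** Report consumer lag only when the backend actually supplies it. */
    static long lagOrDefault(GetRecordsResultStub result, long fallback) {
        Long millis = result.getMillisBehindLatest();
        // Guard against DynamoDB Streams returning no lag information;
        // unboxing a null Long here is exactly what triggers the NPE.
        return (millis != null) ? millis : fallback;
    }

    public static void main(String[] args) {
        System.out.println(lagOrDefault(new GetRecordsResultStub(null), -1L));  // DynamoDB Streams case: prints -1
        System.out.println(lagOrDefault(new GetRecordsResultStub(1200L), -1L)); // Kinesis case: prints 1200
    }
}
```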
[jira] [Closed] (FLINK-10358) Flink kinesis connector could throw NPE during getRecords() call
[ https://issues.apache.org/jira/browse/FLINK-10358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Xu closed FLINK-10358. --- Resolution: Fixed
[jira] [Assigned] (FLINK-10358) Flink kinesis connector could throw NPE during getRecords() call
[ https://issues.apache.org/jira/browse/FLINK-10358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Xu reassigned FLINK-10358: --- Assignee: Ying Xu
[jira] [Updated] (FLINK-10358) Flink kinesis connector could throw NPE during getRecords() call
[ https://issues.apache.org/jira/browse/FLINK-10358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Xu updated FLINK-10358: Description: When extending the flink kinesis connector to consume from a dynamodb stream, it was found NPE could be thrown at [here|https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376] . This is because the [getRecords API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html] in dynamodb streams does not return the millisBehindLatest field and has it set to null. Null check is probably needed here. See FLINK-4582 for the context of building dynamodb streams connector on top of the Kinesis connector. was: When extending the flink kinesis connector to consume from a dynamodb stream, it was found NPE could be thrown at [here|https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376] . This is because the [getRecords API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html] in dynamodb streams does not return the millisBehindLatest field and has it set to null. Null check is probably needed here. 
[jira] [Updated] (FLINK-10358) Flink kinesis connector could throw NPE during getRecords() call
[ https://issues.apache.org/jira/browse/FLINK-10358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Xu updated FLINK-10358: Description: When extending the flink kinesis connector to consume from a dynamodb stream, it was found NPE could be thrown at [here|https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376] . This is because the [getRecords API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html] in dynamodb streams does not return the millisBehindLatest field and has it set to null. Null check is probably needed here. was: When extending the flink kinesis connector to consume from a dynamodb stream, it was found NPE could be thrown at [here|[https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376]] . This is because the [getRecords API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html] in dynamodb streams does not return the millisBehindLatest field and has it set to null. Null check is probably needed here.
[jira] [Updated] (FLINK-10358) Flink kinesis connector could throw NPE during getRecords() call
[ https://issues.apache.org/jira/browse/FLINK-10358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Xu updated FLINK-10358: Description: When extending the flink kinesis connector to consume from a dynamodb stream, it was found NPE could be thrown at [here|[https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376]] . This is because the [getRecords API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html] in dynamodb streams does not return the millisBehindLatest field and has it set to null. Null check is probably needed here. was: When extending the flink kinesis connector to consume from a dynamodb stream, it was found NPE could be thrown at [this line|[https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376]] . This is because the [getRecords API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html] in dynamodb streams does not return the millisBehindLatest field and has it set to null. Null check is probably needed here.
[jira] [Updated] (FLINK-10358) Flink kinesis connector could throw NPE during getRecords() call
[ https://issues.apache.org/jira/browse/FLINK-10358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ying Xu updated FLINK-10358: Description: When extending the flink kinesis connector to consume from a dynamodb stream, it was found NPE could be thrown at [this line|[https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376]] . This is because the [getRecords API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html] in dynamodb streams does not return the millisBehindLatest field and has it set to null. Null check is probably needed here. was: When extending the flink kinesis connector to consume from a dynamodb stream, it was found NPE could be thrown at [this line|[https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376|https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376].]] . This is because the [getRecords API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html] in dynamodb streams does not return the millisBehindLatest field and has it set to null. Null check is probably needed here. 
[jira] [Created] (FLINK-10358) Flink kinesis connector could throw NPE during getRecords() call
Ying Xu created FLINK-10358: --- Summary: Flink kinesis connector could throw NPE during getRecords() call Key: FLINK-10358 URL: https://issues.apache.org/jira/browse/FLINK-10358 Project: Flink Issue Type: Bug Components: Kinesis Connector Reporter: Ying Xu When extending the flink kinesis connector to consume from a dynamodb stream, it was found NPE could be thrown at [this line|[https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376|https://github.com/apache/flink/blob/e3c98f05d3544d0165c2d97d2d00fcd295cef8c8/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L376].]] . This is because the [getRecords API|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html] in dynamodb streams does not return the millisBehindLatest field and has it set to null. Null check is probably needed here.
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560327#comment-16560327 ] Ying Xu commented on FLINK-4582: [~tzulitai] Just an update: we are very close to having a working version.
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16525650#comment-16525650 ] Ying Xu commented on FLINK-4582: Hi [~tzulitai] [~mingdaoy], I'm following up on this JIRA as we currently have a production use case which requires injecting the DynamoDB changelogs into Kafka. I'm interested in contributing to related efforts as well. I have raised a request on the dev mailing list ([raw message|https://mail-archives.apache.org/mod_mbox/flink-dev/201806.mbox/raw/%3CCAJ5M44_FC8u713SWHCZx02FtEfyM8RpDF%2BeTNS9W%3DTC4JkVicQ%40mail.gmail.com%3E]). Thanks.