[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760284#comment-16760284 ]

Ying Xu commented on FLINK-4582:

Thanks [~tinder-dthomson]. Internally we use the *new ObjectMapper()* initialization but didn't observe a similar issue. But we may be running with an older Flink distribution. Yes, please report this as a separate bug and perhaps attach the full stack trace. If you already have a fix, feel free to post the PR as well.

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
> Issue Type: New Feature
> Components: Kinesis Connector, Streaming Connectors
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Ying Xu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data
> capture) feature called DynamoDB Streams
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
> which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with
> only a slight difference in resharding behaviours, so it is possible to build
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB
> Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc =
> FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and
> combining what Flink has for exactly-once semantics, out-of-core state
> backends, and queryable state with CDC can have very strong use cases. For
> this feature there should only be an extra dependency to the AWS Java SDK for
> DynamoDB, which has Apache License 2.0.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760263#comment-16760263 ]

Thomas Weise commented on FLINK-4582:

[~tinder-dthomson] Thanks for the report and yes, please track it as a separate issue and mark the fix version as 1.8 so that we get it resolved as part of 1.8.
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760250#comment-16760250 ]

Devin Thomson commented on FLINK-4582:

[~yxu-lyft] circling back here, looks like this got merged in, congrats!

I've been preemptively cutting over from my implementation to this one and noticed one (blocking) bug:
{code:java}
j.l.IllegalArgumentException: Conflicting setter definitions for property "eventName": org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1 params) vs org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1 params)
	at c.f.j.d.i.POJOPropertyBuilder.getSetter(POJOPropertyBuilder.java:300)
	at c.f.j.d.d.BeanDeserializerFactory.filterBeanProps(BeanDeserializerFactory.java:619)
	at c.f.j.d.d.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:515)
	at c.f.j.d.d.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:256)
	at c.f.j.d.d.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:169)
	at c.f.j.d.d.DeserializerCache._createDeserializer2(DeserializerCache.java:403)
	at c.f.j.d.d.DeserializerCache._createDeserializer(DeserializerCache.java:352)
	at c.f.j.d.d.DeserializerCache._createAndCache2(DeserializerCache.java:264)
	... 15 common frames omitted
Wrapped by: c.f.j.d.JsonMappingException: Conflicting setter definitions for property "eventName": org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1 params...
{code}
It looks like the root cause is that DynamoDBStreamsSchema.java defines the object mapper as follows:
{code:java}
private static final ObjectMapper MAPPER = new ObjectMapper();
{code}
When it should be using the appropriate mix-ins offered by the dynamodb stream adapter library:
{code:java}
private static final ObjectMapper MAPPER = new RecordObjectMapper();
{code}
This appears to resolve the issue, I tested by using my own deserializer implementation.

Not sure if it makes sense to track this as a separate issue or not since this is still a 1.8-SNAPSHOT feature.

Let me know if you have any questions!

- Devin
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16709203#comment-16709203 ]

ASF GitHub Bot commented on FLINK-4582:

tweise commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r238817945

## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java

@@ -143,6 +143,34 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
 	/** The interval after which to consider a shard idle for purposes of watermark generation. */
 	public static final String SHARD_IDLE_INTERVAL_MILLIS = "flink.shard.idle.interval";
 
+	/**
+	 * The base backoff time between each describeStream attempt.
+	 * Different tag name to distinguish from "flink.stream.describe.backoff.base"
+	 * since the latter is deprecated.
+	 */
+	public static final String DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE =

Review comment:
It would be good to modify the method to map/replicate the keys but not remove them (since you need them for dynamo). Add a comment that this is for backward compatibility for the regular proxy, which now uses listShards instead of describeStream. We can remove the mapping in a later release.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
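The "map/replicate the keys but not remove them" suggestion in the review comment above could look roughly like the following. This is a minimal sketch, not the code from the pull request: the class and method names are hypothetical, and the new-style key "flink.list.shards.backoff.base" is an assumption (only "flink.stream.describe.backoff.base" is quoted in this thread).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper: replicates deprecated consumer keys under new names without removing them. */
final class DeprecatedKeyReplicationSketch {

    /** Deprecated key -> replacement key. The replacement name is an assumption for illustration. */
    static final Map<String, String> DEPRECATED_TO_NEW = new LinkedHashMap<>();

    static {
        DEPRECATED_TO_NEW.put(
            "flink.stream.describe.backoff.base",   // quoted in the diff above
            "flink.list.shards.backoff.base");      // assumed listShards counterpart
    }

    /**
     * Returns a copy of the config in which every deprecated key that is set is also
     * available under its new name. The deprecated key itself is kept, so a
     * DynamoDB-specific proxy that still relies on describeStream can keep reading it.
     */
    static Properties replicateDeprecatedConsumerKeys(Properties config) {
        Properties result = new Properties();
        for (String name : config.stringPropertyNames()) {
            result.setProperty(name, config.getProperty(name));
        }
        for (Map.Entry<String, String> mapping : DEPRECATED_TO_NEW.entrySet()) {
            String oldValue = config.getProperty(mapping.getKey());
            // An explicitly configured new-style key wins over the replicated value.
            if (oldValue != null && config.getProperty(mapping.getValue()) == null) {
                result.setProperty(mapping.getValue(), oldValue);
            }
        }
        return result;
    }
}
```

Returning a copy rather than mutating the caller's Properties is a design choice of this sketch; the point is only that both the old and the new key stay readable until the mapping is dropped in a later release.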
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707635#comment-16707635 ]

ASF GitHub Bot commented on FLINK-4582:

yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r238386976

## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java

@@ -143,6 +143,34 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
 	/** The interval after which to consider a shard idle for purposes of watermark generation. */
 	public static final String SHARD_IDLE_INTERVAL_MILLIS = "flink.shard.idle.interval";
 
+	/**
+	 * The base backoff time between each describeStream attempt.
+	 * Different tag name to distinguish from "flink.stream.describe.backoff.base"
+	 * since the latter is deprecated.
+	 */
+	public static final String DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE =

Review comment:
@tweise are you ok with removing this function `KinesisProxy::replaceDeprecatedConsumerKeys` as well?
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698253#comment-16698253 ]

ASF GitHub Bot commented on FLINK-4582:

tweise commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r236085780

## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java

@@ -143,6 +143,34 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
 	/** The interval after which to consider a shard idle for purposes of watermark generation. */
 	public static final String SHARD_IDLE_INTERVAL_MILLIS = "flink.shard.idle.interval";
 
+	/**
+	 * The base backoff time between each describeStream attempt.
+	 * Different tag name to distinguish from "flink.stream.describe.backoff.base"
+	 * since the latter is deprecated.
+	 */
+	public static final String DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE =

Review comment:
Remove these and remove the `@Deprecated` annotations from the prior properties.
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698254#comment-16698254 ]

ASF GitHub Bot commented on FLINK-4582:

tweise commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r236085419

## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkDynamodbStreamsConsumer.java

@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.internals.DynamodbStreamsDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.serialization.DynamodbStreamsSchema;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+
+import com.amazonaws.services.dynamodbv2.model.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Consume events from the dyanmodbdb streams.
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkDynamodbStreamsConsumer<T> extends FlinkKinesisConsumer<T> {
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkDynamodbStreamsConsumer.class);
+
+	/**
+	 * Constructor of FlinkDynamodbStreamsConsumer.
+	 *
+	 * @param stream stream to consume
+	 * @param deserializer deserialization schema
+	 * @param config config properties
+	 */
+	public FlinkDynamodbStreamsConsumer(
+			String stream,
+			DeserializationSchema<T> deserializer,
+			Properties config) {
+		super(stream, deserializer, config);
+	}
+
+	/**
+	 * Constructor of FlinkDynamodbStreamConsumer.
+	 *
+	 * @param streams list of streams to consume
+	 * @param deserializer deserialization schema
+	 * @param config config properties
+	 */
+	public FlinkDynamodbStreamsConsumer(
+			List<String> streams,
+			KinesisDeserializationSchema<T> deserializer,
+			Properties config) {
+		super(streams, deserializer, config);
+	}
+
+	public static FlinkDynamodbStreamsConsumer create(String stream,

Review comment:
Please remove these static create methods, they don't provide any benefit since the constructors are public (which is consistent with the base class).
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698252#comment-16698252 ]

ASF GitHub Bot commented on FLINK-4582:

tweise commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r236085962

## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamodbStreamsDataFetcher.java

@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
+import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.DynamodbStreamsShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.DynamodbStreamsProxy;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_DYNAMODB_STREAMS_SHARDID_FORMAT_CHECK;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DYNAMODB_STREAMS_SHARDID_FORMAT_CHECK;
+
+/**
+ * Dynamodb streams data fetcher.
+ * @param <T> type of fetched data.
+ */
+public class DynamodbStreamsDataFetcher<T> extends KinesisDataFetcher<T> {
+	private boolean shardIdFormatCheck = false;
+
+	/**
+	 * Constructor.
+	 *
+	 * @param streams list of streams to fetch data
+	 * @param sourceContext source context
+	 * @param runtimeContext runtime context
+	 * @param configProps config properties
+	 * @param deserializationSchema deserialization schema
+	 * @param shardAssigner shard assigner
+	 */
+	public DynamodbStreamsDataFetcher(List<String> streams,
+		SourceFunction.SourceContext<T> sourceContext,
+		RuntimeContext runtimeContext,
+		Properties configProps,
+		KinesisDeserializationSchema<T> deserializationSchema,
+		KinesisShardAssigner shardAssigner) {
+
+		super(streams,
+			sourceContext,
+			sourceContext.getCheckpointLock(),
+			runtimeContext,
+			configProps,
+			deserializationSchema,
+			shardAssigner,
+			null,
+			new AtomicReference<>(),
+			new ArrayList<>(),
+			createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
+			// use DynamodbStreamsProxy
+			DynamodbStreamsProxy::create);
+
+		shardIdFormatCheck = Boolean.valueOf(configProps.getProperty(
+				DYNAMODB_STREAMS_SHARDID_FORMAT_CHECK,
+				DEFAULT_DYNAMODB_STREAMS_SHARDID_FORMAT_CHECK));
+	}
+
+	/**
+	 * Updates the last discovered shard of a subscribed stream; only updates if the update is valid.
+	 */
+	@Override
+	public void advanceLastDiscoveredShardOfStream(String stream, String shardId) {

Review comment:
Rather than duplicating complete logic from the base class, can we just extract what is unique to DynamoDB? That might also eliminate the need to expose `subscribedStreamsToLastDiscoveredShardIds`?
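One way to read the review suggestion above ("extract what is unique to DynamoDB") is a template-method hook: the base fetcher keeps the update logic and the private map, and the subclass overrides only the validity check. The sketch below is a hypothetical illustration, not the code that was merged. All class and method names are made up, and the shard ID layout "shardId-<creation time>-<suffix>" is an assumption used only to give the override something to compare.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical base fetcher: owns the bookkeeping, exposes only a comparison hook. */
class BaseFetcherSketch {
    // Stays private; subclasses never need to see the map itself.
    private final Map<String, String> lastDiscoveredShardIds = new HashMap<>();

    /** Shared update logic: advance only if no shard was seen yet or the hook accepts the new ID. */
    public final void advanceLastDiscoveredShardOfStream(String stream, String shardId) {
        String lastSeen = lastDiscoveredShardIds.get(stream);
        if (lastSeen == null || shouldAdvance(shardId, lastSeen)) {
            lastDiscoveredShardIds.put(stream, shardId);
        }
    }

    /** Default rule for this sketch: plain lexicographic ordering of shard IDs. */
    protected boolean shouldAdvance(String shardId, String lastSeenShardId) {
        return shardId.compareTo(lastSeenShardId) > 0;
    }

    public String getLastDiscoveredShardOfStream(String stream) {
        return lastDiscoveredShardIds.get(stream);
    }
}

/** Hypothetical DynamoDB variant: overrides only the part that differs. */
class DynamodbFetcherSketch extends BaseFetcherSketch {
    @Override
    protected boolean shouldAdvance(String shardId, String lastSeenShardId) {
        // Assumed rule: order by the numeric creation-time component of the shard ID.
        return creationTime(shardId) > creationTime(lastSeenShardId);
    }

    private static long creationTime(String shardId) {
        // Assumed layout: "shardId-<zero-padded creation time>-<suffix>".
        return Long.parseLong(shardId.split("-")[1]);
    }
}
```

With this shape the subclass no longer copies the map handling, which is what would make exposing `subscribedStreamsToLastDiscoveredShardIds` unnecessary.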
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698250#comment-16698250 ]

ASF GitHub Bot commented on FLINK-4582:

tweise commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r236085574

## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamodbStreamsProxy.java

@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.StreamStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_ENDPOINT;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_DYNAMODB_STREAMS_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_DYNAMODB_STREAMS_DESCRIBE_BACKOFF_MAX;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DYNAMODB_STREAMS_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DYNAMODB_STREAMS_DESCRIBE_BACKOFF_MAX;
+import static org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.getCredentialsProvider;
+import static org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.setAwsClientConfigProperties;
+
+/**
+ * DynamoDB streams proxy: interface interacting with the DynamoDB streams.
+ */
+public class DynamodbStreamsProxy extends KinesisProxy {
+	private static final Logger LOG = LoggerFactory.getLogger(DynamodbStreamsProxy.class);
+
+	/** Used for formatting Flink-specific user agent string when creating Kinesis client. */
+	private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) DynamoDB Streams Connector";
+
+	// Backoff millis for the describe stream operation.
+	private final long describeStreamBaseBackoffMillis;
+	// Maximum backoff millis for the describe stream operation.
+	private final long describeStreamMaxBackoffMillis;
+	// Exponential backoff power constant for the describe stream operation.
+	private final double describeStreamExpConstant;
+
+	protected DynamodbStreamsProxy(Properties configProps) {
+		super(configProps);
+
+		// parse properties
+		describeStreamBaseBackoffMillis = Long.valueOf(
+
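The quoted diff declares three describeStream backoff settings (base, max, exponential constant) but is cut off before showing how they are combined. A common way to use such a triple is capped exponential backoff with full jitter; the sketch below assumes that scheme. The class and method names are hypothetical and nothing here is taken from the pull request itself.

```java
import java.util.Random;

/** Hypothetical illustration of capped exponential backoff with full jitter. */
final class DescribeStreamBackoffSketch {

    /** Upper bound for the wait before the given retry attempt: min(max, base * power^attempt). */
    static long cappedExponentialBackoff(long baseMillis, long maxMillis, double power, int attempt) {
        return (long) Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
    }

    /** Full jitter: a uniformly random wait between 0 (inclusive) and the capped bound (exclusive). */
    static long fullJitterBackoff(long baseMillis, long maxMillis, double power, int attempt, Random random) {
        long bound = cappedExponentialBackoff(baseMillis, maxMillis, power, attempt);
        return (long) (random.nextDouble() * bound);
    }

    public static void main(String[] args) {
        Random random = new Random();
        // Example values only; the real defaults are not shown in this thread.
        for (int attempt = 0; attempt < 6; attempt++) {
            long bound = cappedExponentialBackoff(1000L, 5000L, 1.5, attempt);
            long wait = fullJitterBackoff(1000L, 5000L, 1.5, attempt, random);
            System.out.println("attempt " + attempt + ": bound=" + bound + "ms, wait=" + wait + "ms");
        }
    }
}
```

The jitter spreads retries from many subtasks over time, which matters for describeStream because the call is rate-limited per stream (the diff imports `LimitExceededException`).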
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698251#comment-16698251 ] ASF GitHub Bot commented on FLINK-4582: --- tweise commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink URL: https://github.com/apache/flink/pull/6968#discussion_r236085609 ## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ## @@ -73,7 +73,7 @@ private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class); /** The actual Kinesis client from the AWS SDK that we will be using to make calls. */ - private final AmazonKinesis kinesisClient; + protected final AmazonKinesis kinesisClient; Review comment: If `describeStream` moves to `KinesisProxy`, this change becomes unnecessary. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams > > > Key: FLINK-4582 > URL: https://issues.apache.org/jira/browse/FLINK-4582 > Project: Flink > Issue Type: New Feature > Components: Kinesis Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Ying Xu >Priority: Major > Labels: pull-request-available > > AWS DynamoDB is a NoSQL database service that has a CDC-like (change data > capture) feature called DynamoDB Streams > (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html), > which is a stream feed of item-level table activities. > The DynamoDB Streams shard abstraction follows that of Kinesis Streams with > only a slight difference in resharding behaviours, so it is possible to build > on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB > Streams source. 
> I propose an API something like this: > {code} > DataStream dynamoItemsCdc = > FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config) > {code} > The feature adds more connectivity to popular AWS services for Flink, and > combining what Flink has for exactly-once semantics, out-of-core state > backends, and queryable state with CDC can have very strong use cases. For > this feature there should only be an extra dependency to the AWS Java SDK for > DynamoDB, which has Apache License 2.0. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
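The review comment above (moving `describeStream` into `KinesisProxy` so the client field need not become `protected`) can be illustrated with a minimal, self-contained sketch. Every name below (`StreamClient`, `BaseProxy`, `StreamsProxy`, `listShardIds`) is a hypothetical stand-in chosen for illustration, not a Flink or AWS SDK type; the sketch only assumes that the shared call can live in the base class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ProxySketch {

    /** Hypothetical stand-in for an SDK client; not the AWS AmazonKinesis interface. */
    interface StreamClient {
        List<String> listShardIds(String streamName);
    }

    /** Base proxy: owns the client and keeps the field private. */
    static class BaseProxy {
        private final StreamClient client;

        BaseProxy(StreamClient client) {
            this.client = client;
        }

        /** The shared call lives here, so subclasses never touch the client field. */
        protected List<String> describeStream(String streamName) {
            return client.listShardIds(streamName);
        }
    }

    /** Subclass: reuses the inherited call and only adds its own post-processing. */
    static class StreamsProxy extends BaseProxy {
        StreamsProxy(StreamClient client) {
            super(client);
        }

        List<String> sortedShardIds(String streamName) {
            List<String> ids = new ArrayList<>(describeStream(streamName));
            Collections.sort(ids);
            return ids;
        }
    }

    public static void main(String[] args) {
        // A fake client (assumption: real code would wrap an SDK client instead).
        StreamClient fake = name -> Arrays.asList("shardId-b", "shardId-a");
        System.out.println(new StreamsProxy(fake).sortedShardIds("demo"));
        // prints [shardId-a, shardId-b]
    }
}
```

With this layout the subclass depends only on a protected method, so the field's visibility does not have to be widened.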
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16696228#comment-16696228 ] ASF GitHub Bot commented on FLINK-4582: --- yxu-valleytider commented on issue #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink URL: https://github.com/apache/flink/pull/6968#issuecomment-44240 Adjusted package name based on comments. PTAL @tweise @tzulitai
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688872#comment-16688872 ] ASF GitHub Bot commented on FLINK-4582: --- tweise commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink URL: https://github.com/apache/flink/pull/6968#discussion_r234063079 ## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/dynamodbstreams/FlinkDynamodbStreamsConsumer.java ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.dynamodbstreams; Review comment: I don't think that will work: We had already determined in our internal work that we need a single shaded AWS dependency. I don't think it's desirable either, because it is just a slight variation of the Kinesis consumer that does not deserve a separate module. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688865#comment-16688865 ] ASF GitHub Bot commented on FLINK-4582: --- yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink URL: https://github.com/apache/flink/pull/6968#discussion_r234060924 ## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/dynamodbstreams/FlinkDynamodbStreamsConsumer.java ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.dynamodbstreams; Review comment: In that case I don't mind putting the files into a separate directory, e.g., `flink-connector-dynamodbstreams`. WDYT ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16678300#comment-16678300 ] ASF GitHub Bot commented on FLINK-4582: --- tweise commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink URL: https://github.com/apache/flink/pull/6968#discussion_r231527728 ## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/dynamodbstreams/FlinkDynamodbStreamsConsumer.java ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.dynamodbstreams; Review comment: These are just packages within the module `flink-connector-kinesis`, and the package prefix should reflect that. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671216#comment-16671216 ] Ying Xu commented on FLINK-4582: [~tinder-dthomson] Posted the PR here: https://github.com/apache/flink/pull/6968
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671214#comment-16671214 ] ASF GitHub Bot commented on FLINK-4582: --- yxu-valleytider commented on issue #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink URL: https://github.com/apache/flink/pull/6968#issuecomment-434959900 PTAL @tzulitai
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671195#comment-16671195 ] ASF GitHub Bot commented on FLINK-4582: --- yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink URL: https://github.com/apache/flink/pull/6968#discussion_r229956874 ## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ## @@ -73,7 +73,7 @@ private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class); /** The actual Kinesis client from the AWS SDK that we will be using to make calls. */ - private final AmazonKinesis kinesisClient; + protected final AmazonKinesis kinesisClient; Review comment: It is used inside `DynamodbStreamsProxy` to [execute the `describeStream` call](https://github.com/apache/flink/blob/918eb652a6fa865652b464e742c5f5b6c08c5be6/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/dynamodbstreams/proxy/DynamodbStreamsProxy.java#L197).
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671196#comment-16671196 ] ASF GitHub Bot commented on FLINK-4582: --- yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink URL: https://github.com/apache/flink/pull/6968#discussion_r229956951 ## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/dynamodbstreams/FlinkDynamodbStreamsConsumer.java ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.dynamodbstreams; Review comment: Hmm, both the `dynamodbstreams` and `kinesis` modules are under the parent module `org.apache.flink.streaming.connectors`. This is also consistent with the directory structure. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671197#comment-16671197 ] ASF GitHub Bot commented on FLINK-4582: --- yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink URL: https://github.com/apache/flink/pull/6968#discussion_r229958032 ## File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/dynamodbstreams/model/DynamodbStreamsShardHandleTest.java ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.dynamodbstreams.model; + +import org.junit.Test; + +import static org.apache.flink.streaming.connectors.dynamodbstreams.model.DynamodbStreamsShardHandle.SHARDID_PREFIX; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Shard handle unit tests. 
+ */ +public class DynamodbStreamsShardHandleTest { + @Test + public void testIsValidShardId() { + // normal form + String shardId = "shardId-00000001536805703746-69688cb1"; + assertEquals(true, DynamodbStreamsShardHandle.isValidShardId(shardId)); + + // short form + shardId = "shardId-00000001536805703746"; + assertEquals(true, DynamodbStreamsShardHandle.isValidShardId(shardId)); + + // long form + shardId = "shardId-00000001536805703746-69688cb1aljkwerijfl8228sl12a123akfla"; + assertEquals(true, DynamodbStreamsShardHandle.isValidShardId(shardId)); + + // invalid with wrong prefix + shardId = "sId-00000001536805703746-69688cb1"; + assertEquals(false, DynamodbStreamsShardHandle.isValidShardId(shardId)); + + // invalid with non-digits + shardId = "shardId-0000000153680570aabb-69688cb1"; + assertEquals(false, DynamodbStreamsShardHandle.isValidShardId(shardId)); + + // invalid with shardId too long + shardId = "shardId-00000001536805703746-69688cb1aljkwerijfl8228sl12a123akfla0000"; + assertEquals(false, DynamodbStreamsShardHandle.isValidShardId(shardId)); + } + + @Test + public void testCompareShardId() { + final int numShardIds = 10; + final int shardIdDigitLen = 20; + final String zeros = "00000000000000000000"; // twenty '0' chars + String shardIdValid = "shardId-00000001536805703746-69688cb1"; + String shardIdInvalid = "shardId-0000000153680570aabb-69688cb1"; + + assertEquals(0, DynamodbStreamsShardHandle.compareShardIds(shardIdValid, shardIdValid)); + + // comparison of invalid shardIds should yield exception + try { + DynamodbStreamsShardHandle.compareShardIds(shardIdValid, shardIdInvalid); + fail("invalid shard Id" + shardIdInvalid + " should trigger exception"); + } catch (IllegalArgumentException e) { + // ignore + } + try { + DynamodbStreamsShardHandle.compareShardIds(shardIdInvalid, shardIdValid); + fail("invalid shard Id" + shardIdInvalid + " should trigger exception"); + } catch (IllegalArgumentException e) { + // ignore Review comment: yes `// expected` clarifies more. 
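The behaviour exercised by the quoted test (a fixed prefix, a digit field, an optional suffix, an upper bound on length, and an exception when comparing invalid IDs) can be sketched in a few lines. This is a hypothetical illustration, not the PR's `DynamodbStreamsShardHandle`: the 20-digit field is taken from `shardIdDigitLen = 20` in the test, while the 65-character limit, the alphanumeric suffix rule, and comparing only the digit field are assumptions made for this sketch.

```java
import java.util.regex.Pattern;

/** Hypothetical sketch of shard ID validation and ordering; names are illustrative. */
class ShardIdSketch {
    static final String PREFIX = "shardId-";
    static final int DIGIT_LEN = 20; // assumed from shardIdDigitLen in the quoted test
    static final int MAX_LEN = 65;   // assumed upper bound on the total ID length

    // Prefix, exactly DIGIT_LEN digits, then an optional "-suffix" of letters and digits.
    private static final Pattern FORMAT =
        Pattern.compile(PREFIX + "\\d{" + DIGIT_LEN + "}(-[A-Za-z0-9]+)?");

    static boolean isValidShardId(String id) {
        return id != null && id.length() <= MAX_LEN && FORMAT.matcher(id).matches();
    }

    static int compareShardIds(String first, String second) {
        if (!isValidShardId(first) || !isValidShardId(second)) {
            throw new IllegalArgumentException("invalid shard id: " + first + ", " + second);
        }
        // Equal-length digit strings: lexicographic order equals numeric order.
        return digits(first).compareTo(digits(second));
    }

    private static String digits(String id) {
        return id.substring(PREFIX.length(), PREFIX.length() + DIGIT_LEN);
    }

    public static void main(String[] args) {
        String older = "shardId-00000001536805703746-69688cb1";
        String newer = "shardId-00000001536805703747";
        System.out.println(isValidShardId(older));                      // true
        System.out.println(isValidShardId("sId-00000001536805703746")); // false
        System.out.println(compareShardIds(older, newer) < 0);          // true
    }
}
```

Validating before comparing keeps the ordering well defined: a malformed ID fails loudly instead of being silently sorted by an arbitrary substring.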
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670101#comment-16670101 ] ASF GitHub Bot commented on FLINK-4582: --- tweise commented on a change in pull request #6968: [FLINK-4582] Consuming data from DynamoDB streams to flink URL: https://github.com/apache/flink/pull/6968#discussion_r229688903 ## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ## @@ -73,7 +73,7 @@ private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class); /** The actual Kinesis client from the AWS SDK that we will be using to make calls. */ - private final AmazonKinesis kinesisClient; + protected final AmazonKinesis kinesisClient; Review comment: why this modifier change?
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670097#comment-16670097 ] ASF GitHub Bot commented on FLINK-4582: --- tweise commented on a change in pull request #6968: [FLINK-4582] Consuming data from DynamoDB streams to flink URL: https://github.com/apache/flink/pull/6968#discussion_r229688269 ## File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/dynamodbstreams/model/DynamodbStreamsShardHandleTest.java ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.dynamodbstreams.model; + +import org.junit.Test; + +import static org.apache.flink.streaming.connectors.dynamodbstreams.model.DynamodbStreamsShardHandle.SHARDID_PREFIX; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Shard handle unit tests. 
+ */ +public class DynamodbStreamsShardHandleTest { + @Test + public void testIsValidShardId() { + // normal form + String shardId = "shardId-00000001536805703746-69688cb1"; + assertEquals(true, DynamodbStreamsShardHandle.isValidShardId(shardId)); + + // short form + shardId = "shardId-00000001536805703746"; + assertEquals(true, DynamodbStreamsShardHandle.isValidShardId(shardId)); + + // long form + shardId = "shardId-00000001536805703746-69688cb1aljkwerijfl8228sl12a123akfla"; + assertEquals(true, DynamodbStreamsShardHandle.isValidShardId(shardId)); + + // invalid with wrong prefix + shardId = "sId-00000001536805703746-69688cb1"; + assertEquals(false, DynamodbStreamsShardHandle.isValidShardId(shardId)); + + // invalid with non-digits + shardId = "shardId-0000000153680570aabb-69688cb1"; + assertEquals(false, DynamodbStreamsShardHandle.isValidShardId(shardId)); + + // invalid with shardId too long + shardId = "shardId-00000001536805703746-69688cb1aljkwerijfl8228sl12a123akfla0000"; + assertEquals(false, DynamodbStreamsShardHandle.isValidShardId(shardId)); + } + + @Test + public void testCompareShardId() { + final int numShardIds = 10; + final int shardIdDigitLen = 20; + final String zeros = "00000000000000000000"; // twenty '0' chars + String shardIdValid = "shardId-00000001536805703746-69688cb1"; + String shardIdInvalid = "shardId-0000000153680570aabb-69688cb1"; + + assertEquals(0, DynamodbStreamsShardHandle.compareShardIds(shardIdValid, shardIdValid)); + + // comparison of invalid shardIds should yield exception + try { + DynamodbStreamsShardHandle.compareShardIds(shardIdValid, shardIdInvalid); + fail("invalid shard Id" + shardIdInvalid + " should trigger exception"); + } catch (IllegalArgumentException e) { + // ignore + } + try { + DynamodbStreamsShardHandle.compareShardIds(shardIdInvalid, shardIdValid); + fail("invalid shard Id" + shardIdInvalid + " should trigger exception"); + } catch (IllegalArgumentException e) { + // ignore Review comment: this should be `// expected` instead? 
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
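The test under review exercises shard ID validation and comparison. As a rough illustration of what such helpers might look like, here is a minimal sketch. It is not the code from the pull request: the class name `ShardIdSketch`, the regular expression, the 36-character suffix limit, and the choice to compare the digit field numerically are all assumptions made for this example, and the sample IDs are copied as they appear in this thread.

```java
import java.math.BigInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper; the format rules below are assumptions, not the PR's. */
final class ShardIdSketch {
    // Assumed format: "shardId-" + decimal digits + optional "-" and up to 36 word chars.
    private static final Pattern SHARD_ID =
        Pattern.compile("shardId-(\\d+)(?:-(\\w{0,36}))?");

    static boolean isValidShardId(String shardId) {
        return shardId != null && SHARD_ID.matcher(shardId).matches();
    }

    /** Orders by the numeric field first, then by the suffix; rejects invalid IDs. */
    static int compareShardIds(String first, String second) {
        Matcher a = parse(first);
        Matcher b = parse(second);
        int byNumber = new BigInteger(a.group(1)).compareTo(new BigInteger(b.group(1)));
        if (byNumber != 0) {
            return byNumber;
        }
        return suffix(a).compareTo(suffix(b));
    }

    private static Matcher parse(String shardId) {
        if (shardId == null) {
            throw new IllegalArgumentException("invalid shard ID: null");
        }
        Matcher m = SHARD_ID.matcher(shardId);
        if (!m.matches()) {
            throw new IllegalArgumentException("invalid shard ID: " + shardId);
        }
        return m;
    }

    private static String suffix(Matcher m) {
        String s = m.group(2); // null when the ID has no "-suffix" part
        return s == null ? "" : s;
    }

    public static void main(String[] args) {
        System.out.println(isValidShardId("shardId-0001536805703746-69688cb1"));
        System.out.println(
            compareShardIds("shardId-0001536805703746", "shardId-0001536805703747") < 0);
    }
}
```

Throwing `IllegalArgumentException` for malformed input matches what the test above expects from `compareShardIds`; everything else about the format would need to be checked against the actual connector code.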
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670095#comment-16670095 ]

ASF GitHub Bot commented on FLINK-4582:
---
tweise commented on a change in pull request #6968: [FLINK-4582] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r229687927

## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/dynamodbstreams/FlinkDynamodbStreamsConsumer.java ##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodbstreams;

Review comment: All of the newly added packages should be under `org.apache.flink.streaming.connectors.kinesis` as they are part of the `flink-connector-kinesis` module.
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670091#comment-16670091 ] ASF GitHub Bot commented on FLINK-4582: --- tweise commented on issue #6968: [FLINK-4582] Consuming data from DynamoDB streams to flink URL: https://github.com/apache/flink/pull/6968#issuecomment-434681760 @yxu-valleytider please include tag `[kinesis]` into the PR title and also format the commit messages as per convention `[] [kinesis] add nice feature`
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16669881#comment-16669881 ] ASF GitHub Bot commented on FLINK-4582: --- zentol commented on issue #6968: [FLINK-4582] Consuming data from DynamoDB streams to flink URL: https://github.com/apache/flink/pull/6968#issuecomment-434632762 @yxu-valleytider Please make sure that the Pull Request title references the corresponding JIRA. I have modified the title accordingly this time.
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667569#comment-16667569 ]

Ying Xu commented on FLINK-4582:

Thanks [~tinder-dthomson] for the detailed comments. Yes, that's exactly why I felt _efficient multi-stream_ support is somewhat lacking :).

Actually, we are running Flink 1.5.2 internally. To contribute upstream, I'm currently adapting the patch to fit Flink master (1.7-SNAPSHOT). The main difference is that the Flink 1.7 Kinesis connector uses the _listShards API_ to retrieve the shard list. For DynamoDB Streams, we must use the _describeStream API_ to retrieve this information, since listShards is not supported. I am currently porting the related _describeStream_ logic from Flink 1.5 to my patch.

I should be able to post a meaningful PR in 1-2 days.
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16665593#comment-16665593 ]

Devin Thomson commented on FLINK-4582:

[~yxu-apache] I appreciate the quick response!

"The use of _DynamodbProxy.getShardList()_ is interesting."

Haha, very kind of you not to point out the potential performance issue of always fetching all the shard ids. I didn't want to duplicate the code of DynamoDBProxy, but yes, this approach requires a complete traversal of the shard ids every time. I have also observed that the shard ids are always returned in the same (unsorted) ordering, so your approach sounds good to me.

Because we run our Flink clusters in AWS EMR, which does not support high-availability master nodes, we will not be using multi-stream consumers, so I did not implement that support here and saw DynamoDBProxy as a natural solution. The alternative was to use one instance of DynamoDBProxy per stream. I didn't look into the memory implications, but I assume it's worse than what you have built :)

We are still in development over here using the solution I built, but I would be glad to cut over to your solution once it's available! One question: is it compatible with Flink 1.5.2? As I mentioned, we run in AWS EMR, which only supports 1.5.2 in the latest release.

Thank you!!

Devin
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16665527#comment-16665527 ]

Ying Xu commented on FLINK-4582:

Hi [~tinder-dthomson], thanks for raising this issue, and sorry for the delay in responding to the original request.

We actually implemented a version of the flink-dynamodbstreams connector on top of the existing flink-kinesis connector. The work is currently in production and was presented at a meetup event back in Sep. I wasn't able to contribute it back because of other work priorities; my bad!

I looked at your PR. The use of _DynamodbProxy.getShardList()_ is interesting. We took a slightly different approach, which plugs a dynamodbstreams-kinesis adapter object into KinesisProxy and makes it an equivalent _DynamodbProxy_ (the approach mentioned in another thread titled *Consuming data from dynamoDB streams to flink*). We rely on the assumption that during resharding, one can retrieve all the new child shard IDs by passing the last seen shardId. Although DynamoDB Streams does not officially guarantee this, we consistently observed this behavior in production during resharding.

Another benefit of directly embedding a dynamodbstreams-kinesis adapter is that it allows *ONE* source (consumer) to consume from multiple data streams (which is important for our use cases), and it reuses the error handling in the existing KinesisProxy. I agree that if the _DynamodbProxy_ provides an _efficient multi-stream_ implementation, it is an interesting direction to look into.

If you can wait a few days, I can adapt my PR on top of the OSS Flink and post it by early next week. We can have more discussions then. What do you think?

Thank you very much!
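To make the discovery idea in this comment more concrete, here is a minimal sketch of one consumer tracking a "last seen shard ID" per stream and asking only for shards after it. This is not the connector's code: `ShardDiscoverySketch`, `ShardLister`, and `listShardsAfter` are hypothetical names, and the in-memory fake in `main` simply assumes what the comment describes, namely that passing the last seen shard ID returns only newer shards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of incremental shard discovery across several streams. */
final class ShardDiscoverySketch {
    /** Stand-in for a proxy call; assumed to return only shards after lastSeenShardId. */
    interface ShardLister {
        List<String> listShardsAfter(String stream, String lastSeenShardId);
    }

    private final ShardLister lister;
    // One marker per stream; null means "nothing seen yet, start from the beginning".
    private final Map<String, String> lastSeenShardIdPerStream = new HashMap<>();

    ShardDiscoverySketch(ShardLister lister, List<String> streams) {
        this.lister = lister;
        for (String stream : streams) {
            lastSeenShardIdPerStream.put(stream, null);
        }
    }

    /** Returns newly discovered shards keyed by stream and advances each marker. */
    Map<String, List<String>> discoverNewShards() {
        Map<String, List<String>> discovered = new HashMap<>();
        for (Map.Entry<String, String> entry : lastSeenShardIdPerStream.entrySet()) {
            List<String> fresh = lister.listShardsAfter(entry.getKey(), entry.getValue());
            if (!fresh.isEmpty()) {
                entry.setValue(fresh.get(fresh.size() - 1));
                discovered.put(entry.getKey(), new ArrayList<>(fresh));
            }
        }
        return discovered;
    }

    public static void main(String[] args) {
        // In-memory fake: each stream's shards are kept in discovery order.
        Map<String, List<String>> backing = new HashMap<>();
        backing.put("streamA", new ArrayList<>(Arrays.asList("shardId-1", "shardId-2")));
        ShardLister fake = (stream, last) -> {
            List<String> all = backing.get(stream);
            int from = last == null ? 0 : all.indexOf(last) + 1;
            return new ArrayList<>(all.subList(from, all.size()));
        };
        ShardDiscoverySketch discovery =
            new ShardDiscoverySketch(fake, Arrays.asList("streamA"));
        System.out.println(discovery.discoverNewShards());
        backing.get("streamA").add("shardId-3"); // simulate a reshard adding a child
        System.out.println(discovery.discoverNewShards());
    }
}
```

The point of the per-stream marker is to avoid the complete traversal of all shard IDs on every discovery pass; whether the real service always returns every child shard after the marker is, as the comment notes, an observed behavior rather than a documented guarantee.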
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664305#comment-16664305 ] Devin Thomson commented on FLINK-4582: -- [~yxu-lyft] [~tzulitai] Hey guys! Devin from Tinder Engineering here (https://www.linkedin.com/in/devin-thomson-44a35651/). I've been following this ticket for a couple of weeks now. We have a strong need for this at Tinder so I went ahead and built it! Here's a PR: [https://github.com/tinder-dthomson/flink/pull/3] I am not a contributor so I had to fork flink. If you want to take a look and let me know what you think, I'd be glad to contribute this back to the community! Also [~yxu-lyft] I don't mean to step on your toes here - if you have a better solution I am of course happy to use that instead! - Devin
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560327#comment-16560327 ] Ying Xu commented on FLINK-4582: [~tzulitai] Just an update: we are very close to having a working version.
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16525650#comment-16525650 ] Ying Xu commented on FLINK-4582: Hi: [~tzulitai] [~mingdaoy] I'm following up on this JIRA as we currently have a production use case which requires injecting the DynamoDB changelogs into Kafka. Interested in contributing to related efforts as well. I have raised a request on the dev mailing list ([raw message|https://mail-archives.apache.org/mod_mbox/flink-dev/201806.mbox/raw/%3CCAJ5M44_FC8u713SWHCZx02FtEfyM8RpDF%2BeTNS9W%3DTC4JkVicQ%40mail.gmail.com%3E]) Thanks. > Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams > > > Key: FLINK-4582 > URL: https://issues.apache.org/jira/browse/FLINK-4582 > Project: Flink > Issue Type: New Feature > Components: Kinesis Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Mingdao Yang >Priority: Major