[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2019-02-04 Thread Ying Xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760284#comment-16760284
 ] 

Ying Xu commented on FLINK-4582:


Thanks [~tinder-dthomson]. Internally we use the *new ObjectMapper()* 
initialization but did not observe a similar issue. However, we may be running 
an older Flink distribution.

Yes, please report this as a separate bug and, if possible, attach the full 
stack trace. If you already have a fix, feel free to post the PR as well. 

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data 
> capture) feature called DynamoDB Streams 
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
>  which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with 
> only a slight difference in resharding behaviours, so it is possible to build 
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB 
> Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc = 
>   FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> This feature adds connectivity to another popular AWS service for Flink, and 
> combining Flink's exactly-once semantics, out-of-core state backends, and 
> queryable state with CDC enables very strong use cases. The only extra 
> dependency should be the AWS Java SDK for DynamoDB, which is licensed under 
> Apache License 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2019-02-04 Thread Thomas Weise (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760263#comment-16760263
 ] 

Thomas Weise commented on FLINK-4582:
-

[~tinder-dthomson] Thanks for the report and yes, please track it as a separate 
issue and mark the fix version as 1.8 so that we get it resolved as part of 1.8.



[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2019-02-04 Thread Devin Thomson (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16760250#comment-16760250
 ] 

Devin Thomson commented on FLINK-4582:
--

[~yxu-lyft] Circling back here: it looks like this got merged in. Congrats!

 

I've been preemptively cutting over from my implementation to this one and 
noticed one (blocking) bug:

 
{code:java}
j.l.IllegalArgumentException: Conflicting setter definitions for property "eventName": org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1 params) vs org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1 params)
    at c.f.j.d.i.POJOPropertyBuilder.getSetter(POJOPropertyBuilder.java:300)
    at c.f.j.d.d.BeanDeserializerFactory.filterBeanProps(BeanDeserializerFactory.java:619)
    at c.f.j.d.d.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:515)
    at c.f.j.d.d.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:256)
    at c.f.j.d.d.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:169)
    at c.f.j.d.d.DeserializerCache._createDeserializer2(DeserializerCache.java:403)
    at c.f.j.d.d.DeserializerCache._createDeserializer(DeserializerCache.java:352)
    at c.f.j.d.d.DeserializerCache._createAndCache2(DeserializerCache.java:264)
    ... 15 common frames omitted
Wrapped by: c.f.j.d.JsonMappingException: Conflicting setter definitions for property "eventName": org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1 params...
{code}
 

 

It looks like the root cause is that DynamodbStreamsSchema.java defines the 
object mapper as follows:
{code:java}
private static final ObjectMapper MAPPER = new ObjectMapper();
{code}
It should instead use the appropriate mix-ins offered by the DynamoDB Streams 
adapter library:
{code:java}
private static final ObjectMapper MAPPER = new RecordObjectMapper();
{code}
This appears to resolve the issue; I tested it using my own deserializer 
implementation. I'm not sure whether it makes sense to track this as a separate 
issue, since this is still a 1.8-SNAPSHOT feature.

 

Let me know if you have any questions!

- Devin

 



[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-12-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16709203#comment-16709203
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

tweise commented on a change in pull request #6968: [FLINK-4582] [kinesis] 
Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r238817945
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 ##
 @@ -143,6 +143,34 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
     /** The interval after which to consider a shard idle for purposes of watermark generation. */
     public static final String SHARD_IDLE_INTERVAL_MILLIS = "flink.shard.idle.interval";
 
+    /**
+     * The base backoff time between each describeStream attempt.
+     * Different tag name to distinguish from "flink.stream.describe.backoff.base"
+     * since the latter is deprecated.
+     */
+    public static final String DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE =
 
 Review comment:
   It would be good to modify the method to map (replicate) the keys but not 
remove them, since you need them for DynamoDB. Add a comment that this is for 
backward compatibility with the regular proxy, which now uses listShards 
instead of describeStream. We can remove the mapping in a later release.
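A minimal sketch of the mapping suggested above, in plain Java with no Flink dependency: each deprecated key is copied onto its replacement, the deprecated key itself is left in place, and a replacement key the user already set is not overwritten. The class and method names are hypothetical. The replacement key names (`flink.list.shards.backoff.*`) and the `max`/`expconst` variants are assumptions; only `flink.stream.describe.backoff.base` appears in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical sketch: replicate deprecated keys onto new keys without removing them. */
final class ConsumerKeyBackfill {

    // Assumed key names; only "flink.stream.describe.backoff.base" is confirmed by the review.
    static final Map<String, String> DEPRECATED_TO_REPLACEMENT = new LinkedHashMap<>();

    static {
        DEPRECATED_TO_REPLACEMENT.put("flink.stream.describe.backoff.base", "flink.list.shards.backoff.base");
        DEPRECATED_TO_REPLACEMENT.put("flink.stream.describe.backoff.max", "flink.list.shards.backoff.max");
        DEPRECATED_TO_REPLACEMENT.put("flink.stream.describe.backoff.expconst", "flink.list.shards.backoff.expconst");
    }

    private ConsumerKeyBackfill() {
    }

    /**
     * Returns a copy of the config where each replacement key is filled in from its deprecated key.
     * Deprecated keys are kept (a DynamoDB Streams proxy would still read them), and an
     * explicitly set replacement key is never overwritten.
     */
    static Properties backfillDeprecatedKeys(Properties config) {
        Properties result = new Properties();
        result.putAll(config);
        for (Map.Entry<String, String> entry : DEPRECATED_TO_REPLACEMENT.entrySet()) {
            String deprecatedValue = config.getProperty(entry.getKey());
            if (deprecatedValue != null && config.getProperty(entry.getValue()) == null) {
                result.setProperty(entry.getValue(), deprecatedValue);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("flink.stream.describe.backoff.base", "1000");
        Properties backfilled = backfillDeprecatedKeys(config);
        System.out.println(backfilled.getProperty("flink.list.shards.backoff.base"));
        System.out.println(backfilled.getProperty("flink.stream.describe.backoff.base"));
    }
}
```

Returning a copy rather than mutating the caller's `Properties` keeps the original configuration intact, which matters if the same object is later handed to a DynamoDB-specific proxy.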


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707635#comment-16707635
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] 
[kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r238386976
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 ##
 @@ -143,6 +143,34 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
     /** The interval after which to consider a shard idle for purposes of watermark generation. */
     public static final String SHARD_IDLE_INTERVAL_MILLIS = "flink.shard.idle.interval";
 
+    /**
+     * The base backoff time between each describeStream attempt.
+     * Different tag name to distinguish from "flink.stream.describe.backoff.base"
+     * since the latter is deprecated.
+     */
+    public static final String DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE =
 
 Review comment:
   @tweise, are you OK with removing the function 
`KinesisProxy::replaceDeprecatedConsumerKeys` as well? 




[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-11-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698253#comment-16698253
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

tweise commented on a change in pull request #6968: [FLINK-4582] [kinesis] 
Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r236085780
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 ##
 @@ -143,6 +143,34 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
     /** The interval after which to consider a shard idle for purposes of watermark generation. */
     public static final String SHARD_IDLE_INTERVAL_MILLIS = "flink.shard.idle.interval";
 
+    /**
+     * The base backoff time between each describeStream attempt.
+     * Different tag name to distinguish from "flink.stream.describe.backoff.base"
+     * since the latter is deprecated.
+     */
+    public static final String DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE =
 
 Review comment:
   Remove these and remove the `@Deprecated` annotations from the prior 
properties.




[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-11-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698254#comment-16698254
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

tweise commented on a change in pull request #6968: [FLINK-4582] [kinesis] 
Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r236085419
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkDynamodbStreamsConsumer.java
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.internals.DynamodbStreamsDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.serialization.DynamodbStreamsSchema;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+
+import com.amazonaws.services.dynamodbv2.model.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Consume events from DynamoDB streams.
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkDynamodbStreamsConsumer<T> extends FlinkKinesisConsumer<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkDynamodbStreamsConsumer.class);
+
+    /**
+     * Constructor of FlinkDynamodbStreamsConsumer.
+     *
+     * @param stream stream to consume
+     * @param deserializer deserialization schema
+     * @param config config properties
+     */
+    public FlinkDynamodbStreamsConsumer(
+            String stream,
+            DeserializationSchema<T> deserializer,
+            Properties config) {
+        super(stream, deserializer, config);
+    }
+
+    /**
+     * Constructor of FlinkDynamodbStreamsConsumer.
+     *
+     * @param streams list of streams to consume
+     * @param deserializer deserialization schema
+     * @param config config properties
+     */
+    public FlinkDynamodbStreamsConsumer(
+            List<String> streams,
+            KinesisDeserializationSchema<T> deserializer,
+            Properties config) {
+        super(streams, deserializer, config);
+    }
+
+    public static <T> FlinkDynamodbStreamsConsumer<T> create(String stream,
 
 Review comment:
   Please remove these static create methods; they provide no benefit, since 
the constructors are public (which is consistent with the base class).



[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-11-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698252#comment-16698252
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

tweise commented on a change in pull request #6968: [FLINK-4582] [kinesis] 
Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r236085962
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamodbStreamsDataFetcher.java
 ##
 @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
+import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.DynamodbStreamsShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.DynamodbStreamsProxy;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_DYNAMODB_STREAMS_SHARDID_FORMAT_CHECK;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DYNAMODB_STREAMS_SHARDID_FORMAT_CHECK;
+
+/**
+ * DynamoDB streams data fetcher.
+ * @param <T> type of fetched data.
+ */
+public class DynamodbStreamsDataFetcher<T> extends KinesisDataFetcher<T> {
+    private boolean shardIdFormatCheck = false;
+
+    /**
+     * Constructor.
+     *
+     * @param streams list of streams to fetch data
+     * @param sourceContext source context
+     * @param runtimeContext runtime context
+     * @param configProps config properties
+     * @param deserializationSchema deserialization schema
+     * @param shardAssigner shard assigner
+     */
+    public DynamodbStreamsDataFetcher(List<String> streams,
+            SourceFunction.SourceContext<T> sourceContext,
+            RuntimeContext runtimeContext,
+            Properties configProps,
+            KinesisDeserializationSchema<T> deserializationSchema,
+            KinesisShardAssigner shardAssigner) {
+
+        super(streams,
+            sourceContext,
+            sourceContext.getCheckpointLock(),
+            runtimeContext,
+            configProps,
+            deserializationSchema,
+            shardAssigner,
+            null,
+            new AtomicReference<>(),
+            new ArrayList<>(),
+            createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
+            // use DynamodbStreamsProxy
+            DynamodbStreamsProxy::create);
+
+        shardIdFormatCheck = Boolean.valueOf(configProps.getProperty(
+            DYNAMODB_STREAMS_SHARDID_FORMAT_CHECK,
+            DEFAULT_DYNAMODB_STREAMS_SHARDID_FORMAT_CHECK));
+    }
+
+    /**
+     * Updates the last discovered shard of a subscribed stream; only updates if the update is valid.
+     */
+    @Override
+    public void advanceLastDiscoveredShardOfStream(String stream, String shardId) {
 
 Review comment:
   Rather than duplicating the complete logic from the base class, can we 
extract just what is unique to DynamoDB? That might also eliminate the need to 
expose `subscribedStreamsToLastDiscoveredShardIds`.
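
One way to read this suggestion is the template-method pattern: the base class keeps the shard-ID map private and implements the "advance only if valid" rule once, and the DynamoDB subclass overrides a single hook. The sketch below is plain Java with hypothetical class names, not the Flink classes. The shard-ID regex is an assumption inferred from IDs of the form `shardId-00000001536805703746-69688cb1`, and shard IDs are compared lexicographically as a simplification. The `shardIdFormatCheck` flag mirrors the field parsed from `DYNAMODB_STREAMS_SHARDID_FORMAT_CHECK` in the fetcher above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical sketch: the subclass overrides only what is unique to DynamoDB. */
final class ShardTrackerSketch {

    /** Stand-in for the base fetcher; it alone owns the last-discovered-shard state. */
    static class BaseShardTracker {
        // Private: subclasses never need direct access to the map.
        private final Map<String, String> lastDiscoveredShardIds = new HashMap<>();

        /** Updates the last discovered shard of a stream; only updates if the update is valid. */
        public final void advanceLastDiscoveredShardOfStream(String stream, String shardId) {
            if (!isAcceptableShardId(shardId)) {
                return;
            }
            String last = lastDiscoveredShardIds.get(stream);
            // Simplification: zero-padded shard IDs compared lexicographically.
            if (last == null || shardId.compareTo(last) > 0) {
                lastDiscoveredShardIds.put(stream, shardId);
            }
        }

        public String lastDiscoveredShardId(String stream) {
            return lastDiscoveredShardIds.get(stream);
        }

        /** Hook for source-specific validation; the base accepts every ID. */
        protected boolean isAcceptableShardId(String shardId) {
            return true;
        }
    }

    /** Hypothetical DynamoDB variant: only the validation rule differs. */
    static class DynamoDbShardTracker extends BaseShardTracker {
        // Assumed format, inferred from IDs like "shardId-00000001536805703746-69688cb1".
        private static final Pattern SHARD_ID = Pattern.compile("shardId-\\d{20}-[0-9a-f]+");
        private final boolean shardIdFormatCheck;

        DynamoDbShardTracker(boolean shardIdFormatCheck) {
            this.shardIdFormatCheck = shardIdFormatCheck;
        }

        @Override
        protected boolean isAcceptableShardId(String shardId) {
            return !shardIdFormatCheck || SHARD_ID.matcher(shardId).matches();
        }
    }

    public static void main(String[] args) {
        DynamoDbShardTracker tracker = new DynamoDbShardTracker(true);
        tracker.advanceLastDiscoveredShardOfStream("orders", "shardId-00000001536805703746-69688cb1");
        // Lexicographically "newer", but rejected because it fails the format check.
        tracker.advanceLastDiscoveredShardOfStream("orders", "shardId-9");
        System.out.println(tracker.lastDiscoveredShardId("orders"));
    }
}
```

Because the map stays private to the base class, nothing has to widen the visibility of the underlying state for the subclass.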



[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-11-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698250#comment-16698250
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

tweise commented on a change in pull request #6968: [FLINK-4582] [kinesis] 
Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r236085574
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamodbStreamsProxy.java
 ##
 @@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.StreamStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_ENDPOINT;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_DYNAMODB_STREAMS_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_DYNAMODB_STREAMS_DESCRIBE_BACKOFF_MAX;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DYNAMODB_STREAMS_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DYNAMODB_STREAMS_DESCRIBE_BACKOFF_MAX;
+import static org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.getCredentialsProvider;
+import static org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.setAwsClientConfigProperties;
+
+/**
+ * DynamoDB streams proxy: interface interacting with the DynamoDB streams.
+ */
+public class DynamodbStreamsProxy extends KinesisProxy {
+    private static final Logger LOG = LoggerFactory.getLogger(DynamodbStreamsProxy.class);
+
+    /** Used for formatting Flink-specific user agent string when creating Kinesis client. */
+    private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) DynamoDB Streams Connector";
+
+    // Backoff millis for the describe stream operation.
+    private final long describeStreamBaseBackoffMillis;
+    // Maximum backoff millis for the describe stream operation.
+    private final long describeStreamMaxBackoffMillis;
+    // Exponential backoff power constant for the describe stream operation.
+    private final double describeStreamExpConstant;
+
+    protected DynamodbStreamsProxy(Properties configProps) {
+        super(configProps);
+
+        // parse properties
+        describeStreamBaseBackoffMillis = Long.valueOf(
+   
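
The excerpt ends before showing how the three backoff fields (base, maximum, exponential constant) are used. A common way to combine such values is full-jitter exponential backoff. The sketch below assumes that scheme; the class name, method name, and the 1000 ms / 5000 ms / 1.5 settings are hypothetical and are not taken from `DynamodbStreamsProxy`.

```java
import java.util.Random;

/** Hypothetical sketch of full-jitter exponential backoff from base, max, and exponential constant. */
final class BackoffSketch {

    private BackoffSketch() {
    }

    /**
     * The upper bound grows as baseMillis * expConstant^attempt, capped at maxMillis;
     * the actual wait is drawn uniformly from [0, bound).
     */
    static long fullJitterBackoff(long baseMillis, long maxMillis, double expConstant, int attempt, Random random) {
        long bound = (long) Math.min(maxMillis, baseMillis * Math.pow(expConstant, attempt));
        return (long) (random.nextDouble() * bound);
    }

    public static void main(String[] args) {
        Random random = new Random(42);
        // Hypothetical settings: 1 s base, 5 s cap, growth factor 1.5.
        for (int attempt = 0; attempt < 5; attempt++) {
            long sleep = fullJitterBackoff(1000L, 5000L, 1.5, attempt, random);
            System.out.println("attempt " + attempt + ": sleep " + sleep + " ms");
        }
    }
}
```

Randomizing over the whole interval, rather than sleeping exactly the exponential bound, spreads retries from many parallel subtasks so they are less likely to hit the describe-stream rate limit at the same moment.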

[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-11-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698251#comment-16698251
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

tweise commented on a change in pull request #6968: [FLINK-4582] [kinesis] 
Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r236085609
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ##
 @@ -73,7 +73,7 @@
     private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
 
     /** The actual Kinesis client from the AWS SDK that we will be using to make calls. */
-    private final AmazonKinesis kinesisClient;
+    protected final AmazonKinesis kinesisClient;
 
 Review comment:
   If `describeStream` moves to `KinesisProxy`, this change becomes unnecessary.
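The suggestion above can be sketched as follows: if the base proxy owns the `describeStream` call, a subclass only overrides a narrow `protected` hook and the client field can stay `private`. All names here (`StreamApi`, `BaseStreamProxy`, `DdbStreamProxy`) are hypothetical stand-ins, not the connector's classes or the AWS SDK, and the sorting step is only an illustrative post-processing choice.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an AWS SDK client; returns shard IDs for a stream. */
interface StreamApi {
    List<String> describeStream(String streamName);
}

/** Base proxy: the client stays private; describeStream is the only access path. */
class BaseStreamProxy {
    private final StreamApi client;

    BaseStreamProxy(StreamApi client) {
        this.client = client;
    }

    /** Subclasses override this hook instead of touching the client directly. */
    protected List<String> describeStream(String streamName) {
        return client.describeStream(streamName);
    }

    /** Public entry point shared by all subclasses. */
    public List<String> listShards(String streamName) {
        return new ArrayList<>(describeStream(streamName));
    }
}

/** Hypothetical DynamoDB Streams variant: reuses the base call, then post-processes. */
class DdbStreamProxy extends BaseStreamProxy {
    DdbStreamProxy(StreamApi client) {
        super(client);
    }

    @Override
    protected List<String> describeStream(String streamName) {
        List<String> shards = new ArrayList<>(super.describeStream(streamName));
        shards.sort(null); // illustrative: natural String order of shard IDs
        return shards;
    }
}
```

With this shape the subclass never needs a wider modifier on the client field; it only depends on the base class's `describeStream` contract.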


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>  Labels: pull-request-available
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data 
> capture) feature called DynamoDB Streams 
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
>  which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with 
> only a slight difference in resharding behaviours, so it is possible to build 
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB 
> Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc = 
>   FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and 
> combining what Flink has for exactly-once semantics, out-of-core state 
> backends, and queryable state with CDC can have very strong use cases. For 
> this feature there should only be an extra dependency to the AWS Java SDK for 
> DynamoDB, which has Apache License 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-11-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16696228#comment-16696228
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

yxu-valleytider commented on issue #6968: [FLINK-4582] [kinesis] Consuming data 
from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#issuecomment-44240
 
 
   Adjusted package name based on comments.  
   
   PTAL @tweise @tzulitai




[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-11-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688872#comment-16688872
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

tweise commented on a change in pull request #6968: [FLINK-4582] [kinesis] 
Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r234063079
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/dynamodbstreams/FlinkDynamodbStreamsConsumer.java
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodbstreams;
 
 Review comment:
   I don't think that will work: in our internal work we had already determined 
that we need a single shaded AWS dependency. Nor do I think it's desirable, 
because this is just a slight variation of the Kinesis consumer and does not 
deserve a separate module.




[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-11-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688865#comment-16688865
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] 
[kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r234060924
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/dynamodbstreams/FlinkDynamodbStreamsConsumer.java
 ##
 @@ -0,0 +1,110 @@
+package org.apache.flink.streaming.connectors.dynamodbstreams;
 
 Review comment:
   In that case I don't mind putting the files into a separate directory, e.g., 
`flink-connector-dynamodbstreams`. WDYT?




[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-11-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16678300#comment-16678300
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

tweise commented on a change in pull request #6968: [FLINK-4582] [kinesis] 
Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r231527728
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/dynamodbstreams/FlinkDynamodbStreamsConsumer.java
 ##
 @@ -0,0 +1,110 @@
+package org.apache.flink.streaming.connectors.dynamodbstreams;
 
 Review comment:
   These are just packages within the module `flink-connector-kinesis`, and the 
package prefix should reflect that.




[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-11-01 Thread Ying Xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671216#comment-16671216
 ] 

Ying Xu commented on FLINK-4582:


[~tinder-dthomson] Posted the PR here:  
https://github.com/apache/flink/pull/6968



[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671214#comment-16671214
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

yxu-valleytider commented on issue #6968: [FLINK-4582] [kinesis] Consuming data 
from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#issuecomment-434959900
 
 
   PTAL @tzulitai 




[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671195#comment-16671195
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] 
[kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r229956874
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ##
 @@ -73,7 +73,7 @@
private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
 
/** The actual Kinesis client from the AWS SDK that we will be using to make calls. */
-   private final AmazonKinesis kinesisClient;
+   protected final AmazonKinesis kinesisClient;
 
 Review comment:
   It is used inside `DynamodbStreamsProxy` to [execute the `describeStream` call](https://github.com/apache/flink/blob/918eb652a6fa865652b464e742c5f5b6c08c5be6/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/dynamodbstreams/proxy/DynamodbStreamsProxy.java#L197).
  




[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671196#comment-16671196
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] 
[kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r229956951
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/dynamodbstreams/FlinkDynamodbStreamsConsumer.java
 ##
 @@ -0,0 +1,110 @@
+package org.apache.flink.streaming.connectors.dynamodbstreams;
 
 Review comment:
   Hmm, both the `dynamodbstreams` and `kinesis` modules are under the parent 
module `org.apache.flink.streaming.connectors`.   This is also consistent with 
the directory structure.




[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671197#comment-16671197
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] 
[kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r229958032
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/dynamodbstreams/model/DynamodbStreamsShardHandleTest.java
 ##
 @@ -0,0 +1,105 @@
+package org.apache.flink.streaming.connectors.dynamodbstreams.model;
+
+import org.junit.Test;
+
+import static org.apache.flink.streaming.connectors.dynamodbstreams.model.DynamodbStreamsShardHandle.SHARDID_PREFIX;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Shard handle unit tests.
+ */
+public class DynamodbStreamsShardHandleTest {
+	@Test
+	public void testIsValidShardId() {
+		// normal form
+		String shardId = "shardId-00000001536805703746-69688cb1";
+		assertEquals(true, DynamodbStreamsShardHandle.isValidShardId(shardId));
+
+		// short form
+		shardId = "shardId-00000001536805703746";
+		assertEquals(true, DynamodbStreamsShardHandle.isValidShardId(shardId));
+
+		// long form
+		shardId = "shardId-00000001536805703746-69688cb1aljkwerijfl8228sl12a123akfla";
+		assertEquals(true, DynamodbStreamsShardHandle.isValidShardId(shardId));
+
+		// invalid with wrong prefix
+		shardId = "sId-00000001536805703746-69688cb1";
+		assertEquals(false, DynamodbStreamsShardHandle.isValidShardId(shardId));
+
+		// invalid with non-digits
+		shardId = "shardId-0000000153680570aabb-69688cb1";
+		assertEquals(false, DynamodbStreamsShardHandle.isValidShardId(shardId));
+
+		// invalid with shardId too long
+		shardId = "shardId-00000001536805703746-69688cb1aljkwerijfl8228sl12a123akfla0000";
+		assertEquals(false, DynamodbStreamsShardHandle.isValidShardId(shardId));
+	}
+
+	@Test
+	public void testCompareShardId() {
+		final int numShardIds = 10;
+		final int shardIdDigitLen = 20;
+		final String zeros = "00000000000000000000";  // twenty '0' chars
+		String shardIdValid = "shardId-00000001536805703746-69688cb1";
+		String shardIdInvalid = "shardId-0000000153680570aabb-69688cb1";
+
+		assertEquals(0, DynamodbStreamsShardHandle.compareShardIds(shardIdValid, shardIdValid));
+
+		// comparison of invalid shardIds should yield exception
+		try {
+			DynamodbStreamsShardHandle.compareShardIds(shardIdValid, shardIdInvalid);
+			fail("invalid shard Id " + shardIdInvalid + " should trigger exception");
+		} catch (IllegalArgumentException e) {
+			// ignore
+		}
+		try {
+			DynamodbStreamsShardHandle.compareShardIds(shardIdInvalid, shardIdValid);
+			fail("invalid shard Id " + shardIdInvalid + " should trigger exception");
+		} catch (IllegalArgumentException e) {
+			// ignore
 
 Review comment:
   Yes, `// expected` is clearer.
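The test cases and the `shardIdDigitLen = 20` constant above suggest a shard ID format: the prefix `shardId-`, a fixed-width 20-digit field, an optional `-` suffix, and a length cap (the DynamoDB Streams API reference lists shard IDs as 28 to 65 characters). Below is a minimal sketch of a validator and comparator under those assumptions; `ShardIdSketch` is hypothetical and does not claim to reproduce `DynamodbStreamsShardHandle`.

```java
import java.util.regex.Pattern;

/** Hypothetical shard ID helper; the format rules are assumptions drawn from the tests. */
final class ShardIdSketch {
    static final String SHARDID_PREFIX = "shardId-";
    static final int DIGIT_LEN = 20;
    static final int MAX_LEN = 65;

    // Prefix, exactly DIGIT_LEN ASCII digits, then optionally "-" and a non-empty suffix.
    private static final Pattern FORMAT =
        Pattern.compile("^" + Pattern.quote(SHARDID_PREFIX) + "\\d{" + DIGIT_LEN + "}(-.+)?$");

    static boolean isValidShardId(String shardId) {
        return shardId != null && shardId.length() <= MAX_LEN && FORMAT.matcher(shardId).matches();
    }

    /** Orders shard IDs by their fixed-width digit field; rejects malformed IDs. */
    static int compareShardIds(String first, String second) {
        if (!isValidShardId(first)) {
            throw new IllegalArgumentException("invalid shard ID: " + first);
        }
        if (!isValidShardId(second)) {
            throw new IllegalArgumentException("invalid shard ID: " + second);
        }
        int start = SHARDID_PREFIX.length();
        // Equal-width digit strings compare numerically under lexicographic order.
        int byDigits = first.substring(start, start + DIGIT_LEN)
            .compareTo(second.substring(start, start + DIGIT_LEN));
        // Break ties on the full ID so distinct suffixes still order consistently.
        return byDigits != 0 ? byDigits : first.compareTo(second);
    }
}
```

Comparing the zero-padded digit field as a string avoids parsing it into a `long`, which could overflow for 20 digits.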



[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-10-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670101#comment-16670101
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

tweise commented on a change in pull request #6968: [FLINK-4582] Consuming data 
from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r229688903
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ##
 @@ -73,7 +73,7 @@
private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
 
/** The actual Kinesis client from the AWS SDK that we will be using to make calls. */
-   private final AmazonKinesis kinesisClient;
+   protected final AmazonKinesis kinesisClient;
 
 Review comment:
   Why this modifier change?




[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-10-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670097#comment-16670097
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

tweise commented on a change in pull request #6968: [FLINK-4582] Consuming data 
from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r229688269
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/dynamodbstreams/model/DynamodbStreamsShardHandleTest.java
 ##
 @@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodbstreams.model;
+
+import org.junit.Test;
+
+import static org.apache.flink.streaming.connectors.dynamodbstreams.model.DynamodbStreamsShardHandle.SHARDID_PREFIX;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Shard handle unit tests.
+ */
+public class DynamodbStreamsShardHandleTest {
+	@Test
+	public void testIsValidShardId() {
+		// normal form
+		String shardId = "shardId-0001536805703746-69688cb1";
+		assertEquals(true, DynamodbStreamsShardHandle.isValidShardId(shardId));
+
+		// short form
+		shardId = "shardId-0001536805703746";
+		assertEquals(true, DynamodbStreamsShardHandle.isValidShardId(shardId));
+
+		// long form
+		shardId = "shardId-0001536805703746-69688cb1aljkwerijfl8228sl12a123akfla";
+		assertEquals(true, DynamodbStreamsShardHandle.isValidShardId(shardId));
+
+		// invalid with wrong prefix
+		shardId = "sId-0001536805703746-69688cb1";
+		assertEquals(false, DynamodbStreamsShardHandle.isValidShardId(shardId));
+
+		// invalid with non-digits
+		shardId = "shardId-000153680570aabb-69688cb1";
+		assertEquals(false, DynamodbStreamsShardHandle.isValidShardId(shardId));
+
+		// invalid with shardId too long
+		shardId = "shardId-0001536805703746-69688cb1aljkwerijfl8228sl12a123akfla";
+		assertEquals(false, DynamodbStreamsShardHandle.isValidShardId(shardId));
+	}
+
+	@Test
+	public void testCompareShardId() {
+		final int numShardIds = 10;
+		final int shardIdDigitLen = 20;
+		final String zeros = "00000000000000000000";  // twenty '0' chars
+		String shardIdValid = "shardId-0001536805703746-69688cb1";
+		String shardIdInvalid = "shardId-000153680570aabb-69688cb1";
+
+		assertEquals(0, DynamodbStreamsShardHandle.compareShardIds(shardIdValid, shardIdValid));
+
+		// comparison of invalid shardIds should yield exception
+		try {
+			DynamodbStreamsShardHandle.compareShardIds(shardIdValid, shardIdInvalid);
+			fail("invalid shard Id" + shardIdInvalid + " should trigger exception");
+		} catch (IllegalArgumentException e) {
+			// ignore
+		}
+		try {
+			DynamodbStreamsShardHandle.compareShardIds(shardIdInvalid, shardIdValid);
+			fail("invalid shard Id" + shardIdInvalid + " should trigger exception");
+		} catch (IllegalArgumentException e) {
+			// ignore
 
 Review comment:
   this should be `// expected` instead?
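The sketch below shows one possible shape for the two helpers exercised by the test. It is inferred only from the quoted test cases: the prefix `shardId-`, then a run of digits, then an optional `-`-separated alphanumeric suffix. It is not the code in the pull request, and the 65-character cap is an assumption. The quoted test asserts the same long id as both valid ("long form") and invalid ("too long"), so no implementation can satisfy both cases. This sketch accepts the id. Its `main` method also shows the `// expected` convention the reviewer suggests.

```java
import java.math.BigInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ShardIdSketch {

    // Assumed format, inferred from the test cases: prefix, digit run, optional "-suffix".
    static final String SHARDID_PREFIX = "shardId-";
    private static final Pattern SHARD_ID =
            Pattern.compile("^" + SHARDID_PREFIX + "(\\d+)(-[A-Za-z0-9]+)?$");
    // Hypothetical upper bound on the total id length (an assumption, not taken from the PR).
    private static final int MAX_LEN = 65;

    static boolean isValidShardId(String shardId) {
        return shardId != null
                && shardId.length() <= MAX_LEN
                && SHARD_ID.matcher(shardId).matches();
    }

    /** Orders ids by their numeric segment, then lexicographically as a tie-breaker. */
    static int compareShardIds(String first, String second) {
        BigInteger a = numericPart(first);
        BigInteger b = numericPart(second);
        int byNumber = a.compareTo(b);
        return byNumber != 0 ? byNumber : first.compareTo(second);
    }

    private static BigInteger numericPart(String shardId) {
        if (!isValidShardId(shardId)) {
            throw new IllegalArgumentException("invalid shard id: " + shardId);
        }
        Matcher m = SHARD_ID.matcher(shardId);
        m.matches(); // known to succeed here; populates the capture groups
        return new BigInteger(m.group(1));
    }

    public static void main(String[] args) {
        String valid = "shardId-0001536805703746-69688cb1";
        String invalid = "shardId-000153680570aabb-69688cb1";
        System.out.println(compareShardIds(valid, valid)); // 0
        try {
            compareShardIds(valid, invalid);
            throw new AssertionError("invalid shard id " + invalid + " should trigger an exception");
        } catch (IllegalArgumentException e) {
            // expected
        }
        System.out.println("ok");
    }
}
```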




> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: 

[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-10-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670095#comment-16670095
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

tweise commented on a change in pull request #6968: [FLINK-4582] Consuming data 
from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r229687927
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/dynamodbstreams/FlinkDynamodbStreamsConsumer.java
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodbstreams;
 
 Review comment:
   All of the newly added packages should be under 
`org.apache.flink.streaming.connectors.kinesis` as they are part of the 
`flink-connector-kinesis` module.




> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>  Labels: pull-request-available
>


[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-10-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670091#comment-16670091
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

tweise commented on issue #6968: [FLINK-4582] Consuming data from DynamoDB 
streams to flink
URL: https://github.com/apache/flink/pull/6968#issuecomment-434681760
 
 
   @yxu-valleytider please include the tag `[kinesis]` in the PR title and also 
format the commit messages per the convention `[] [kinesis] add nice 
feature`




> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>  Labels: pull-request-available
>


[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-10-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16669881#comment-16669881
 ] 

ASF GitHub Bot commented on FLINK-4582:
---

zentol commented on issue #6968: [FLINK-4582] Consuming data from DynamoDB 
streams to flink
URL: https://github.com/apache/flink/pull/6968#issuecomment-434632762
 
 
   @yxu-valleytider Please make sure that the Pull Request title references the 
corresponding JIRA. I have modified the title accordingly this time.




> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>  Labels: pull-request-available
>


[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-10-29 Thread Ying Xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667569#comment-16667569
 ] 

Ying Xu commented on FLINK-4582:


Thanks [~tinder-dthomson] for the detailed comments. Yes, that's exactly why I 
felt _efficient multi-stream_ support is somewhat lacking :).  

Actually, we are running Flink 1.5.2 internally. To contribute upstream, 
I'm currently adapting the patch to fit Flink master (1.7-SNAPSHOT). The 
main difference is that the Flink 1.7 Kinesis connector uses the _ListShards API_ to 
retrieve the shard list. For DynamoDB Streams, we must use the _DescribeStream 
API_ to retrieve this information, since ListShards is not supported. I am 
currently porting the related _DescribeStream_ logic from Flink 1.5 to
my patch.  I shall be able to post a meaningful PR in 1-2 days.  
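DescribeStream in DynamoDB Streams returns shards in pages. The caller passes the response's `LastEvaluatedShardId` back as `ExclusiveStartShardId` until no further id is returned. The sketch below models that loop against a hypothetical in-memory `DescribeStreamApi` interface so it runs without the AWS SDK. The `Page` and `DescribeStreamApi` types and the `fake` helper are illustrative only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DescribeStreamPagingSketch {

    /** One page of results: shard ids plus the id to resume after (null once the listing is complete). */
    static final class Page {
        final List<String> shardIds;
        final String lastEvaluatedShardId;

        Page(List<String> shardIds, String lastEvaluatedShardId) {
            this.shardIds = shardIds;
            this.lastEvaluatedShardId = lastEvaluatedShardId;
        }
    }

    /** Hypothetical stand-in for one DescribeStream call with an exclusive start shard id. */
    interface DescribeStreamApi {
        Page describe(String streamArn, String exclusiveStartShardId, int limit);
    }

    /** Lists every shard by following lastEvaluatedShardId until it comes back null. */
    static List<String> listAllShards(DescribeStreamApi api, String streamArn, int pageSize) {
        List<String> all = new ArrayList<>();
        String startAfter = null; // null: start from the first shard
        do {
            Page page = api.describe(streamArn, startAfter, pageSize);
            all.addAll(page.shardIds);
            startAfter = page.lastEvaluatedShardId;
        } while (startAfter != null);
        return all;
    }

    /** In-memory fake that serves a fixed, ordered shard list in pages of at most `limit` (limit >= 1). */
    static DescribeStreamApi fake(List<String> shards) {
        return (arn, exclusiveStart, limit) -> {
            int from = exclusiveStart == null ? 0 : shards.indexOf(exclusiveStart) + 1;
            int to = Math.min(from + limit, shards.size());
            List<String> slice = new ArrayList<>(shards.subList(from, to));
            String last = to < shards.size() ? shards.get(to - 1) : null;
            return new Page(slice, last);
        };
    }

    public static void main(String[] args) {
        List<String> shards = Arrays.asList("shardId-1", "shardId-2", "shardId-3", "shardId-4", "shardId-5");
        // Pages of two: [1, 2], [3, 4], [5]
        System.out.println(listAllShards(fake(shards), "arn:example", 2));
    }
}
```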

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>


[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-10-26 Thread Devin Thomson (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16665593#comment-16665593
 ] 

Devin Thomson commented on FLINK-4582:
--

[~yxu-apache] I appreciate the quick response!

"The use of _DynamodbProxy.getShardList()_ is interesting." Haha very kind of 
you to not point out the potential performance issue of always fetching all the 
shard ids. I didn't want to duplicate the code of DynamoDBProxy, but yes this 
approach suffers from a complete traversal of the shard ids every time. I have 
also observed that the shard ids are always returned in the same (unsorted) 
ordering, so your approach sounds good to me.

Because we run our Flink clusters on AWS EMR, which does not support 
high-availability master nodes, we will not be using multi-stream consumers, 
so I did not implement that support here and saw DynamoDBProxy as a natural 
solution. The alternative was to use one instance of DynamoDBProxy per 
stream. I didn't look into the memory implications, but I assume it's worse than 
what you have built :)

We are still in development over here using the solution I built, but I would 
be glad to cut over to your solution once it's available! One question: 
is it compatible with Flink 1.5.2? As I mentioned, we run on AWS EMR, which 
only supports 1.5.2 in its latest release.

 

Thank you!!

Devin

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>


[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-10-26 Thread Ying Xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16665527#comment-16665527
 ] 

Ying Xu commented on FLINK-4582:


Hi [~tinder-dthomson], thanks for raising this issue, and sorry for the 
delay in responding to the original request. 

We actually implemented a version of the flink-dynamodbstreams connector on top 
of the existing flink-kinesis connector. The work is currently in production 
and was presented at a meetup back in September. I haven't had a chance 
to contribute it back because of other work priorities – my bad!  

I looked at your PR. The use of _DynamodbProxy.getShardList()_ is interesting. 
We took a slightly different approach, which plugs a dynamodbstreams-kinesis 
adapter object into KinesisProxy and turns it into an equivalent _DynamodbProxy_ 
(the approach mentioned in another thread titled *Consuming data from dynamoDB 
streams to flink*). We rely on the assumption that during resharding, one can 
retrieve all the new child shard IDs by passing the last seen shard ID. 
Although DynamoDB Streams does not officially guarantee this, we consistently 
observed this behavior in production during resharding. 

Other benefits of directly embedding a dynamodbstreams-kinesis adapter are that it 
allows *ONE* source (consumer) to consume from multiple data streams (which is 
important for our use cases) and reuses the other error handling in the existing 
KinesisProxy. I agree that if the _DynamodbProxy_ provides an _efficient
multi-stream_ implementation, it is an interesting direction to look into. 
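The assumption above and the multi-stream requirement can be combined into simple per-stream bookkeeping. The assumption is that resuming after the last seen shard ID returns exactly the newly created child shards. The sketch below is hypothetical: `ShardLister` and `discoverNewShards` are illustrative names. Its fake lister simply assumes a stable shard ordering, which DynamoDB Streams does not officially guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IncrementalShardDiscoverySketch {

    /** Hypothetical lister: all shard ids of a stream after startAfter (null = from the beginning). */
    interface ShardLister {
        List<String> shardsAfter(String stream, String startAfter);
    }

    private final ShardLister lister;
    // Last shard id seen per stream, so a single source can track several streams.
    private final Map<String, String> lastSeen = new HashMap<>();

    IncrementalShardDiscoverySketch(ShardLister lister) {
        this.lister = lister;
    }

    /** Returns only the shards discovered since the previous call, across all given streams. */
    List<String> discoverNewShards(List<String> streams) {
        List<String> discovered = new ArrayList<>();
        for (String stream : streams) {
            List<String> fresh = lister.shardsAfter(stream, lastSeen.get(stream));
            if (!fresh.isEmpty()) {
                lastSeen.put(stream, fresh.get(fresh.size() - 1));
                for (String shard : fresh) {
                    discovered.add(stream + "/" + shard);
                }
            }
        }
        return discovered;
    }

    public static void main(String[] args) {
        Map<String, List<String>> shardsByStream = new HashMap<>();
        shardsByStream.put("tableA", new ArrayList<>(Arrays.asList("s1", "s2")));
        shardsByStream.put("tableB", new ArrayList<>(Arrays.asList("s1")));

        // Fake lister relying on a stable ordering, mirroring the assumption in the comment.
        ShardLister fake = (stream, startAfter) -> {
            List<String> all = shardsByStream.get(stream);
            int from = startAfter == null ? 0 : all.indexOf(startAfter) + 1;
            return new ArrayList<>(all.subList(from, all.size()));
        };

        IncrementalShardDiscoverySketch discovery = new IncrementalShardDiscoverySketch(fake);
        List<String> streams = Arrays.asList("tableA", "tableB");
        System.out.println(discovery.discoverNewShards(streams)); // [tableA/s1, tableA/s2, tableB/s1]
        shardsByStream.get("tableA").add("s3");                    // simulate a reshard producing a child
        System.out.println(discovery.discoverNewShards(streams)); // [tableA/s3]
    }
}
```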

If you can wait a few days, I can adapt my PR on top of OSS Flink and post 
it by early next week. We can discuss further then. What do you think?

Thank you very much!

 

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>


[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-10-25 Thread Devin Thomson (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664305#comment-16664305
 ] 

Devin Thomson commented on FLINK-4582:
--

[~yxu-lyft] [~tzulitai]

Hey guys! Devin from Tinder Engineering here 
(https://www.linkedin.com/in/devin-thomson-44a35651/).

I've been following this ticket for a couple of weeks now. We have a strong 
need for this at Tinder so I went ahead and built it! Here's a PR:

[https://github.com/tinder-dthomson/flink/pull/3]

I am not a contributor so I had to fork flink. If you want to take a look and 
let me know what you think, I'd be glad to contribute this back to the 
community!

Also [~yxu-lyft] I don't mean to step on your toes here - if you have a better 
solution I am of course happy to use that instead!

 

- Devin

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>


[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-07-27 Thread Ying Xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560327#comment-16560327
 ] 

Ying Xu commented on FLINK-4582:


[~tzulitai]  Just an update: we are very close to having a working version. 

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
>


[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-06-27 Thread Ying Xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16525650#comment-16525650
 ] 

Ying Xu commented on FLINK-4582:


Hi: 

[~tzulitai]  [~mingdaoy]  

I'm following up on this JIRA as we currently have a production use case which 
requires injecting the DynamoDB changelogs into Kafka. I am also interested in 
contributing to related efforts. I have raised a request on the dev
mailing list ([raw 
message|https://mail-archives.apache.org/mod_mbox/flink-dev/201806.mbox/raw/%3CCAJ5M44_FC8u713SWHCZx02FtEfyM8RpDF%2BeTNS9W%3DTC4JkVicQ%40mail.gmail.com%3E])
 

 

Thanks. 

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Mingdao Yang
>Priority: Major
>