[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/2016#issuecomment-55208 Okay, I've filed https://issues.apache.org/jira/browse/FLINK-3983. However, before you jump on it: while writing the JIRA, I started wondering whether it makes sense to address this now if we are going to drop the KPL in the next pull request. I hope to make up my mind by Monday. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user aozturk commented on the pull request: https://github.com/apache/flink/pull/2016#issuecomment-55079 The repeated lines were terrible; it looks much neater now. However, the `if` checks for the max count configurations in FlinkKinesisProducer.java now seem redundant. Adding all configurations sounds good; I had thought about it but was not sure.
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/2016#issuecomment-52000 I have to admit that these changes were not necessary: https://github.com/apache/flink/commit/23d1cba72859339bd3ee8f877b031353380c87fb#diff-d34eb8418ccff3e2d59a3e660fed6cec I wanted the code not to throw an exception if the provided value cannot be parsed, but I didn't notice that the verify method in the constructor already checks that. I'll file a follow-up JIRA for adding configuration keys for all relevant fields in `KinesisProducerConfiguration`.
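A minimal Java sketch of the "validate once in the constructor" pattern discussed here, assuming a hypothetical optional numeric key (`producer.collectionMaxCount`) rather than any real connector constant. Because the constructor already rejects unparseable values, the later code path (a stand-in for `open()`) can parse without its own guard, which is why the extra `if` checks became redundant.

```java
import java.util.Properties;

/**
 * Hypothetical sketch: validate an optional numeric property once in the
 * constructor, then parse it later without re-checking.
 */
final class ValidateOnceSketch {

    // Hypothetical key, for illustration only.
    static final String MAX_COUNT_KEY = "producer.collectionMaxCount";

    private final Properties props;

    ValidateOnceSketch(Properties props) {
        this.props = props;
        String raw = props.getProperty(MAX_COUNT_KEY);
        if (raw != null) {
            try {
                Long.parseLong(raw);
            } catch (NumberFormatException e) {
                // Reject the bad value up front, with the offending key in the message.
                throw new IllegalArgumentException(
                        "Invalid value for " + MAX_COUNT_KEY + ": " + raw, e);
            }
        }
    }

    /** Stand-in for open(): no guard needed, the value was validated above. Returns -1 if unset. */
    long openAndReadMaxCount() {
        String raw = props.getProperty(MAX_COUNT_KEY);
        return raw == null ? -1L : Long.parseLong(raw);
    }
}
```

In Flink, a sink's constructor runs when the job is built, so this placement makes a bad value fail before the job is submitted rather than when the sink is opened on a task.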
Github user aozturk commented on the pull request: https://github.com/apache/flink/pull/2016#issuecomment-50998 Thanks for the merge! Your latest enhancements look great.
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2016
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/2016#issuecomment-222136775 Thanks a lot for addressing the comments. I'll push the changes to Travis; once it's green, I'll merge them.
Github user aozturk commented on the pull request: https://github.com/apache/flink/pull/2016#issuecomment-222111542 Pushed updates according to your comments.
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2016#discussion_r64767022
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -160,12 +168,13 @@ public void setCustomPartitioner(KinesisPartitioner partitioner) {
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
-        KinesisProducerConfiguration config = new KinesisProducerConfiguration();
-        config.setRegion(this.region);
-        config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey)));
+        KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
+
+        producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
+        producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
         //config.setCollectionMaxCount(1);
         //config.setAggregationMaxCount(1);
--- End diff --
I agree that using the KPL is a good idea in general. The problem is that the KPL's license (the Amazon Software License) is not compatible with the Apache Software License, so Apache projects cannot depend on such code. For that reason, we will not release the Flink Kinesis connector with the next Flink release (unless we somehow fix this issue). Are you working at Amazon? If so, could you contact me at robert (at) data-artisans.com?
Github user aozturk commented on a diff in the pull request: https://github.com/apache/flink/pull/2016#discussion_r64757778
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -160,12 +168,13 @@ public void setCustomPartitioner(KinesisPartitioner partitioner) {
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
-        KinesisProducerConfiguration config = new KinesisProducerConfiguration();
-        config.setRegion(this.region);
-        config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey)));
+        KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
+
+        producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
+        producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
         //config.setCollectionMaxCount(1);
         //config.setAggregationMaxCount(1);
--- End diff --
Ah, sorry: I went back to the code to make sure, and it seems the KPL is already used.
Github user aozturk commented on a diff in the pull request: https://github.com/apache/flink/pull/2016#discussion_r64756634
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -160,12 +168,13 @@ public void setCustomPartitioner(KinesisPartitioner partitioner) {
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
-        KinesisProducerConfiguration config = new KinesisProducerConfiguration();
-        config.setRegion(this.region);
-        config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey)));
+        KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
+
+        producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
+        producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
         //config.setCollectionMaxCount(1);
         //config.setAggregationMaxCount(1);
--- End diff --
Sorry again for my late response. Meanwhile, I have thought about this and come up with a question. I had noticed a remark about not using the KCL, but is there any particular reason why we are not using the KPL? Anything we build ourselves will probably replicate it, including the retry mechanism on failures, multi-threading, and asynchronous writes. We also need to keep in mind that our producer should be cost-effective by taking advantage of batching, which the user can tune if low latency is preferred.
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/2016#issuecomment-221885858 Sure, no problem at all. It's just for my own time allocation. Thanks for the quick response.
Github user aozturk commented on the pull request: https://github.com/apache/flink/pull/2016#issuecomment-221885430 Sorry, I've had an extra duty this week, but I have already started working on it and hope to wrap it up this evening.
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/2016#issuecomment-221882616 Just for my planning: when do you think you'll find time to address the remaining issues?
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2016#discussion_r64257599
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -160,12 +168,13 @@ public void setCustomPartitioner(KinesisPartitioner partitioner) {
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
-        KinesisProducerConfiguration config = new KinesisProducerConfiguration();
-        config.setRegion(this.region);
-        config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey)));
+        KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
+
+        producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
+        producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
         //config.setCollectionMaxCount(1);
         //config.setAggregationMaxCount(1);
--- End diff --
Okay, nice. Additional testing would be really helpful. One big issue is that the Kinesis connector doesn't handle "flow control" well: if a Flink job produces data at a higher rate than the number of shards permits, I get a lot of failures. Ideally, the producer should only accept as much data as it can handle and block otherwise. Do you have any ideas how to achieve that?
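One common way to get the blocking behavior asked for here is to cap the number of in-flight (sent but not yet acknowledged) records with a counting semaphore. The sketch below is a hypothetical helper, not part of the KPL or the connector; `beforeSend()` and `onComplete()` stand in for the point just before an asynchronous write and for that write's completion callback.

```java
import java.util.concurrent.Semaphore;

/**
 * Hypothetical flow-control helper: at most maxInFlight records may be
 * outstanding at once; further sends block until a completion frees a slot.
 */
final class BoundedInFlight {

    private final Semaphore permits;

    BoundedInFlight(int maxInFlight) {
        if (maxInFlight <= 0) {
            throw new IllegalArgumentException("maxInFlight must be positive");
        }
        this.permits = new Semaphore(maxInFlight);
    }

    /** Blocks the calling thread until a slot is free, then claims it. */
    void beforeSend() throws InterruptedException {
        permits.acquire();
    }

    /** Non-blocking variant: claims a slot if one is free, else returns false. */
    boolean tryBeforeSend() {
        return permits.tryAcquire();
    }

    /** Call from the write's callback, on success or failure, to free the slot. */
    void onComplete() {
        permits.release();
    }

    /** Number of free slots right now. */
    int available() {
        return permits.availablePermits();
    }
}
```

If a sink called `beforeSend()` inside `invoke()`, a stream that cannot keep up would stall the sink thread, and Flink would propagate that upstream as backpressure instead of surfacing it as failed writes.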
Github user aozturk commented on a diff in the pull request: https://github.com/apache/flink/pull/2016#discussion_r64235578
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -160,12 +168,13 @@ public void setCustomPartitioner(KinesisPartitioner partitioner) {
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
-        KinesisProducerConfiguration config = new KinesisProducerConfiguration();
-        config.setRegion(this.region);
-        config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey)));
+        KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
+
+        producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
+        producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
         //config.setCollectionMaxCount(1);
         //config.setAggregationMaxCount(1);
--- End diff --
Sure, no need for a new JIRA; I can add these keys while addressing your review. I'm actually willing to do more work on the Kinesis connector. Feel free to list things here, or create an issue and assign it to me.
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/2016#issuecomment-220997800 The change looks good overall. I think we can merge it soon. Please let me know once you've addressed my comments.
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2016#discussion_r64231495
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -160,12 +168,13 @@ public void setCustomPartitioner(KinesisPartitioner partitioner) {
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
-        KinesisProducerConfiguration config = new KinesisProducerConfiguration();
-        config.setRegion(this.region);
-        config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey)));
+        KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
+
+        producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
+        producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
         //config.setCollectionMaxCount(1);
         //config.setAggregationMaxCount(1);
--- End diff --
I know this is out of scope for the JIRA, but it might make sense to introduce some producer-specific configuration keys for setting the relevant producer settings here. Would you like to add this as part of the PR, or should we file a follow-up JIRA?
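Producer-specific keys could be applied roughly as follows. This is a sketch under assumptions: the key names are hypothetical, and `Settings` is a two-field stand-in for the KPL's `KinesisProducerConfiguration`, whose `setCollectionMaxCount`/`setAggregationMaxCount` setters appear in the commented-out lines of the diff. Absent keys leave the corresponding setting untouched, so the library's defaults still apply.

```java
import java.util.Properties;
import java.util.function.LongConsumer;

/** Hypothetical mapping of optional producer keys onto producer settings. */
final class ProducerKeysSketch {

    // Hypothetical keys; not actual connector constants.
    static final String COLLECTION_MAX_COUNT = "producer.collectionMaxCount";
    static final String AGGREGATION_MAX_COUNT = "producer.aggregationMaxCount";

    /** Stand-in for the KPL configuration object; -1 means "not set, keep the library default". */
    static final class Settings {
        long collectionMaxCount = -1;
        long aggregationMaxCount = -1;
    }

    /** Copies each key present in props onto a fresh Settings; absent keys are skipped. */
    static Settings apply(Properties props) {
        Settings s = new Settings();
        setIfPresent(props, COLLECTION_MAX_COUNT, v -> s.collectionMaxCount = v);
        setIfPresent(props, AGGREGATION_MAX_COUNT, v -> s.aggregationMaxCount = v);
        return s;
    }

    private static void setIfPresent(Properties props, String key, LongConsumer setter) {
        String raw = props.getProperty(key);
        if (raw != null) {
            setter.accept(Long.parseLong(raw.trim()));
        }
    }
}
```

With a real `KinesisProducerConfiguration`, the lambdas would call its setters instead of assigning fields.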
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2016#discussion_r64230633
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -30,14 +28,20 @@ import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkNotNull;
--- End diff --
Actually, I just learned that Flink has `org.apache.flink.util.Preconditions` as well.
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2016#discussion_r64227104
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -30,14 +28,20 @@ import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkNotNull;
--- End diff --
Can you use `Objects.requireNonNull()` instead? We are trying to get rid of Guava in the long term.
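The suggested swap is mechanical: Guava's `checkNotNull(reference, message)` and the JDK's `Objects.requireNonNull(obj, message)` both return the argument and throw a `NullPointerException` carrying the message when it is null. A sketch with a hypothetical class standing in for the producer's constructor (the class name and message text are illustrative):

```java
import java.util.Objects;
import java.util.Properties;

/** Hypothetical stand-in for a constructor that null-checks its config without Guava. */
final class NullCheckSketch {

    private final Properties configProps;

    NullCheckSketch(Properties configProps) {
        // Before (Guava): this.configProps = checkNotNull(configProps, "configProps can not be null");
        this.configProps = Objects.requireNonNull(configProps, "configProps can not be null");
    }

    Properties config() {
        return configProps;
    }
}
```

Inside Flink, `org.apache.flink.util.Preconditions.checkNotNull` would be an equally Guava-free option; the JDK method is used here only to keep the sketch dependency-free.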
Github user aozturk commented on the pull request: https://github.com/apache/flink/pull/2016#issuecomment-220988206 Sure. Thanks for the review, @rmetzger.
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/2016#issuecomment-220986366 I noticed that you didn't update the documentation of the Kinesis producer. Can you update the page to reflect the changed usage? The file is located at docs/apis/streaming/connectors/kinesis.md
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/2016#issuecomment-220782124 Thanks a lot for your contribution. I quickly scrolled over the changes, and I like them at first sight. I assigned the JIRA issue to you. I hope I can review the pull request in the next few days.
GitHub user aozturk opened a pull request: https://github.com/apache/flink/pull/2016
[FLINK-3923] [connector-kinesis] Unify configuration conventions of t…
This pull request includes the changes described in the issue:
[FLINK-3923] Unify configuration conventions of the Kinesis producer to the same as the consumer
Currently, the Kinesis consumer and producer are configured differently. The producer expects a list of arguments for the access key, secret, region, and stream, while the consumer accepts properties (similar to the Kafka connector). The objective of this issue is to change the producer so that it also uses a properties-based configuration (including an input validation step).
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/aozturk/flink FLINK-3923
Alternatively, you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2016.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2016
commit b3f253a016fb3fdd9ecfd1f03ff9313d3f977e3f
Author: Abdullah Ozturk
Date: 2016-05-21T14:38:53Z
[FLINK-3923] [connector-kinesis] Unify configuration conventions of the Kinesis producer to the same as the consumer
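The properties-based configuration with an input validation step that the issue asks for might look roughly like this. It is a minimal sketch, not the PR's code: the key strings are hypothetical (the connector's real keys are constants in `KinesisConfigConstants`, such as `CONFIG_AWS_REGION` in the diffs above), and the class name is illustrative.

```java
import java.util.Objects;
import java.util.Properties;

/**
 * Hypothetical properties-based producer configuration that validates
 * required keys up front instead of taking positional constructor arguments.
 */
final class PropertiesProducerConfig {

    static final String REGION = "aws.region";          // assumed key
    static final String ACCESS_KEY = "aws.accessKeyId"; // assumed key
    static final String SECRET_KEY = "aws.secretKey";   // assumed key

    private final Properties props;

    PropertiesProducerConfig(Properties props) {
        this.props = Objects.requireNonNull(props, "config properties must not be null");
        validate(this.props);
    }

    /** Rejects the configuration if any required key is missing or blank. */
    static void validate(Properties props) {
        for (String key : new String[] {REGION, ACCESS_KEY, SECRET_KEY}) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalArgumentException("Missing required property: " + key);
            }
        }
    }

    String region() {
        return props.getProperty(REGION);
    }
}
```

Passing a single `Properties` object mirrors how the consumer (and the Kafka connector) is configured, and new options can be added as keys without changing the constructor signature.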