[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...
Github user aozturk commented on the pull request:

    https://github.com/apache/flink/pull/2016#issuecomment-55079

    The repeated lines were terrible; it looks neater now. However, the `if` checks for the max count configurations in FlinkKinesisProducer.java seem redundant now.

    Adding all configurations sounds good. I had thought about it but was not sure.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...
Github user aozturk commented on the pull request:

    https://github.com/apache/flink/pull/2016#issuecomment-50998

    Thanks for the merge! Your latest enhancements look great.
[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...
Github user aozturk commented on the pull request:

    https://github.com/apache/flink/pull/2016#issuecomment-222111542

    Pushed updates according to your comments.
[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...
Github user aozturk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2016#discussion_r64757778

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -160,12 +168,13 @@ public void setCustomPartitioner(KinesisPartitioner partitioner) {
         public void open(Configuration parameters) throws Exception {
             super.open(parameters);
    -        KinesisProducerConfiguration config = new KinesisProducerConfiguration();
    -        config.setRegion(this.region);
    -        config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey)));
    +        KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
    +
    +        producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
    +        producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
             //config.setCollectionMaxCount(1);
             //config.setAggregationMaxCount(1);
    --- End diff --

    Ah, sorry. I went back to the code to be sure, and it seems the KPL is already used.
[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...
Github user aozturk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2016#discussion_r64756634

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -160,12 +168,13 @@ public void setCustomPartitioner(KinesisPartitioner partitioner) {
         public void open(Configuration parameters) throws Exception {
             super.open(parameters);
    -        KinesisProducerConfiguration config = new KinesisProducerConfiguration();
    -        config.setRegion(this.region);
    -        config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey)));
    +        KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
    +
    +        producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
    +        producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
             //config.setCollectionMaxCount(1);
             //config.setAggregationMaxCount(1);
    --- End diff --

    Sorry again for my late response. Meanwhile, I have thought about this and have a question. I noticed a remark about not using the KCL library, but is there any special reason why we are not using the KPL? Anything we intend to build will probably replicate it, including the retry mechanism on failures, multi-threading, and asynchronous writes. We also need to consider that our producer should be cost-effective by taking advantage of batching, which the user can tune if low latency is preferred.
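To make the batching point concrete, here is a minimal sketch of how user-tunable batch limits could be read from the same `Properties` object as the rest of the configuration. It is not the connector's code: the class name, the two key strings, and the default values are assumptions chosen for illustration, and in a real producer the parsed values would be handed to setters such as the `setCollectionMaxCount` and `setAggregationMaxCount` calls that appear commented out in the diff.

```java
import java.util.Properties;

/** Illustrative sketch only; names, keys, and defaults are hypothetical. */
final class ProducerBatchingSettings {

    // Assumed key strings, not taken from the connector.
    static final String COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount";
    static final String AGGREGATION_MAX_COUNT = "aws.producer.aggregationMaxCount";

    /**
     * Returns the value stored under key as a positive long, or defaultValue
     * when the key is absent. Rejects non-numeric and non-positive values.
     */
    static long getPositiveLong(Properties props, String key, long defaultValue) {
        String raw = props.getProperty(key);
        if (raw == null) {
            return defaultValue; // user did not override this setting
        }
        long value;
        try {
            value = Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid number for " + key + ": " + raw, e);
        }
        if (value <= 0) {
            throw new IllegalArgumentException(key + " must be positive, got " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // A user who prefers low latency might limit each request to one record.
        props.setProperty(COLLECTION_MAX_COUNT, "1");

        // 500 and 100 are placeholder defaults for this sketch.
        long collectionMax = getPositiveLong(props, COLLECTION_MAX_COUNT, 500L);
        long aggregationMax = getPositiveLong(props, AGGREGATION_MAX_COUNT, 100L);

        System.out.println("collectionMaxCount=" + collectionMax);
        System.out.println("aggregationMaxCount=" + aggregationMax);
    }
}
```

Falling back to a default when a key is absent keeps batching on for users who set nothing, while an explicit small value trades throughput and cost for latency.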
[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...
Github user aozturk commented on the pull request:

    https://github.com/apache/flink/pull/2016#issuecomment-221885430

    Sorry, I have had an extra duty this week, but I have already started working on it and hope to wrap it up this evening.
[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...
Github user aozturk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2016#discussion_r64235578

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -160,12 +168,13 @@ public void setCustomPartitioner(KinesisPartitioner partitioner) {
         public void open(Configuration parameters) throws Exception {
             super.open(parameters);
    -        KinesisProducerConfiguration config = new KinesisProducerConfiguration();
    -        config.setRegion(this.region);
    -        config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey)));
    +        KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
    +
    +        producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
    +        producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
             //config.setCollectionMaxCount(1);
             //config.setAggregationMaxCount(1);
    --- End diff --

    Sure, there is no need for a new JIRA; I can add these keys while addressing your review. I am actually willing to do more work on the Kinesis connector. Feel free to name tasks here, or create an issue and assign it to me.
[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...
Github user aozturk commented on the pull request:

    https://github.com/apache/flink/pull/2016#issuecomment-220988206

    Sure. Thanks for the review @rmetzger
[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...
GitHub user aozturk opened a pull request:

    https://github.com/apache/flink/pull/2016

    [FLINK-3923] [connector-kinesis] Unify configuration conventions of t…

    This pull request includes the changes as clearly described in the issue.

    [FLINK-3923] Unify configuration conventions of the Kinesis producer to the same as the consumer

    Currently, the Kinesis consumer and producer are configured differently.
    The producer expects a list of arguments for the access key, secret, region, and stream. The consumer accepts properties (similar to the Kafka connector).
    The objective of this issue is to change the producer so that it also uses a properties-based configuration (including an input validation step).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aozturk/flink FLINK-3923

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2016.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2016

commit b3f253a016fb3fdd9ecfd1f03ff9313d3f977e3f
Author: Abdullah Ozturk <oztu...@amazon.com>
Date:   2016-05-21T14:38:53Z

    [FLINK-3923] [connector-kinesis] Unify configuration conventions of the Kinesis producer to the same as the consumer
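As a rough illustration of the properties-based configuration with an input validation step described in the issue, the sketch below checks a `Properties` object for required entries before a producer would be built. It is an assumption-laden example, not the code from this pull request: the class name and the three key strings are hypothetical (the connector keeps its real keys in `KinesisConfigConstants`, of which only `CONFIG_AWS_REGION` is visible in this thread), and the real validation may check more than presence.

```java
import java.util.Properties;

/** Illustrative sketch only; not the validation code from the pull request. */
final class ProducerConfigValidation {

    // Hypothetical key strings used for this example.
    static final String AWS_REGION = "aws.region";
    static final String AWS_ACCESS_KEY = "aws.accesskey";
    static final String AWS_SECRET_KEY = "aws.secretkey";

    /** Throws IllegalArgumentException if a required entry is missing or blank. */
    static void validate(Properties props) {
        if (props == null) {
            throw new IllegalArgumentException("Configuration properties must not be null");
        }
        for (String key : new String[] {AWS_REGION, AWS_ACCESS_KEY, AWS_SECRET_KEY}) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalArgumentException("Missing required property: " + key);
            }
        }
    }

    public static void main(String[] args) {
        // The producer would receive one Properties object instead of
        // separate constructor arguments for key, secret, and region.
        Properties props = new Properties();
        props.setProperty(AWS_REGION, "us-east-1");
        props.setProperty(AWS_ACCESS_KEY, "example-access-key");
        props.setProperty(AWS_SECRET_KEY, "example-secret-key");

        validate(props);
        System.out.println("configuration accepted");
    }
}
```

Validating in one place, before the producer is opened, means a misconfigured job can fail fast at construction time rather than later at runtime.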