[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-27 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/2016#issuecomment-55208
  
Okay, I've filed https://issues.apache.org/jira/browse/FLINK-3983. However, 
before you jump onto it: while I wrote the JIRA, I was actually wondering 
whether it makes sense to address this now, if we are going to drop the KPL 
in the next pull request.
I hope I can make up my mind by Monday.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-27 Thread aozturk
Github user aozturk commented on the pull request:

https://github.com/apache/flink/pull/2016#issuecomment-55079
  
The repeated lines were terrible; it looks much neater now. However, the if 
checks for the max count configurations in FlinkKinesisProducer.java now seem 
redundant.
Adding all configurations sounds good. I had thought about it but was not sure.


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-27 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/2016#issuecomment-52000
  
I have to admit that these changes were not necessary: 
https://github.com/apache/flink/commit/23d1cba72859339bd3ee8f877b031353380c87fb#diff-d34eb8418ccff3e2d59a3e660fed6cec
I wanted to change the code so that it won't throw an exception if the 
provided value cannot be parsed. I didn't see that the verify method in the 
constructor already does that.
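The idea of validating user-supplied values once, eagerly in the constructor, so that code running later can parse them without defensive try/catch blocks, can be sketched as follows. This is a minimal illustration under assumptions: the class `ValidatedProducerSettings`, the key `producer.maxCount` and its default are hypothetical and are not the connector's actual `KinesisConfigUtil` API or configuration constants.

```java
import java.util.Properties;

// Hypothetical sketch: validate a numeric setting once, in the constructor,
// so that code running later (e.g. an open() method) can parse it directly.
final class ValidatedProducerSettings {

    // Hypothetical key and default, NOT a real Flink or KPL configuration constant.
    static final String MAX_COUNT_KEY = "producer.maxCount";
    static final long DEFAULT_MAX_COUNT = 500L;

    private final Properties props = new Properties();

    ValidatedProducerSettings(Properties userProps) {
        // Defensive copy so later mutation by the caller cannot bypass validation.
        props.putAll(userProps);
        String raw = props.getProperty(MAX_COUNT_KEY);
        if (raw != null) {
            long value;
            try {
                value = Long.parseLong(raw.trim());
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException(
                        MAX_COUNT_KEY + " is not a valid long: " + raw, e);
            }
            if (value <= 0) {
                throw new IllegalArgumentException(MAX_COUNT_KEY + " must be positive: " + raw);
            }
        }
    }

    // No try/catch needed here: the constructor guarantees the value is
    // either absent or a positive long.
    long maxCount() {
        String raw = props.getProperty(MAX_COUNT_KEY);
        return raw == null ? DEFAULT_MAX_COUNT : Long.parseLong(raw.trim());
    }
}
```

With this split, a malformed value fails when the job is constructed on the client rather than later inside a running task.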

I'll add a follow-up JIRA for adding configuration keys for all relevant 
fields in `KinesisProducerConfiguration`.


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-27 Thread aozturk
Github user aozturk commented on the pull request:

https://github.com/apache/flink/pull/2016#issuecomment-50998
  
Thanks for the merge! Your latest enhancements look great.


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2016


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-27 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/2016#issuecomment-222136775
  
Thanks a lot for addressing the comments.
I'll push the changes to Travis; once it's green, I'll merge them.


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-27 Thread aozturk
Github user aozturk commented on the pull request:

https://github.com/apache/flink/pull/2016#issuecomment-222111542
  
Pushed updates according to your comments.


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-26 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2016#discussion_r64767022
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -160,12 +168,13 @@ public void setCustomPartitioner(KinesisPartitioner partitioner) {
 public void open(Configuration parameters) throws Exception {
 super.open(parameters);
 
-   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
-   config.setRegion(this.region);
-   config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey)));
+   KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
+
+   producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
+   producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
 //config.setCollectionMaxCount(1);
 //config.setAggregationMaxCount(1);
--- End diff --

I agree that using the KPL is generally a good idea.
The problem is that the license of the KPL (the Amazon Software License) is 
not compatible with the Apache Software License; therefore, projects at Apache 
cannot depend on such code.
We will not release the Flink Kinesis connector with the next Flink release 
for that reason (unless we somehow fix this issue).

Are you working at Amazon? If so, can you contact me at robert (at) 
data-artisans.com?


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-26 Thread aozturk
Github user aozturk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2016#discussion_r64757778
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -160,12 +168,13 @@ public void setCustomPartitioner(KinesisPartitioner partitioner) {
 public void open(Configuration parameters) throws Exception {
 super.open(parameters);
 
-   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
-   config.setRegion(this.region);
-   config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey)));
+   KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
+
+   producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
+   producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
 //config.setCollectionMaxCount(1);
 //config.setAggregationMaxCount(1);
--- End diff --

Ah, sorry. I went back to the code to be sure, and it seems the KPL is 
already used.


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-26 Thread aozturk
Github user aozturk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2016#discussion_r64756634
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -160,12 +168,13 @@ public void setCustomPartitioner(KinesisPartitioner partitioner) {
 public void open(Configuration parameters) throws Exception {
 super.open(parameters);
 
-   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
-   config.setRegion(this.region);
-   config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey)));
+   KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
+
+   producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
+   producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
 //config.setCollectionMaxCount(1);
 //config.setAggregationMaxCount(1);
--- End diff --

Sorry again for my late response. Meanwhile, I have thought about this and 
come up with a question. I had noticed a remark about not using the KCL 
library, but is there any special reason why we are not using the KPL? Anything 
we intend to build will probably replicate it, including the retry mechanism on 
failures, multi-threading and asynchronous writes.

We also need to keep in mind that our producer should be cost-effective by 
taking advantage of batching, which the user can tune if low latency is 
preferred.
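To make the batching trade-off concrete: a producer can buffer records and flush them either when a count threshold is reached or when the caller forces a flush; a threshold of 1 degenerates to one request per record (lowest latency, most requests). The sketch below is hypothetical and only illustrates the idea; it is not how the KPL implements aggregation or collection, and `RecordBatcher` and its `flushSink` callback are invented names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical count-based batcher: buffers records and hands them to a sink
// in groups of at most maxBatchSize. maxBatchSize = 1 sends every record
// immediately (low latency, more requests); larger values mean fewer, larger
// requests (cheaper, higher latency).
final class RecordBatcher<T> {
    private final int maxBatchSize;
    private final Consumer<List<T>> flushSink;
    private List<T> buffer = new ArrayList<>();

    RecordBatcher(int maxBatchSize, Consumer<List<T>> flushSink) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be >= 1");
        }
        this.maxBatchSize = maxBatchSize;
        this.flushSink = flushSink;
    }

    void add(T record) {
        buffer.add(record);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Emits whatever is buffered; a real sink would also call this on a
    // timer and when closing, so that a partial batch is never lost.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        flushSink.accept(batch);
    }
}
```

Exposing `maxBatchSize` (plus a time-based flush, omitted here) as user configuration is what would let users choose between cost and latency.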


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-26 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/2016#issuecomment-221885858
  
Sure, no problem at all. It's just for my own time allocation. Thanks for 
the quick response.


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-26 Thread aozturk
Github user aozturk commented on the pull request:

https://github.com/apache/flink/pull/2016#issuecomment-221885430
  
Sorry, I have had an extra duty this week, but I have already started 
working on it and hope to wrap it up this evening.


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-26 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/2016#issuecomment-221882616
  
Just for my planning, when do you think you'll find time to address the 
remaining issues?


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-23 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2016#discussion_r64257599
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -160,12 +168,13 @@ public void setCustomPartitioner(KinesisPartitioner partitioner) {
 public void open(Configuration parameters) throws Exception {
 super.open(parameters);
 
-   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
-   config.setRegion(this.region);
-   config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey)));
+   KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
+
+   producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
+   producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
 //config.setCollectionMaxCount(1);
 //config.setAggregationMaxCount(1);
--- End diff --

Okay, nice.
One thing that would be really helpful is additional testing. One big 
issue is that the Kinesis connector doesn't handle "flow control" nicely.
If I have a Flink job that produces data at a higher rate than the number 
of shards permits, I get a lot of failures. Ideally, the producer should only 
accept as much data as it can handle and block otherwise.
Do you have any ideas how to achieve that?
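One common way to get the blocking behaviour described above is to cap the number of in-flight (sent but not yet acknowledged) records with a semaphore: the sink acquires a permit before each asynchronous write, and the write's completion callback releases it, so once the cap is reached the calling thread blocks instead of piling up failures. The sketch below is an assumption about how this could look, not the connector's code; `BoundedAsyncSender` and its `asyncSend` function are hypothetical stand-ins for an asynchronous Kinesis client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical backpressure wrapper: at most maxInFlight records may be
// outstanding at once; further calls to write() block until an earlier
// write completes (successfully or exceptionally).
final class BoundedAsyncSender<T, R> {
    private final Semaphore permits;
    private final Function<T, CompletableFuture<R>> asyncSend;

    BoundedAsyncSender(int maxInFlight, Function<T, CompletableFuture<R>> asyncSend) {
        if (maxInFlight < 1) {
            throw new IllegalArgumentException("maxInFlight must be >= 1");
        }
        this.permits = new Semaphore(maxInFlight);
        this.asyncSend = asyncSend;
    }

    CompletableFuture<R> write(T record) throws InterruptedException {
        permits.acquire(); // blocks the caller while the cap is reached
        CompletableFuture<R> future;
        try {
            future = asyncSend.apply(record);
        } catch (RuntimeException e) {
            permits.release(); // the send never started, so return the permit
            throw e;
        }
        // Release the permit when the write finishes, whether it succeeded or failed.
        future.whenComplete((result, error) -> permits.release());
        return future;
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

In a Flink sink, `write` would be called from the sink's `invoke()` method, so a blocked call slows the task down and lets Flink's backpressure propagate upstream rather than surfacing as write failures. Choosing `maxInFlight` relative to the shard capacity is left open here.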


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-23 Thread aozturk
Github user aozturk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2016#discussion_r64235578
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -160,12 +168,13 @@ public void setCustomPartitioner(KinesisPartitioner partitioner) {
 public void open(Configuration parameters) throws Exception {
 super.open(parameters);
 
-   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
-   config.setRegion(this.region);
-   config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey)));
+   KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
+
+   producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
+   producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
 //config.setCollectionMaxCount(1);
 //config.setAggregationMaxCount(1);
--- End diff --

Sure, no need for a new JIRA; I can add these keys while addressing your 
review. I am actually willing to do more work on the Kinesis connector. Feel 
free to list things here, or create an issue and assign it to me.


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-23 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/2016#issuecomment-220997800
  
The change looks good overall. I think we can merge it soon.

Please let me know once you've addressed my comments.


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-23 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2016#discussion_r64231495
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -160,12 +168,13 @@ public void setCustomPartitioner(KinesisPartitioner partitioner) {
 public void open(Configuration parameters) throws Exception {
 super.open(parameters);
 
-   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
-   config.setRegion(this.region);
-   config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey)));
+   KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
+
+   producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
+   producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
 //config.setCollectionMaxCount(1);
 //config.setAggregationMaxCount(1);
--- End diff --

I know this is out of scope for the JIRA, but maybe it would make sense to 
introduce some producer-specific configuration keys for the relevant producer 
settings here.

Would you like to add this as part of the PR, or do you think we should file 
a follow-up JIRA?
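One way such producer-specific keys could be introduced is to reserve a prefix in the user's `Properties` and forward every matching entry, with the prefix stripped, to the producer configuration. The sketch assumes a hypothetical prefix `aws.producer.`, which is not an existing Flink constant; mapping the stripped names onto `KinesisProducerConfiguration` setters is deliberately left out because it depends on that class's API.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper: collects all entries whose key starts with a given
// prefix and returns them with the prefix stripped, e.g.
// "aws.producer.CollectionMaxCount" -> "CollectionMaxCount".
final class PrefixedConfig {
    private PrefixedConfig() {}

    static Map<String, String> extract(Properties props, String prefix) {
        Map<String, String> result = new TreeMap<>(); // sorted for stable logging
        for (String key : props.stringPropertyNames()) {
            // Skip keys equal to the bare prefix; they carry no setting name.
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }
}
```

A prefix keeps producer tuning keys separate from the shared keys (region, credentials) that the consumer and producer would both read.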


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-23 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2016#discussion_r64230633
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -30,14 +28,20 @@
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkNotNull;
--- End diff --

Actually, I just learned that Flink has `org.apache.flink.util.Preconditions` 
as well.


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-23 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2016#discussion_r64227104
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -30,14 +28,20 @@
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkNotNull;
--- End diff --

Can you use `Objects.requireNonNull()` instead?

We are trying to get rid of Guava in the long term.
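For reference, the Guava-free null check suggested above needs only the JDK: `Objects.requireNonNull(obj, message)` returns its argument unchanged, or throws a `NullPointerException` carrying the given message. The class `ExampleSink` and its fields are hypothetical and only mirror the shape of a sink constructor that takes a schema and a `Properties` object.

```java
import java.util.Objects;
import java.util.Properties;

// Hypothetical sink-like class showing JDK-only argument validation
// (java.util.Objects) in place of Guava's Preconditions.checkNotNull.
final class ExampleSink {
    private final Object schema;
    private final Properties configProps;

    ExampleSink(Object schema, Properties configProps) {
        // requireNonNull returns its argument, so check and assignment fit on one line.
        this.schema = Objects.requireNonNull(schema, "schema must not be null");
        this.configProps = Objects.requireNonNull(configProps, "configProps must not be null");
    }

    Object schema() {
        return schema;
    }

    Properties configProps() {
        return configProps;
    }
}
```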


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-23 Thread aozturk
Github user aozturk commented on the pull request:

https://github.com/apache/flink/pull/2016#issuecomment-220988206
  
Sure. Thanks for the review @rmetzger


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-23 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/2016#issuecomment-220986366
  
I noticed that you didn't update the documentation of the Kinesis producer.
Can you update the page to reflect the changed usage?
The file is located here: docs/apis/streaming/connectors/kinesis.md


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-21 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/2016#issuecomment-220782124
  
Thanks a lot for your contribution. I quickly scrolled over the changes and 
I like them at first sight.

I assigned the JIRA issue to you. I hope I can review the pull request in 
the next few days.


[GitHub] flink pull request: [FLINK-3923] [connector-kinesis] Unify configu...

2016-05-21 Thread aozturk
GitHub user aozturk opened a pull request:

https://github.com/apache/flink/pull/2016

[FLINK-3923] [connector-kinesis] Unify configuration conventions of t…

This pull request includes the changes as described in the issue.

[FLINK-3923] Unify configuration conventions of the Kinesis producer to the same as the consumer
Currently, the Kinesis consumer and producer are configured differently.
The producer expects a list of arguments for the access key, secret, region, 
and stream, whereas the consumer accepts properties (similar to the Kafka 
connector).
The objective of this issue is to change the producer so that it also uses a 
properties-based configuration (including an input validation step).
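The properties-based style the issue asks for can be sketched as follows: the user puts all settings into a single `java.util.Properties` object, and the producer validates it up front, failing fast with a message that lists every missing key instead of failing at the first write. The key names below are hypothetical placeholders; the real constants live in the connector's `KinesisConfigConstants` class and are not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical validator for a properties-based producer configuration.
final class ProducerConfigValidator {
    // Placeholder key names, NOT the connector's real constants.
    static final String REGION = "aws.region";
    static final String ACCESS_KEY = "aws.credentials.accessKeyId";
    static final String SECRET_KEY = "aws.credentials.secretKey";

    private ProducerConfigValidator() {}

    // Throws IllegalArgumentException listing every missing or blank key,
    // so the user can fix all problems in one go.
    static void validate(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : new String[] {REGION, ACCESS_KEY, SECRET_KEY}) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing required producer properties: " + missing);
        }
    }
}
```

Calling such a check from the producer's constructor keeps the failure on the client side, before the job is submitted.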

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aozturk/flink FLINK-3923

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2016.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2016


commit b3f253a016fb3fdd9ecfd1f03ff9313d3f977e3f
Author: Abdullah Ozturk 
Date:   2016-05-21T14:38:53Z

[FLINK-3923] [connector-kinesis] Unify configuration conventions of the Kinesis producer to the same as the consumer



