[GitHub] flink pull request #4228: Flink-7035 Automatically identify AWS Region, simp...

2017-07-04 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4228#discussion_r125445183
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
@@ -117,12 +117,77 @@
// ------------------------------------------------------------------------
//  Constructors
// ------------------------------------------------------------------------

+   /**
+    * Creates a new Flink Kinesis Consumer that uses the AWS credentials and region provided from the AWS
+    * node with the stream initial position to LATEST.
+    *
+    * @param stream
+    *           The single AWS Kinesis stream to read from.
+    * @param deserializer
+    *           The deserializer used to convert raw bytes of Kinesis records to Java objects (without key).
+    */
+   public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer) {
+       this(stream, new KinesisDeserializationSchemaWrapper<>(deserializer));
+   }
+
+   /**
+    * Creates a new Flink Kinesis Consumer that uses the AWS credentials and region provided from the AWS
+    * node with the provided initial position in the stream.
+    *
+    * @param stream
+    * @param deserializer
+    * @param initialPosition
--- End diff --

These params need descriptions for the Javadoc also.
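For example, the descriptions could mirror the neighbouring constructors (suggested wording only, not text from the PR):

    /**
     * @param stream
     *           The single AWS Kinesis stream to read from.
     * @param deserializer
     *           The deserializer used to convert raw bytes of Kinesis records to Java objects (without key).
     * @param initialPosition
     *           Where to start reading the Kinesis stream (e.g. LATEST or TRIM_HORIZON).
     */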




[GitHub] flink pull request #4228: Flink-7035 Automatically identify AWS Region, simp...

2017-07-04 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4228#discussion_r125445109
  
--- Diff: docs/dev/connectors/kinesis.md ---
@@ -72,12 +72,80 @@ Before consuming data from Kinesis streams, make sure that all streams are creat
 
 
 {% highlight java %}
-Properties consumerConfig = new Properties();
-consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), ConsumerConfigConstants.InitialPosition.LATEST));
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema, ConsumerConfigConstants.InitialPosition.LATEST))
+{% endhighlight %}
+
+
+
+The above is a simple example of using the Kinesis consumer when running on an Amazon Linux node (such as in EMR or AWS Lambda).
+The AWS APIs automatically provide the authentication credentials and region when available. For unit testing, the ability to set test configuration is provided using KinesisConfigUtil.
+
+
+
+{% highlight java %}
+Properties testConfig = new Properties();
+testConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+testConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+testConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+testConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+
+KinesisConfigUtil.setDefaultTestProperties(testConfig);
+
+// Automatically uses testConfig without having to modify job flow
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema()));
+{% endhighlight %}
+
+
+{% highlight scala %}
+val testConfig = new Properties();
+testConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
 consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
 consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
 consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
 
+KinesisConfigUtil.setDefaultTestProperties(testConfig);
+
+// Automatically uses testConfig without having to modify job flow
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema))
+{% endhighlight %}
+
+
+
+Configuration for the consumer can also be supplied with `java.util.Properties` for use on non-Amazon Linux hardware,
+or in the case that other stream consumer properties need to be tuned.
+
+Please note it is strongly recommended to use Kinesis streams within the same availability zone they originate in.
--- End diff --

Thanks for the explanation! That makes a lot of sense.




[GitHub] flink pull request #4228: Flink-7035 Automatically identify AWS Region, simp...

2017-07-04 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4228#discussion_r125445000
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
@@ -117,12 +117,77 @@
// ------------------------------------------------------------------------
//  Constructors
// ------------------------------------------------------------------------

+   /**
+    * Creates a new Flink Kinesis Consumer that uses the AWS credentials and region provided from the AWS
+    * node with the stream initial position to LATEST.
+    *
+    * @param stream
+    *           The single AWS Kinesis stream to read from.
+    * @param deserializer
+    *           The deserializer used to convert raw bytes of Kinesis records to Java objects (without key).
+    */
+   public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer) {
+       this(stream, new KinesisDeserializationSchemaWrapper<>(deserializer));
+   }
+
+   /**
+    * Creates a new Flink Kinesis Consumer that uses the AWS credentials and region provided from the AWS
+    * node with the provided initial position in the stream.
+    *
+    * @param stream
+    * @param deserializer
+    * @param initialPosition
+    */
+   public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer, InitialPosition initialPosition) {
+       this(stream, new KinesisDeserializationSchemaWrapper<>(deserializer), initialPosition);
+   }
+
+   /**
+    * Creates a new Flink Kinesis Consumer that uses the AWS credentials and region provided from the AWS
+    * node with the stream initial position to LATEST.
+    *
+    * @param stream
+    *           The single AWS Kinesis stream to read from.
+    * @param deserializer
+    *           The deserializer used to convert raw bytes of Kinesis records to Java objects (without key).
+    */
+   public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer) {
+       this(stream, deserializer, getDefaultConfigProperties());
+   }
+
+   /**
+    * Creates a new Flink Kinesis Consumer that uses the AWS credentials and region provided from the AWS
+    * node with the provided initial position in the stream.
+    *
+    * @param stream
+    *           The single AWS Kinesis stream to read from.
+    * @param deserializer
+    *           The deserializer used to convert raw bytes of Kinesis records to Java objects (without key).
+    * @param initialPosition
+    *           Where to start the Kinesis stream
+    */
+   public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer, InitialPosition initialPosition) {
+       this(stream, deserializer, getDefaultPropsWithInitialPosition(initialPosition));
+   }
+
+   private static Properties getDefaultPropsWithInitialPosition(InitialPosition initialPosition) {
+       final Properties conf = getDefaultConfigProperties();
+       conf.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, initialPosition.name());
--- End diff --

This properties building is problematic. For example, we also have an 
`InitialPosition.AT_TIMESTAMP` configuration, which involves 2 keys in the 
properties.
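
To make the concern concrete, a rough sketch (mine, not from the PR) of what AT_TIMESTAMP requires, assuming the existing ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP key; a single-argument helper like getDefaultPropsWithInitialPosition(initialPosition) has no way to populate the second key:

    // Sketch only: AT_TIMESTAMP involves two property keys, not one
    Properties conf = getDefaultConfigProperties();
    conf.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
            ConsumerConfigConstants.InitialPosition.AT_TIMESTAMP.name());
    // the timestamp must also be supplied, formatted per the consumer's timestamp date format setting
    conf.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2017-07-04T00:00:00.000-00:00");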




[GitHub] flink pull request #4228: Flink-7035 Automatically identify AWS Region, simp...

2017-07-04 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4228#discussion_r125445059
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
@@ -117,12 +117,77 @@
// ------------------------------------------------------------------------
//  Constructors
// ------------------------------------------------------------------------

+   /**
--- End diff --

nit: add empty line before /**




[GitHub] flink pull request #4228: Flink-7035 Automatically identify AWS Region, simp...

2017-06-30 Thread mpouttuclarke
Github user mpouttuclarke commented on a diff in the pull request:

https://github.com/apache/flink/pull/4228#discussion_r125076287
  
--- Diff: docs/dev/connectors/kinesis.md ---
@@ -72,12 +72,80 @@ Before consuming data from Kinesis streams, make sure that all streams are creat
 
 
 {% highlight java %}
-Properties consumerConfig = new Properties();
-consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), ConsumerConfigConstants.InitialPosition.LATEST));
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema, ConsumerConfigConstants.InitialPosition.LATEST))
+{% endhighlight %}
+
+
+
+The above is a simple example of using the Kinesis consumer when running on an Amazon Linux node (such as in EMR or AWS Lambda).
+The AWS APIs automatically provide the authentication credentials and region when available. For unit testing, the ability to set test configuration is provided using KinesisConfigUtil.
+
+
+
+{% highlight java %}
+Properties testConfig = new Properties();
+testConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+testConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+testConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+testConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+
+KinesisConfigUtil.setDefaultTestProperties(testConfig);
+
+// Automatically uses testConfig without having to modify job flow
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema()));
+{% endhighlight %}
+
+
+{% highlight scala %}
+val testConfig = new Properties();
+testConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
 consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
 consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
 consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
 
+KinesisConfigUtil.setDefaultTestProperties(testConfig);
+
+// Automatically uses testConfig without having to modify job flow
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema))
+{% endhighlight %}
+
+
+
+Configuration for the consumer can also be supplied with `java.util.Properties` for use on non-Amazon Linux hardware,
+or in the case that other stream consumer properties need to be tuned.
+
+Please note it is strongly recommended to use Kinesis streams within the same availability zone they originate in.
--- End diff --

Within Amazon we completely restrict cross-region network traffic except in special circumstances, for exactly these reasons. These are simply lessons learned from scaling our systems globally, where problems arise not only from performance concerns but also from regulatory issues, such as EU data laws restricting data egress. Customers should be aware of these issues and discuss them with support before going down this path.




[GitHub] flink pull request #4228: Flink-7035 Automatically identify AWS Region, simp...

2017-06-30 Thread mpouttuclarke
Github user mpouttuclarke commented on a diff in the pull request:

https://github.com/apache/flink/pull/4228#discussion_r125074194
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
@@ -171,7 +172,14 @@ public static void validateAwsConfiguration(Properties config) {
    }
 
    if (!config.containsKey(AWSConfigConstants.AWS_REGION)) {
-       throw new IllegalArgumentException("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config.");
+       final Region currentRegion = Regions.getCurrentRegion();
+       if (currentRegion != null) {
+           config.setProperty(AWSConfigConstants.AWS_REGION, currentRegion.getName());
+       } else {
+           throw new IllegalArgumentException("The AWS region could not be identified automatically from the AWS API.  " +
--- End diff --

Yes I like that wording.




[GitHub] flink pull request #4228: Flink-7035 Automatically identify AWS Region, simp...

2017-06-30 Thread mpouttuclarke
Github user mpouttuclarke commented on a diff in the pull request:

https://github.com/apache/flink/pull/4228#discussion_r125073534
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
@@ -171,7 +172,14 @@ public static void validateAwsConfiguration(Properties config) {
    }
 
    if (!config.containsKey(AWSConfigConstants.AWS_REGION)) {
--- End diff --

The new constructors make the easy path the right path. We go through a lot of trouble at Amazon to make sure that the default constructors do the right thing with minimal effort. Yet people still set things like region and auth manually, when doing so is not only unnecessary but also a security, performance, and compliance risk. Wherever we can, we should follow the example of the AWS SDK and make correct usage the path of least resistance. Overall, I would argue that using property files and statics isn't a best practice; there really should be type-safe POJOs and dependency injection in place for configuring the consumer, but that is a larger issue than I can take on right now. The new constructors attempt to add some type safety while improving ease of use when operating in an Amazon environment (see the sketch below).
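
To illustrate the ease-of-use point, a rough before/after sketch (my own; it assumes an existing StreamExecutionEnvironment env, and the "after" uses the constructors proposed in this PR):

    // Before: every job spells out region/credential keys by hand
    Properties config = new Properties();
    config.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
    config.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
    config.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
    DataStream<String> before = env.addSource(
        new FlinkKinesisConsumer<>("kinesis_stream_name", new SimpleStringSchema(), config));

    // After: on an Amazon Linux node, region and credentials come from the environment
    DataStream<String> after = env.addSource(
        new FlinkKinesisConsumer<>("kinesis_stream_name", new SimpleStringSchema(),
            ConsumerConfigConstants.InitialPosition.LATEST));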




[GitHub] flink pull request #4228: Flink-7035 Automatically identify AWS Region, simp...

2017-06-29 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4228#discussion_r124967289
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
@@ -171,7 +172,14 @@ public static void validateAwsConfiguration(Properties config) {
    }
 
    if (!config.containsKey(AWSConfigConstants.AWS_REGION)) {
--- End diff --

I wonder if it's actually sufficient to rely on this check alone, without needing the bunch of new constructors you added.

I.e., it would be sufficient that the user doesn't supply an `AWSConfigConstants.AWS_REGION` value in the props, which we would then default to using `Regions.getCurrentRegion()`.
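
For illustration, a rough sketch of the usage that check alone would enable with the existing Properties-based constructor (assuming an existing StreamExecutionEnvironment env; no new constructors needed):

    // No AWS_REGION in the props: on an AWS node, validateAwsConfiguration would
    // fall back to Regions.getCurrentRegion(); elsewhere it still fails fast.
    Properties config = new Properties();
    config.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

    DataStream<String> kinesis = env.addSource(
        new FlinkKinesisConsumer<>("kinesis_stream_name", new SimpleStringSchema(), config));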




[GitHub] flink pull request #4228: Flink-7035 Automatically identify AWS Region, simp...

2017-06-29 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4228#discussion_r124967667
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
@@ -171,7 +172,14 @@ public static void validateAwsConfiguration(Properties config) {
    }
 
    if (!config.containsKey(AWSConfigConstants.AWS_REGION)) {
-       throw new IllegalArgumentException("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config.");
+       final Region currentRegion = Regions.getCurrentRegion();
+       if (currentRegion != null) {
+           config.setProperty(AWSConfigConstants.AWS_REGION, currentRegion.getName());
+       } else {
+           throw new IllegalArgumentException("The AWS region could not be identified automatically from the AWS API.  " +
--- End diff --

I would rephrase this:

> The AWS region could not be identified automatically from the environment. Automatic AWS region retrieval is only available when the consumer is run in AWS environments, which is recommended for production usage to avoid cross-region network traffic. When executed in non-AWS environments, please set the region manually using the property AWSConfigConstants.AWS_REGION.





[GitHub] flink pull request #4228: Flink-7035 Automatically identify AWS Region, simp...

2017-06-29 Thread mpouttuclarke
GitHub user mpouttuclarke opened a pull request:

https://github.com/apache/flink/pull/4228

Flink-7035 Automatically identify AWS Region, simplified constructors, added test properties support



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mpouttuclarke/flink FLINK-7035

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4228.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4228





