[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249208#comment-15249208
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60346774
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
+import com.amazonaws.services.kinesis.model.StreamStatus;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A utility class that is used as a proxy to make calls to AWS Kinesis
+ * for several functions, such as getting a list of shards and fetching
+ * a batch of data records starting from a specified record sequence number.
+ */
+public class KinesisProxy {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
+
+	/** The actual Kinesis client from the AWS SDK that we will be using to make calls */
+	private final AmazonKinesisClient kinesisClient;
+
+	/** The AWS region that this proxy will be making calls to */
+	private final String regionId;
+
+	/** Configuration properties of this Flink Kinesis Connector */
+	private final Properties configProps;
+
+	/**
+	 * Create a new KinesisProxy based on the supplied configuration properties
+	 *
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxy(Properties configProps) {
+		this.configProps = checkNotNull(configProps);
+
+		this.regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION, KinesisConfigConstants.DEFAULT_AWS_REGION);
+		AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials());
+		client.setRegion(Region.getRegion(Regions.fromName(this.regionId)));
--- End diff --

I'm using the "ap-northeast-1" region, which isn't the default.
Setting the region on the AmazonKinesisClient should set the endpoint too, 
no?
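
For context, a minimal sketch of the two client-configuration paths being discussed, assuming the AWS SDK for Java 1.x that this connector builds on (to my understanding, {{setRegion}} resolves the service endpoint from region metadata, while {{setEndpoint}} overrides it explicitly; the credentials below are placeholders):

{code:java}
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesisClient;

// Placeholder credentials purely for illustration.
AmazonKinesisClient client =
	new AmazonKinesisClient(new BasicAWSCredentials("accessKey", "secretKey"));
// Path 1 (what the diff above does): derive the endpoint from the region.
client.setRegion(Region.getRegion(Regions.AP_NORTHEAST_1));
// Path 2 (what the reviewer resorted to): point the client at the endpoint directly.
client.setEndpoint("kinesis.ap-northeast-1.amazonaws.com");
{code}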


> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> --
>
> Key: FLINK-3229
> URL: https://issues.apache.org/jira/browse/FLINK-3229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Opening a sub-task to implement data source consumer for Kinesis streaming connector

[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...

2016-04-19 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60346774
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
+import com.amazonaws.services.kinesis.model.StreamStatus;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A utility class that is used as a proxy to make calls to AWS Kinesis
+ * for several functions, such as getting a list of shards and fetching
+ * a batch of data records starting from a specified record sequence number.
+ */
+public class KinesisProxy {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
+
+	/** The actual Kinesis client from the AWS SDK that we will be using to make calls */
+	private final AmazonKinesisClient kinesisClient;
+
+	/** The AWS region that this proxy will be making calls to */
+	private final String regionId;
+
+	/** Configuration properties of this Flink Kinesis Connector */
+	private final Properties configProps;
+
+	/**
+	 * Create a new KinesisProxy based on the supplied configuration properties
+	 *
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxy(Properties configProps) {
+		this.configProps = checkNotNull(configProps);
+
+		this.regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION, KinesisConfigConstants.DEFAULT_AWS_REGION);
+		AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials());
+		client.setRegion(Region.getRegion(Regions.fromName(this.regionId)));
--- End diff --

I'm using the "ap-northeast-1" region, which isn't the default.
Setting the region on the AmazonKinesisClient should set the endpoint too, 
no?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Flink 1827 and small fixes in some tests

2016-04-19 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1915#issuecomment-212076850
  
I quickly scrolled over the changes, and overall I like the idea of fixing 
the thing with the test utils.
I wonder if we could get rid of the "flink-test-utils" entirely by moving 
the contents into the "flink-tests" project. Have you considered that?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3768) Clustering Coefficient

2016-04-19 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15248425#comment-15248425
 ] 

Greg Hogan commented on FLINK-3768:
---

The global clustering coefficient is simple to compute; fitting it all 
together is the challenge. From the Wikipedia page, the global CC is {{3 * 
number of triangles / number of connected triplets}}. Triplets can be counted 
by summing {{(vertex degree choose 2)}} over all vertices, and the triangle 
counting already uses vertex degrees.

We can store the triangle count in an accumulator. We can write a graph 
algorithm to read vertex degrees and store the triplet count in an accumulator. 
And we can create a class which sets this up and, after the user has executed 
the program, retrieves the accumulator values and computes the score.
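
As a quick illustration of that arithmetic (a sketch only, not the Gelly implementation):

{code:java}
// Global clustering coefficient = 3 * triangles / triplets, where the number
// of connected triplets is the sum over all vertices of (degree choose 2).
public static double globalClusteringCoefficient(long triangleCount, long[] vertexDegrees) {
	long triplets = 0;
	for (long d : vertexDegrees) {
		triplets += d * (d - 1) / 2; // (d choose 2) triplets centered on this vertex
	}
	return triplets == 0 ? 0.0 : (3.0 * triangleCount) / triplets;
}
{code}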

> Clustering Coefficient
> --
>
> Key: FLINK-3768
> URL: https://issues.apache.org/jira/browse/FLINK-3768
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> The local clustering coefficient measures the connectedness of each vertex's 
> neighborhood. Values range from 0.0 (no edges between neighbors) to 1.0 
> (neighborhood is a clique).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request:

2016-04-19 Thread uce
Github user uce commented on the pull request:


https://github.com/apache/flink/commit/7462a5bfd7cb2dafbbc9eb02a43d3db9f6add30e#commitcomment-17165776
  
Can you push this to `release-1.0` as well to have it in 1.0.3 or the next 
1.0.2 RC?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-04-19 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15248270#comment-15248270
 ] 

Greg Hogan commented on FLINK-3780:
---

The Jaccard similarity is computed on a pair of vertices. For a graph with 
{{n}} vertices there are {{(n choose 2)}} similarity scores, which is 
{{O(n^2)}}. Also, a lack of similarity (here, no shared neighbors) is usually 
not very interesting.
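
For reference, the per-pair score under discussion (a plain sketch, with vertex IDs assumed to be longs):

{code:java}
import java.util.HashSet;
import java.util.Set;

// Jaccard similarity of vertices u and v over their neighbor sets:
// |N(u) ∩ N(v)| / |N(u) ∪ N(v)|. Pairs with no shared neighbors score 0,
// which is why emitting only non-zero scores avoids the O(n^2) blow-up.
public static double jaccard(Set<Long> neighborsU, Set<Long> neighborsV) {
	Set<Long> intersection = new HashSet<>(neighborsU);
	intersection.retainAll(neighborsV);
	if (intersection.isEmpty()) {
		return 0.0;
	}
	int union = neighborsU.size() + neighborsV.size() - intersection.size();
	return (double) intersection.size() / union;
}
{code}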

> Jaccard Similarity
> --
>
> Key: FLINK-3780
> URL: https://issues.apache.org/jira/browse/FLINK-3780
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> Implement a Jaccard Similarity algorithm computing all non-zero similarity 
> scores. This algorithm is similar to {{TriangleListing}} but instead of 
> joining two-paths against an edge list we count two-paths.
> {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which 
> relies on {{Graph.getTriplets()}} so only computes similarity scores for 
> neighbors but not neighbors-of-neighbors.
> This algorithm is easily modified for other similarity scores such as 
> Adamic-Adar similarity where the sum of endpoint degrees is replaced by the 
> degree of the middle vertex.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3789) Overload methods which trigger program execution to allow naming job

2016-04-19 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-3789:
--
Description: 
Overload the following functions to additionally accept a job name to pass to 
{{ExecutionEnvironment.execute(String)}}.
* {{DataSet.collect()}}
* {{DataSet.count()}}
* {{DataSetUtils.checksumHashCode(DataSet)}}
* {{GraphUtils.checksumHashCode(Graph)}}

Once the deprecated {{DataSet.print(String)}} and 
{{DataSet.printToErr(String)}} are removed we can overload {{DataSet.print()}}.

  was:
Overload the following functions to additionally accept a job name to pass to 
{{ExecutionEnvironment.execute(String)}}.
* {{DataSet.collect()}}
* {{DataSet.count()}}
* {{DataSetUtils.checksumHashCode(DataSet)}}
* {{GraphUtils.checksumHashCode(Graph)}}

Once the deprecated {{DataSet.print(String)}} and {{DataSet.printToErr()}} are 
removed we can overload {{DataSet.print()}}.


> Overload methods which trigger program execution to allow naming job
> 
>
> Key: FLINK-3789
> URL: https://issues.apache.org/jira/browse/FLINK-3789
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> Overload the following functions to additionally accept a job name to pass to 
> {{ExecutionEnvironment.execute(String)}}.
> * {{DataSet.collect()}}
> * {{DataSet.count()}}
> * {{DataSetUtils.checksumHashCode(DataSet)}}
> * {{GraphUtils.checksumHashCode(Graph)}}
> Once the deprecated {{DataSet.print(String)}} and 
> {{DataSet.printToErr(String)}} are removed we can overload 
> {{DataSet.print()}}.
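
For context, the workaround the proposed overloads would remove (runnable against the 1.x DataSet API; the output path below is illustrative):

{code:java}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// Today, naming a job means avoiding the eager methods (collect(), count(),
// print()) and triggering execution explicitly via execute(String):
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Long> data = env.generateSequence(1, 100);
data.writeAsText("/tmp/sequence-output"); // attach a sink instead of collect()
env.execute("my named job");              // the name shows up in the dashboard and logs
{code}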



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3789) Overload methods which trigger program execution to allow naming job

2016-04-19 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-3789:
--
Description: 
Overload the following functions to additionally accept a job name to pass to 
{{ExecutionEnvironment.execute(String)}}.
* {{DataSet.collect()}}
* {{DataSet.count()}}
* {{DataSetUtils.checksumHashCode(DataSet)}}
* {{GraphUtils.checksumHashCode(Graph)}}

Once the deprecated {{DataSet.print(String)}} and {{DataSet.printToErr()}} are 
removed we can overload {{DataSet.print()}}.

  was:
Overload the following functions to additionally accept a job name to pass to 
{{ExecutionEnvironment.execute(String)}}.
* {{DataSet.collect()}}
* {{DataSet.count()}}
* {{DataSet.print()}}
* {{DataSetUtils.checksumHashCode(DataSet)}}
* {{GraphUtils.checksumHashCode(Graph)}}


> Overload methods which trigger program execution to allow naming job
> 
>
> Key: FLINK-3789
> URL: https://issues.apache.org/jira/browse/FLINK-3789
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> Overload the following functions to additionally accept a job name to pass to 
> {{ExecutionEnvironment.execute(String)}}.
> * {{DataSet.collect()}}
> * {{DataSet.count()}}
> * {{DataSetUtils.checksumHashCode(DataSet)}}
> * {{GraphUtils.checksumHashCode(Graph)}}
> Once the deprecated {{DataSet.print(String)}} and {{DataSet.printToErr()}} 
> are removed we can overload {{DataSet.print()}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-04-19 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15248193#comment-15248193
 ] 

Vasia Kalavri commented on FLINK-3780:
--

Hi [~greghogan],
what do you mean by "all non-zero" similarity scores? Is the objective here to 
compute the Jaccard similarity of all vertices against all other vertices in 
the input graph?

> Jaccard Similarity
> --
>
> Key: FLINK-3780
> URL: https://issues.apache.org/jira/browse/FLINK-3780
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> Implement a Jaccard Similarity algorithm computing all non-zero similarity 
> scores. This algorithm is similar to {{TriangleListing}} but instead of 
> joining two-paths against an edge list we count two-paths.
> {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which 
> relies on {{Graph.getTriplets()}} so only computes similarity scores for 
> neighbors but not neighbors-of-neighbors.
> This algorithm is easily modified for other similarity scores such as 
> Adamic-Adar similarity where the sum of endpoint degrees is replaced by the 
> degree of the middle vertex.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-1528) Add local clustering coefficient library method and example

2016-04-19 Thread Vasia Kalavri (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vasia Kalavri closed FLINK-1528.


> Add local clustering coefficient library method and example
> ---
>
> Key: FLINK-1528
> URL: https://issues.apache.org/jira/browse/FLINK-1528
> Project: Flink
>  Issue Type: Task
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> Add a gelly library method to compute the local clustering coefficient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-1528) Add local clustering coefficient library method and example

2016-04-19 Thread Vasia Kalavri (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vasia Kalavri resolved FLINK-1528.
--
Resolution: Fixed

> Add local clustering coefficient library method and example
> ---
>
> Key: FLINK-1528
> URL: https://issues.apache.org/jira/browse/FLINK-1528
> Project: Flink
>  Issue Type: Task
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> Add a gelly library method to compute the local clustering coefficient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3768) Clustering Coefficient

2016-04-19 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15248180#comment-15248180
 ] 

Vasia Kalavri commented on FLINK-3768:
--

Thanks! It makes sense to proceed with this implementation. I'll mark the other 
issue as a duplicate.

> Clustering Coefficient
> --
>
> Key: FLINK-3768
> URL: https://issues.apache.org/jira/browse/FLINK-3768
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> The local clustering coefficient measures the connectedness of each vertex's 
> neighborhood. Values range from 0.0 (no edges between neighbors) to 1.0 
> (neighborhood is a clique).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3781] BlobClient may be left unclosed i...

2016-04-19 Thread gaoyike
Github user gaoyike commented on the pull request:

https://github.com/apache/flink/pull/1908#issuecomment-212011148
  
Updated!

Thanks uce!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3781) BlobClient may be left unclosed in BlobCache#deleteGlobal()

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15248150#comment-15248150
 ] 

ASF GitHub Bot commented on FLINK-3781:
---

Github user gaoyike commented on the pull request:

https://github.com/apache/flink/pull/1908#issuecomment-212011148
  
Updated!

Thanks uce!


> BlobClient may be left unclosed in BlobCache#deleteGlobal()
> ---
>
> Key: FLINK-3781
> URL: https://issues.apache.org/jira/browse/FLINK-3781
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   public void deleteGlobal(BlobKey key) throws IOException {
> delete(key);
> BlobClient bc = createClient();
> bc.delete(key);
> bc.close();
> {code}
> If delete() throws IOException, the BlobClient would be left unclosed.
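
A minimal sketch of the fix direction discussed in the pull request (try/finally so the client is always closed; the actual merged change may differ):

{code:java}
public void deleteGlobal(BlobKey key) throws IOException {
	BlobClient bc = null;
	try {
		delete(key);
		bc = createClient();
		bc.delete(key);
	} finally {
		if (bc != null) {
			bc.close(); // closed even if either delete throws
		}
	}
}
{code}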



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Flink 1827 and small fixes in some tests

2016-04-19 Thread stefanobortoli
GitHub user stefanobortoli opened a pull request:

https://github.com/apache/flink/pull/1915

Flink 1827 and small fixes in some tests

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [X] General
  - The pull request references the related JIRA issue
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/stefanobortoli/flink FLINK-1827

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1915.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1915


commit 88483205c57884303c9e46159df9d89aa953b547
Author: okkam 
Date:   2016-04-19T09:50:59Z

FLINK-1827 fixed compilation that skips tests

commit 5a5cc98ba4fc407fe0fc18d7a57dc4455ce80001
Author: okkam 
Date:   2016-04-19T09:50:59Z

FLINK-1827 fixed compilation that skips tests

commit 85b1763ea84a59258a3a4fef87d05650e99b5be0
Author: okkam 
Date:   2016-04-19T09:58:13Z

Merge remote-tracking branch 'origin/FLINK-1827' into FLINK-1827

Conflicts:
flink-test-utils/pom.xml

commit 310b4e901283fde317dc97c9d454341d331f5d04
Author: okkam 
Date:   2016-04-19T16:37:55Z

FLINK-1827 and improved tests removing references to localized messages




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-3789) Overload methods which trigger program execution to allow naming job

2016-04-19 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-3789:
--
Description: 
Overload the following functions to additionally accept a job name to pass to 
{{ExecutionEnvironment.execute(String)}}.
* {{DataSet.collect()}}
* {{DataSet.count()}}
* {{DataSet.print()}}
* {{DataSetUtils.checksumHashCode(DataSet)}}
* {{GraphUtils.checksumHashCode(Graph)}}

  was:
Overload the following functions to additionally accept a job name to pass to 
{{ExecutionEnvironment.execute(String)}}.
* {{DataSet.collect()}}
* {{DataSet.count()}}
* {{DataSetUtils.checksumHashCode(DataSet)}}
* {{GraphUtils.checksumHashCode(Graph)}}


> Overload methods which trigger program execution to allow naming job
> 
>
> Key: FLINK-3789
> URL: https://issues.apache.org/jira/browse/FLINK-3789
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> Overload the following functions to additionally accept a job name to pass to 
> {{ExecutionEnvironment.execute(String)}}.
> * {{DataSet.collect()}}
> * {{DataSet.count()}}
> * {{DataSet.print()}}
> * {{DataSetUtils.checksumHashCode(DataSet)}}
> * {{GraphUtils.checksumHashCode(Graph)}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3788) Local variable values are not distributed to job runners

2016-04-19 Thread Stefano Baghino (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15247924#comment-15247924
 ] 

Stefano Baghino commented on FLINK-3788:


Yes, I experienced this very problem when trying Flink for the first time. I 
believe it depends on how the {{App}} trait is implemented in Scala 
([DelayedInit 
Scaladoc|http://www.scala-lang.org/api/current/#scala.DelayedInit]), perhaps 
this won't be an issue with Scala 2.12+.

> Local variable values are not distributed to job runners
> 
>
> Key: FLINK-3788
> URL: https://issues.apache.org/jira/browse/FLINK-3788
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API
>Affects Versions: 1.0.0, 1.0.1
> Environment: Scala 2.11.8
> Sun JDK 1.8.0_65 or OpenJDK 1.8.0_77
> Fedora 25, 4.6.0-0.rc2.git3.1.fc25.x86_64
>Reporter: Andreas C. Osowski
> Attachments: FLINK-3788.tgz
>
>
> Variable values of non-elementary types aren't captured and distributed to job 
> runners, causing them to remain 'null' and trigger NPEs upon access when 
> running on a cluster. Running locally through `flink-clients` works fine.
> Changing parallelism or disabling the closure cleaner don't seem to have any 
> effect.
> Minimal example, also see the attached archive.
> {code:java}
> case class IntWrapper(a1: Int)
> val wrapped = IntWrapper(42)
> env.readTextFile("myTextFile.txt").map(line => wrapped.toString).collect
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3788) Local variable values are not distributed to job runners

2016-04-19 Thread Andreas C. Osowski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15247919#comment-15247919
 ] 

Andreas C. Osowski commented on FLINK-3788:
---

In that case, the behaviour can probably be explained by: 

bq. It should be noted that this trait is implemented using the DelayedInit 
functionality, which means that fields of the object will not have been 
initialized before the main method has been executed.
Source: http://www.scala-lang.org/api/current/index.html#scala.App


> Local variable values are not distributed to job runners
> 
>
> Key: FLINK-3788
> URL: https://issues.apache.org/jira/browse/FLINK-3788
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API
>Affects Versions: 1.0.0, 1.0.1
> Environment: Scala 2.11.8
> Sun JDK 1.8.0_65 or OpenJDK 1.8.0_77
> Fedora 25, 4.6.0-0.rc2.git3.1.fc25.x86_64
>Reporter: Andreas C. Osowski
> Attachments: FLINK-3788.tgz
>
>
> Variable values of non-elementary types aren't captured and distributed to job 
> runners, causing them to remain 'null' and trigger NPEs upon access when 
> running on a cluster. Running locally through `flink-clients` works fine.
> Changing parallelism or disabling the closure cleaner don't seem to have any 
> effect.
> Minimal example, also see the attached archive.
> {code:java}
> case class IntWrapper(a1: Int)
> val wrapped = IntWrapper(42)
> env.readTextFile("myTextFile.txt").map(line => wrapped.toString).collect
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3788) Local variable values are not distributed to job runners

2016-04-19 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15247896#comment-15247896
 ] 

Gabor Gevay commented on FLINK-3788:


It seems that the `extends App` stuff is causing this somehow. If I copy this 
code to a regular main method, then it works.

> Local variable values are not distributed to job runners
> 
>
> Key: FLINK-3788
> URL: https://issues.apache.org/jira/browse/FLINK-3788
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API
>Affects Versions: 1.0.0, 1.0.1
> Environment: Scala 2.11.8
> Sun JDK 1.8.0_65 or OpenJDK 1.8.0_77
> Fedora 25, 4.6.0-0.rc2.git3.1.fc25.x86_64
>Reporter: Andreas C. Osowski
> Attachments: FLINK-3788.tgz
>
>
> Variable values of non-elementary types aren't captured and distributed to job 
> runners, causing them to remain 'null' and trigger NPEs upon access when 
> running on a cluster. Running locally through `flink-clients` works fine.
> Changing parallelism or disabling the closure cleaner don't seem to have any 
> effect.
> Minimal example, also see the attached archive.
> {code:java}
> case class IntWrapper(a1: Int)
> val wrapped = IntWrapper(42)
> env.readTextFile("myTextFile.txt").map(line => wrapped.toString).collect
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3789) Overload methods which trigger program execution to allow naming job

2016-04-19 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-3789:
-

 Summary: Overload methods which trigger program execution to allow 
naming job
 Key: FLINK-3789
 URL: https://issues.apache.org/jira/browse/FLINK-3789
 Project: Flink
  Issue Type: Improvement
  Components: Java API
Affects Versions: 1.1.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Minor


Overload the following functions to additionally accept a job name to pass to 
{{ExecutionEnvironment.execute(String)}}.
* {{DataSet.collect()}}
* {{DataSet.count()}}
* {{DataSetUtils.checksumHashCode(DataSet)}}
* {{GraphUtils.checksumHashCode(Graph)}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3788) Local variable values are not distributed to job runners

2016-04-19 Thread Andreas C. Osowski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andreas C. Osowski updated FLINK-3788:
--
Summary: Local variable values are not distributed to job runners  (was: 
Local variable values are not distributed to task runners)

> Local variable values are not distributed to job runners
> 
>
> Key: FLINK-3788
> URL: https://issues.apache.org/jira/browse/FLINK-3788
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API
>Affects Versions: 1.0.0, 1.0.1
> Environment: Scala 2.11.8
> Sun JDK 1.8.0_65 or OpenJDK 1.8.0_77
> Fedora 25, 4.6.0-0.rc2.git3.1.fc25.x86_64
>Reporter: Andreas C. Osowski
> Attachments: FLINK-3788.tgz
>
>
> Variable values of non-elementary types aren't captured and distributed to job 
> runners, causing them to remain 'null' and trigger NPEs upon access when 
> running on a cluster. Running locally through `flink-clients` works fine.
> Changing parallelism or disabling the closure cleaner don't seem to have any 
> effect.
> Minimal example, also see the attached archive.
> {code:java}
> case class IntWrapper(a1: Int)
> val wrapped = IntWrapper(42)
> env.readTextFile("myTextFile.txt").map(line => wrapped.toString).collect
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3718] Add Option For Completely Async B...

2016-04-19 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1879#issuecomment-211955064
  
Looking at the list of changed files, it should not be a big problem to 
merge your changes with mine. Thus, I would say go ahead.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3718) Add Option For Completely Async Backup in RocksDB State Backend

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15247865#comment-15247865
 ] 

ASF GitHub Bot commented on FLINK-3718:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1879#issuecomment-211955064
  
Looking at the list of changed files, it should not be a big problem to 
merge your changes with mine. Thus, I would say go ahead.


> Add Option For Completely Async Backup in RocksDB State Backend
> ---
>
> Key: FLINK-3718
> URL: https://issues.apache.org/jira/browse/FLINK-3718
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Right now, the snapshotting for RocksDB has a synchronous part where a backup 
> of the RocksDB database is drawn and an asynchronous part where this backup 
> is written to HDFS.
> We should add an option that uses the snapshot feature of RocksDB to get an 
> iterator over all keys at a set point in time. The iterator can be used to 
> store everything to HDFS. Normal operation can continue while we store the 
> keys. This makes the snapshot completely asynchronous.
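
For illustration, the RocksDB primitive this proposal builds on (a sketch, not Flink's backend code; resource-release method names are assumed from the RocksJava API of the time):

{code:java}
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;

// A Snapshot pins a point-in-time view of the database; an iterator opened
// against it can be drained on a background thread while writes continue.
static void writeSnapshotAsync(final RocksDB db) {
	final Snapshot snapshot = db.getSnapshot();
	new Thread(new Runnable() {
		@Override
		public void run() {
			ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot);
			RocksIterator it = db.newIterator(readOptions);
			try {
				for (it.seekToFirst(); it.isValid(); it.next()) {
					// write it.key() / it.value() to the checkpoint target (e.g. HDFS)
				}
			} finally {
				it.dispose();
				readOptions.dispose();
				db.releaseSnapshot(snapshot);
			}
		}
	}).start();
}
{code}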



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3701) Cant call execute after first execution

2016-04-19 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15247847#comment-15247847
 ] 

Till Rohrmann commented on FLINK-3701:
--

Yes that should work.

> Cant call execute after first execution
> ---
>
> Key: FLINK-3701
> URL: https://issues.apache.org/jira/browse/FLINK-3701
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Reporter: Nikolaas Steenbergen
>Assignee: Maximilian Michels
>
> in the scala shell, local mode, version 1.0 this works:
> {code}
> Scala-Flink> var b = env.fromElements("a","b")
> Scala-Flink> b.print
> Scala-Flink> var c = env.fromElements("c","d")
> Scala-Flink> c.print
> {code}
> in the current master (after c.print) this leads to :
> {code}
> java.lang.NullPointerException
>   at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1031)
>   at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:961)
>   at org.apache.flink.api.java.ScalaShellRemoteEnvironment.execute(ScalaShellRemoteEnvironment.java:70)
>   at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
>   at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1615)
>   at .<init>(<console>:56)
>   at .<clinit>(<console>)
>   at .<init>(<console>:7)
>   at .<clinit>(<console>)
>   at $print(<console>)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>   at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>   at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>   at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>   at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>   at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>   at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>   at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>   at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>   at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>   at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>   at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>   at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>   at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>   at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>   at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>   at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:199)
>   at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:127)
>   at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3788) Local variable values are not distributed to task runners

2016-04-19 Thread Andreas C. Osowski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andreas C. Osowski updated FLINK-3788:
--
Attachment: FLINK-3788.tgz

> Local variable values are not distributed to task runners
> -
>
> Key: FLINK-3788
> URL: https://issues.apache.org/jira/browse/FLINK-3788
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API
>Affects Versions: 1.0.0, 1.0.1
> Environment: Scala 2.11.8
> Sun JDK 1.8.0_65 or OpenJDK 1.8.0_77
> Fedora 25, 4.6.0-0.rc2.git3.1.fc25.x86_64
>Reporter: Andreas C. Osowski
> Attachments: FLINK-3788.tgz
>
>
> Variable values of non-elementary types aren't captured and distributed to job 
> runners, causing them to remain 'null' and trigger NPEs upon access when 
> running on a cluster. Running locally through `flink-clients` works fine.
> Changing parallelism or disabling the closure cleaner don't seem to have any 
> effect.
> Minimal example, also see the attached archive.
> {code:java}
> case class IntWrapper(a1: Int)
> val wrapped = IntWrapper(42)
> env.readTextFile("myTextFile.txt").map(line => wrapped.toString).collect
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3788) Local variable values are not distributed to task runners

2016-04-19 Thread Andreas C. Osowski (JIRA)
Andreas C. Osowski created FLINK-3788:
-

 Summary: Local variable values are not distributed to task runners
 Key: FLINK-3788
 URL: https://issues.apache.org/jira/browse/FLINK-3788
 Project: Flink
  Issue Type: Bug
  Components: DataSet API
Affects Versions: 1.0.1, 1.0.0
 Environment: Scala 2.11.8
Sun JDK 1.8.0_65 or OpenJDK 1.8.0_77
Fedora 25, 4.6.0-0.rc2.git3.1.fc25.x86_64
Reporter: Andreas C. Osowski


Variable values of non-elementary types aren't captured and distributed to job 
runners, causing them to remain 'null' and trigger NPEs upon access when 
running on a cluster. Running locally through `flink-clients` works fine.

Changing parallelism or disabling the closure cleaner don't seem to have any 
effect.

Minimal example, also see the attached archive.
{code:java}
case class IntWrapper(a1: Int)
val wrapped = IntWrapper(42)
env.readTextFile("myTextFile.txt").map(line => wrapped.toString).collect
{code}




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3774] [shell] Forwards Flink configurat...

2016-04-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1904#discussion_r60238192
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java ---
@@ -133,26 +137,31 @@ public RemoteEnvironment(String host, int port, Configuration clientConfig,
 		this.port = port;
 		this.clientConfiguration = clientConfig == null ? new Configuration() : clientConfig;
 		if (jarFiles != null) {
-			this.jarFiles = new URL[jarFiles.length];
+			this.jarFiles = new ArrayList(jarFiles.length);
 			for (int i = 0; i < jarFiles.length; i++) {
 				try {
-					this.jarFiles[i] = new File(jarFiles[i]).getAbsoluteFile().toURI().toURL();
+					this.jarFiles.add(new File(jarFiles[i]).getAbsoluteFile().toURI().toURL());
 				} catch (MalformedURLException e) {
 					throw new IllegalArgumentException("JAR file path invalid", e);
 				}
 			}
 		}
 		else {
-			this.jarFiles = null;
+			this.jarFiles = Collections.emptyList();
+		}
+
+		if (globalClasspaths == null) {
+			this.globalClasspaths = Collections.emptyList();
+		} else {
+			this.globalClasspaths = Arrays.asList(globalClasspaths);
 		}
-		this.globalClasspaths = globalClasspaths;
 	}
 
 	// ------------------------------------------------------------------------
 
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
-		ensureExecutorCreated();
+		PlanExecutor executor = getExecutor();
--- End diff --

You're right, it's bad that the `PlanExecutor` is not stopped after it has 
been used. I will fix this by checking in `ScalaShellRemoteEnvironment` whether 
`this.executor` is set. If true, then it will call `this.executor.stop()`. That 
way, there will always be at most one `PlanExecutor` active and the last one is 
stopped by the `dispose` call.
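
A hedged sketch of that lifecycle (field and helper names are assumed from the comment, not taken from the merged code):

```java
@Override
public JobExecutionResult execute(String jobName) throws Exception {
	if (this.executor != null) {
		this.executor.stop(); // at most one PlanExecutor is ever live
	}
	this.executor = createExecutor(); // hypothetical factory producing a fresh PlanExecutor
	return this.executor.executePlan(createProgramPlan(jobName));
}
```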


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3774) Flink configuration is not correctly forwarded to PlanExecutor in ScalaShellRemoteEnvironment

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15247805#comment-15247805
 ] 

ASF GitHub Bot commented on FLINK-3774:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1904#discussion_r60238192
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java ---
@@ -133,26 +137,31 @@ public RemoteEnvironment(String host, int port, Configuration clientConfig,
 		this.port = port;
 		this.clientConfiguration = clientConfig == null ? new Configuration() : clientConfig;
 		if (jarFiles != null) {
-			this.jarFiles = new URL[jarFiles.length];
+			this.jarFiles = new ArrayList(jarFiles.length);
 			for (int i = 0; i < jarFiles.length; i++) {
 				try {
-					this.jarFiles[i] = new File(jarFiles[i]).getAbsoluteFile().toURI().toURL();
+					this.jarFiles.add(new File(jarFiles[i]).getAbsoluteFile().toURI().toURL());
 				} catch (MalformedURLException e) {
 					throw new IllegalArgumentException("JAR file path invalid", e);
 				}
 			}
 		}
 		else {
-			this.jarFiles = null;
+			this.jarFiles = Collections.emptyList();
+		}
+
+		if (globalClasspaths == null) {
+			this.globalClasspaths = Collections.emptyList();
+		} else {
+			this.globalClasspaths = Arrays.asList(globalClasspaths);
 		}
-		this.globalClasspaths = globalClasspaths;
 	}
 
 	// ------------------------------------------------------------------------
 
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
-		ensureExecutorCreated();
+		PlanExecutor executor = getExecutor();
--- End diff --

You're right, it's bad that the `PlanExecutor` is not stopped after it has 
been used. I will fix this by checking in `ScalaShellRemoteEnvironment` whether 
`this.executor` is set. If true, then it will call `this.executor.stop()`. That 
way, there will always be at most one `PlanExecutor` active and the last one is 
stopped by the `dispose` call.


> Flink configuration is not correctly forwarded to PlanExecutor in 
> ScalaShellRemoteEnvironment
> -
>
> Key: FLINK-3774
> URL: https://issues.apache.org/jira/browse/FLINK-3774
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.1.0
>
>
> Currently, the {{ScalaShellRemoteEnvironment}} does not correctly forward 
> the Flink configuration to the {{PlanExecutor}}. Therefore, it is not 
> possible to use the Scala shell in combination with an HA cluster which needs 
> the configuration parameters set in the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3774) Flink configuration is not correctly forwarded to PlanExecutor in ScalaShellRemoteEnvironment

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15247793#comment-15247793
 ] 

ASF GitHub Bot commented on FLINK-3774:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1904#discussion_r60235985
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java ---
@@ -133,26 +137,31 @@ public RemoteEnvironment(String host, int port, Configuration clientConfig,
 		this.port = port;
 		this.clientConfiguration = clientConfig == null ? new Configuration() : clientConfig;
 		if (jarFiles != null) {
-			this.jarFiles = new URL[jarFiles.length];
+			this.jarFiles = new ArrayList(jarFiles.length);
--- End diff --

Will fix it.


> Flink configuration is not correctly forwarded to PlanExecutor in 
> ScalaShellRemoteEnvironment
> -
>
> Key: FLINK-3774
> URL: https://issues.apache.org/jira/browse/FLINK-3774
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.1.0
>
>
> Currently, the {{ScalaShellRemoteEnvironment}} does not correctly forward 
> the Flink configuration to the {{PlanExecutor}}. Therefore, it is not 
> possible to use the Scala shell in combination with an HA cluster which needs 
> the configuration parameters set in the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3774] [shell] Forwards Flink configurat...

2016-04-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1904#discussion_r60235985
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java ---
@@ -133,26 +137,31 @@ public RemoteEnvironment(String host, int port, Configuration clientConfig,
 		this.port = port;
 		this.clientConfiguration = clientConfig == null ? new Configuration() : clientConfig;
 		if (jarFiles != null) {
-			this.jarFiles = new URL[jarFiles.length];
+			this.jarFiles = new ArrayList(jarFiles.length);
--- End diff --

Will fix it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...

2016-04-19 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60235061
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
+import com.amazonaws.services.kinesis.model.StreamStatus;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A utility class that is used as a proxy to make calls to AWS Kinesis
+ * for several functions, such as getting a list of shards and fetching
+ * a batch of data records starting from a specified record sequence number.
+ */
+public class KinesisProxy {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
+
+	/** The actual Kinesis client from the AWS SDK that we will be using to make calls */
+	private final AmazonKinesisClient kinesisClient;
+
+	/** The AWS region that this proxy will be making calls to */
+	private final String regionId;
+
+	/** Configuration properties of this Flink Kinesis Connector */
+	private final Properties configProps;
+
+	/**
+	 * Create a new KinesisProxy based on the supplied configuration properties
+	 *
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxy(Properties configProps) {
+		this.configProps = checkNotNull(configProps);
+
+		this.regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION, KinesisConfigConstants.DEFAULT_AWS_REGION);
+		AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials());
+		client.setRegion(Region.getRegion(Regions.fromName(this.regionId)));
--- End diff --

I had to set the endpoint here as well to make it work.
Which AWS region were you using?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15247784#comment-15247784
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60235061
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
+import com.amazonaws.services.kinesis.model.StreamStatus;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A utility class that is used as a proxy to make calls to AWS Kinesis
+ * for several functions, such as getting a list of shards and fetching
+ * a batch of data records starting from a specified record sequence number.
+ */
+public class KinesisProxy {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
+
+	/** The actual Kinesis client from the AWS SDK that we will be using to make calls */
+	private final AmazonKinesisClient kinesisClient;
+
+	/** The AWS region that this proxy will be making calls to */
+	private final String regionId;
+
+	/** Configuration properties of this Flink Kinesis Connector */
+	private final Properties configProps;
+
+	/**
+	 * Create a new KinesisProxy based on the supplied configuration properties
+	 *
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxy(Properties configProps) {
+		this.configProps = checkNotNull(configProps);
+
+		this.regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION, KinesisConfigConstants.DEFAULT_AWS_REGION);
+		AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials());
+		client.setRegion(Region.getRegion(Regions.fromName(this.regionId)));
--- End diff --

I had to set the endpoint here as well to make it work.
Which AWS region were you using?


> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> --
>
> Key: FLINK-3229
> URL: https://issues.apache.org/jira/browse/FLINK-3229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Opening a sub-task to implement data source consumer for Kinesis streaming 
> connector 



[jira] [Commented] (FLINK-3783) Support weighted random sampling with reservoir

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247774#comment-15247774
 ] 

ASF GitHub Bot commented on FLINK-3783:
---

Github user gaoyike commented on the pull request:

https://github.com/apache/flink/pull/1909#issuecomment-211930810
  
What is the core algorithm, A-ES or A-Chao?


2016-04-19 6:41 GMT-05:00 Trevor Grant :

> Nice. If this gets merged before #1898
>  I'll integrate it in.
> Otherwise I'll open a separate PR after.
>
> —
> You are receiving this because you are subscribed to this thread.
> Reply to this email directly or view it on GitHub
> 
>



> Support weighted random sampling with reservoir
> ---
>
> Key: FLINK-3783
> URL: https://issues.apache.org/jira/browse/FLINK-3783
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: GaoLun
>Assignee: GaoLun
>Priority: Minor
>
> In default random sampling, all items have the same probability of being 
> selected. In weighted random sampling, the probability that an item is 
> selected is determined by its weight relative to the weights of the other 
> items.
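For context, here is a minimal, self-contained sketch of A-Res (the Efraimidis/Spirakis scheme asked about above). It illustrates the technique; it is not the code in this PR:

{code}
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Random;

// A-Res: keep the k items with the largest keys u^(1/w), where u is
// uniform in [0, 1) and w is the item's weight.
public class WeightedReservoirSketch {

    public static <T> List<T> sample(List<T> items, double[] weights, int k, Random rnd) {
        // min-heap on the key, so the item with the smallest key is evicted first
        PriorityQueue<Object[]> heap =
            new PriorityQueue<>(k, (a, b) -> Double.compare((double) a[0], (double) b[0]));
        for (int i = 0; i < items.size(); i++) {
            double key = Math.pow(rnd.nextDouble(), 1.0 / weights[i]);
            if (heap.size() < k) {
                heap.add(new Object[] {key, items.get(i)});
            } else if (key > (double) heap.peek()[0]) {
                heap.poll();
                heap.add(new Object[] {key, items.get(i)});
            }
        }
        List<T> sample = new ArrayList<>(heap.size());
        for (Object[] entry : heap) {
            sample.add((T) entry[1]);
        }
        return sample;
    }
}
{code}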



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247762#comment-15247762
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r60232838
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
 ---
@@ -54,6 +54,32 @@
 @Internal
 public class ReduceOperatorBase extends 
SingleInputOperator {
 
+   /**
+* An enumeration of hints, optionally usable to tell the system 
exactly how to execute the combiner phase
+* of a reduce.
+* (Note: The final reduce phase (after combining) is currently always 
executed by a sort-based strategy.)
+*/
+   public enum CombineHint {
--- End diff --

+1


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A reduce function is applied incrementally to compute a final 
> aggregated value, which makes it possible to hold the pre-aggregated value 
> in a hash table and update it with each function call. 
> The hash-based strategy requires a special implementation of an in-memory 
> hash table. The hash table should support in-place updates of elements (if 
> the updated value has the same binary size as the old value) as well as 
> appending updates that invalidate the old value (if the binary length of 
> the new value differs). The hash table needs to be able to evict and emit 
> all elements if it runs out of memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.
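For illustration, usage could look roughly like the sketch below, given some `input` DataSet of (word, count) tuples and assuming the hint ends up exposed on the reduce operator via a `setCombineHint(...)` method (the API is not final at this point):

{code}
// hedged sketch: choose the combiner strategy via the proposed hint
DataSet<Tuple2<String, Integer>> counts = input
    .groupBy(0)
    .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1,
                Tuple2<String, Integer> v2) {
            // incremental aggregation: input and output types are identical
            return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
        }
    })
    .setCombineHint(CombineHint.HASH); // pre-aggregate in a hash table instead of sorting
{code}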



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)




[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247760#comment-15247760
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60232443
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import 
com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The FlinkKinesisProducer allows to produce from a Flink DataStream into 
Kinesis.
+ *
+ * @param  Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer extends RichSinkFunction {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+   /* AWS region of the stream */
+   private final String region;
+
+   /* Access and secret key of the user */
+   private final String accessKey;
+   private final String secretKey;
+
+   /* Flag controlling the error behavior of the producer */
+   private boolean failOnError = false;
+
+   /* Name of the default stream to produce to. Can be overwritten by the 
serialization schema */
+   private String defaultStream;
+
+   /* Default partition id. Can be overwritten by the serialization schema 
*/
+   private String defaultPartition;
+
+   /* Schema for turning the OUT type into a byte array. */
+   private final KinesisSerializationSchema schema;
+
+   /* Optional custom partitioner */
+   private KinesisPartitioner customPartitioner = null;
+
+
+   // --- Runtime fields 
---
+
+
+   /* Our Kinesis instance for each parallel Flink sink */
+   private transient KinesisProducer producer;
+
+   /* Callback handling failures */
+   private transient FutureCallback callback;
+
+   /* Field for async exception */
+   private transient Throwable thrownException;
+
+
+   // --- Initialization and configuration  
---
+
+
+   /**
+* Create a new FlinkKinesisProducer.
+* This is a constructor supporting Flink's {@see SerializationSchema}.
+*
+* @param region AWS region of the stream
+* @param accessKey Access key of a user with permission to access the 
stream (ideally also with access to Cloud Watch)
+* @param secretKey Secret key of the user
+* @param schema Serialization schema for the data type
+*/
+   public FlinkKinesisProducer(String region, String accessKey, String 
secretKey, final SerializationSchema schema) {
+   // create a simple wrapper for the serialization schema
+   


[jira] [Commented] (FLINK-3774) Flink configuration is not correctly forwarded to PlanExecutor in ScalaShellRemoteEnvironment

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247757#comment-15247757
 ] 

ASF GitHub Bot commented on FLINK-3774:
---

Github user smarthi commented on a diff in the pull request:

https://github.com/apache/flink/pull/1904#discussion_r60231836
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java ---
@@ -133,26 +137,31 @@ public RemoteEnvironment(String host, int port, 
Configuration clientConfig,
this.port = port;
this.clientConfiguration = clientConfig == null ? new 
Configuration() : clientConfig;
if (jarFiles != null) {
-   this.jarFiles = new URL[jarFiles.length];
+   this.jarFiles = new ArrayList(jarFiles.length);
--- End diff --

Nitpick - No need of  :( 
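(The type argument JIRA swallowed here is presumably the explicit element type; with the Java 7 diamond operator the line would simply read:)

{code}
// the element type is inferred from the field declaration
this.jarFiles = new ArrayList<>(jarFiles.length);
{code}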


> Flink configuration is not correctly forwarded to PlanExecutor in 
> ScalaShellRemoteEnvironment
> -
>
> Key: FLINK-3774
> URL: https://issues.apache.org/jira/browse/FLINK-3774
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.1.0
>
>
> Currently, the {{ScalaShellRemoteEnvironment}} does not correctly forward 
> the Flink configuration to the {{PlanExecutor}}. Therefore, it is not 
> possible to use the Scala shell in combination with an HA cluster, which 
> needs the configuration parameters set in the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)




[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247751#comment-15247751
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1838#issuecomment-211923989
  
Hi @gallenvara, thanks for the update! I tried it locally and it worked as 
expected. 

I would like two more test methods though, to ensure that the feature works 
end-to-end.
Could you add one test method to `JoinITCase` which basically extends 
`testeUDFJoinOnTuplesWithMultipleKeyFieldPositions()` and uses range 
partitioning? For that you should provide a DataDistribution and set the 
parallelism to 4 on the environment (a rough skeleton is sketched after this 
comment).
Please do the same with 
`CoGroupITCase.testCoGroupWithMultipleKeyFieldsWithFieldSelector()`.

After that we can merge the PR. Thanks, Fabian
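Roughly, a test distribution for this could look like the sketch below (class name, key range, and the exact interface are illustrative; `getKeyTypes()` follows the method referenced in the diff discussed later in this thread):

{code}
// hypothetical test DataDistribution over an assumed integer key space [0, 100)
public static class TestIntDistribution implements DataDistribution {

    @Override
    public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
        // evenly sized ranges: bucket i covers [i * step, (i + 1) * step)
        return new Object[] { (bucketNum + 1) * (100 / totalNumBuckets) };
    }

    @Override
    public int getNumberOfFields() {
        return 1;
    }

    @Override
    public TypeInformation<?>[] getKeyTypes() {
        return new TypeInformation<?>[] { BasicTypeInfo.INT_TYPE_INFO };
    }

    @Override
    public void write(DataOutputView out) throws IOException {
        // nothing to serialize for this fixed distribution
    }

    @Override
    public void read(DataInputView in) throws IOException {
        // nothing to deserialize
    }
}
{code}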


> Support range partition comparison for multi input nodes.
> -
>
> Key: FLINK-2998
> URL: https://issues.apache.org/jira/browse/FLINK-2998
> Project: Flink
>  Issue Type: New Feature
>  Components: Optimizer
>Reporter: Chengxiang Li
>Priority: Minor
>
> The optimizer may have an opportunity to optimize the DAG when it finds 
> that two input range partitions are equivalent, but we do not support this 
> comparison yet.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)






[jira] [Commented] (FLINK-3775) Flink Scala shell does not respect Flink configuration

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247741#comment-15247741
 ] 

ASF GitHub Bot commented on FLINK-3775:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/1914

[FLINK-3775] [shell] Load Flink configuration before forwarding it

This commit makes sure that the GlobalConfiguration is loaded before the 
FlinkShell
is started.

- [X] General
  - The pull request references the related JIRA issue
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message

- [X] Tests
  - Has been tested on YARN cluster in combination with #1904 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixScalaShellYarn

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1914.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1914


commit 2284a1bd172dcc91b66d558de8a5e8fff67d1651
Author: Till Rohrmann 
Date:   2016-04-19T13:04:37Z

[FLINK-3775] [shell] Load Flink configuration before forwarding it

This commit makes sure that the GlobalConfiguration is loaded before the 
FlinkShell
is started.
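In essence, the fix boils down to something like the following sketch (method names from the 1.x `GlobalConfiguration`; the actual call site is in `FlinkShell`):

{code}
// load flink-conf.yaml from the configuration directory before the shell
// creates its execution environment, so e.g. HA settings are picked up
GlobalConfiguration.loadConfiguration(configurationDirectory);
Configuration config = GlobalConfiguration.getConfiguration();
// config can now be forwarded to the remote environment
{code}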




> Flink Scala shell does not respect Flink configuration
> --
>
> Key: FLINK-3775
> URL: https://issues.apache.org/jira/browse/FLINK-3775
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> The Flink Scala shell does not load Flink's configuration properly. This 
> makes it impossible to connect to a remote HA cluster, for example. The 
> reason is that the {{GlobalConfiguration}} is never properly loaded. I think 
> the Scala shell should respect the Flink settings specified in 
> {{flink-conf.yaml}} if available.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2998) Support range partition comparison for multi input nodes.

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247745#comment-15247745
 ] 

ASF GitHub Bot commented on FLINK-2998:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1838#discussion_r60229754
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
 ---
@@ -84,8 +84,8 @@ public PartitionOperator(DataSet input, Keys pKeys, 
Partitioner customP
Preconditions.checkArgument(distribution == null || pMethod == 
PartitionMethod.RANGE, "Customized data distribution is only neccessary for 
range partition.");

if (distribution != null) {
-   
Preconditions.checkArgument(distribution.getNumberOfFields() == 
pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and 
range partitioner should be the same.");
-   
Preconditions.checkArgument(Arrays.equals(distribution.getKeyTypes(), 
pKeys.getKeyFieldTypes()), "The types of key from the distribution and range 
partitioner are not equal.");
+   
Preconditions.checkArgument(pKeys.getNumberOfKeyFields() <= 
distribution.getNumberOfFields(), "The number of key fields in range 
partitioner should be less than the number in the distribution.");
+   
Preconditions.checkArgument(Arrays.equals(pKeys.getKeyFieldTypes(), 
Arrays.copyOfRange(distribution.getKeyTypes(), 0, 
pKeys.getNumberOfKeyFields())), "The type array of the partition key should be 
prefix of the type array of the distribution.");
--- End diff --

Oh, maybe I should improve my English skills. The message should read like: 
`"The types of the flat key fields must be equal to the types of the fields of 
the distribution."`


> Support range partition comparison for multi input nodes.
> -
>
> Key: FLINK-2998
> URL: https://issues.apache.org/jira/browse/FLINK-2998
> Project: Flink
>  Issue Type: New Feature
>  Components: Optimizer
>Reporter: Chengxiang Li
>Priority: Minor
>
> The optimizer may have an opportunity to optimize the DAG when it finds 
> that two input range partitions are equivalent, but we do not support this 
> comparison yet.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)




[jira] [Assigned] (FLINK-3775) Flink Scala shell does not respect Flink configuration

2016-04-19 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-3775:


Assignee: Till Rohrmann

> Flink Scala shell does not respect Flink configuration
> --
>
> Key: FLINK-3775
> URL: https://issues.apache.org/jira/browse/FLINK-3775
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> The Flink Scala shell does not load Flink's configuration properly. This 
> makes it impossible to connect to a remote HA cluster, for example. The 
> reason is that the {{GlobalConfiguration}} is never properly loaded. I think 
> the Scala shell should respect the Flink settings specified in 
> {{flink-conf.yaml}} if available.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)




[jira] [Commented] (FLINK-3701) Cant call execute after first execution

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247718#comment-15247718
 ] 

ASF GitHub Bot commented on FLINK-3701:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1913#issuecomment-211918388
  
When simply clearing the lists, we lose the registered serializers. I added another commit to make the fields transient instead of clearing the lists. That
way we can reuse the `ExecutionConfig` several times with repeated 
serialization/deserialization of the registered serializers.
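The idea, sketched with hypothetical names (this is not the actual `ExecutionConfig` code):

{code}
// keep the serializer list out of Java serialization and re-create it on
// deserialization, so the config object stays reusable across executions
private transient List<Class<?>> registeredSerializers = new ArrayList<>();

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    // transient fields come back as null; restore an empty, usable list
    registeredSerializers = new ArrayList<>();
}
{code}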


> Cant call execute after first execution
> ---
>
> Key: FLINK-3701
> URL: https://issues.apache.org/jira/browse/FLINK-3701
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Reporter: Nikolaas Steenbergen
>Assignee: Maximilian Michels
>
> In the Scala shell, local mode, version 1.0, this works:
> {code}
> Scala-Flink> var b = env.fromElements("a","b")
> Scala-Flink> b.print
> Scala-Flink> var c = env.fromElements("c","d")
> Scala-Flink> c.print
> {code}
> In the current master (after c.print) this leads to:
> {code}
> java.lang.NullPointerException
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1031)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:961)
>   at 
> org.apache.flink.api.java.ScalaShellRemoteEnvironment.execute(ScalaShellRemoteEnvironment.java:70)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
>   at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1615)
>   at .(:56)
>   at .()
>   at .(:7)
>   at .()
>   at $print()
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>   at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>   at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>   at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>   at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>   at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>   at 
> scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>   at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>   at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>   at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>   at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>   at 
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>   at 
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>   at 
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>   at 
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>   at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>   at 
> org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:199)
>   at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:127)
>   at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247692#comment-15247692
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60224497
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import 
com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The FlinkKinesisProducer allows to produce from a Flink DataStream into 
Kinesis.
+ *
+ * @param  Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer extends RichSinkFunction {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+   /* AWS region of the stream */
+   private final String region;
+
+   /* Access and secret key of the user */
+   private final String accessKey;
+   private final String secretKey;
+
+   /* Flag controlling the error behavior of the producer */
+   private boolean failOnError = false;
+
+   /* Name of the default stream to produce to. Can be overwritten by the 
serialization schema */
+   private String defaultStream;
+
+   /* Default partition id. Can be overwritten by the serialization schema 
*/
+   private String defaultPartition;
+
+   /* Schema for turning the OUT type into a byte array. */
+   private final KinesisSerializationSchema schema;
+
+   /* Optional custom partitioner */
+   private KinesisPartitioner customPartitioner = null;
+
+
+   // --- Runtime fields 
---
+
+
+   /* Our Kinesis instance for each parallel Flink sink */
+   private transient KinesisProducer producer;
+
+   /* Callback handling failures */
+   private transient FutureCallback callback;
+
+   /* Field for async exception */
+   private transient Throwable thrownException;
+
+
+   // --- Initialization and configuration  
---
+
+
+   /**
+* Create a new FlinkKinesisProducer.
+* This is a constructor supporting Flink's {@see SerializationSchema}.
+*
+* @param region AWS region of the stream
+* @param accessKey Access key of a user with permission to access the 
stream (ideally also with access to Cloud Watch)
+* @param secretKey Secret key of the user
+* @param schema Serialization schema for the data type
+*/
+   public FlinkKinesisProducer(String region, String accessKey, String 
secretKey, final SerializationSchema schema) {
+   // create a simple wrapper for the serialization schema
+   


[jira] [Created] (FLINK-3787) Yarn client does not report unfulfillable container constraints

2016-04-19 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-3787:


 Summary: Yarn client does not report unfulfillable container 
constraints
 Key: FLINK-3787
 URL: https://issues.apache.org/jira/browse/FLINK-3787
 Project: Flink
  Issue Type: Improvement
  Components: YARN Client
Affects Versions: 1.1.0
Reporter: Till Rohrmann
Priority: Minor


If the number of virtual cores for a Yarn container is not fulfillable, then 
the {{TaskManager}} won't be started. This is only reported in the logs but not 
in the {{FlinkYarnClient}}. Thus, the user will see a started {{JobManager}} 
with no connected {{TaskManagers}}. Since the log aggregation is only available 
after the Yarn job has been stopped, there is no easy way for the user to 
detect what's going on.

This problem is aggravated by the fact that the number of virtual cores is 
coupled to the number of slots if no explicit value has been set for the 
virtual cores. Therefore, it might happen that the Yarn deployment fails 
because of the virtual cores even though the user has never set a value for 
them (the user might not even know about the virtual cores).

I think it would be good to check if the virtual cores constraint is 
fulfillable. If not, then the user should receive a clear message that the 
Flink cluster cannot be deployed (similar to the memory constraints).  
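A client-side pre-flight check along these lines might look like the following sketch (placement and variable names are hypothetical; `yarnClient` is a Hadoop `YarnClient`):

{code}
// fail fast if no running node can satisfy the requested number of vcores
int maxVcores = 0;
for (NodeReport node : yarnClient.getNodeReports(NodeState.RUNNING)) {
    maxVcores = Math.max(maxVcores, node.getCapability().getVirtualCores());
}
if (requestedVcores > maxVcores) {
    throw new IllegalArgumentException("The Yarn cluster cannot provide "
        + requestedVcores + " virtual cores per container; the largest node "
        + "only offers " + maxVcores + ".");
}
{code}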



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247679#comment-15247679
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1910#issuecomment-211906744
  
Thank you @zentol and @uce for the review.
I hope I addressed all your concerns.


> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using the AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of the FlinkKinesisProducer are rather straightforward:
> 1. Partition put records based on the partition key.
> 2. Configurable put mode: bulk put for higher throughput vs. sequential 
> single-record puts. The size of a bulk put should also be configurable.
> 3. For bulk put, the user can also choose to enforce strict ordering of the 
> results, at the cost of higher put latency. Ref: 
> https://brandur.org/kinesis-order
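For a flavour of the API, the PR's `ProduceIntoKinesis` example (quoted further down in this thread) configures the producer like this:

{code}
FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
    pt.getRequired("region"), pt.getRequired("accessKey"),
    pt.getRequired("secretKey"), new SimpleStringSchema());

kinesis.setFailOnError(true);
kinesis.setDefaultStream("test-flink");
kinesis.setDefaultPartition("0");

simpleStringStream.addSink(kinesis);
{code}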



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)




[jira] [Commented] (FLINK-3786) Add BigDecimal and BigInteger as Basic types

2016-04-19 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247657#comment-15247657
 ] 

Chesnay Schepler commented on FLINK-3786:
-

The JDBC formats would benefit from this as well, +1.

> Add BigDecimal and BigInteger as Basic types
> 
>
> Key: FLINK-3786
> URL: https://issues.apache.org/jira/browse/FLINK-3786
> Project: Flink
>  Issue Type: New Feature
>  Components: Core
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> We already had a discussion on the mailing list some months ago about 
> adding BigDecimal and BigInteger as basic types.
> Especially for business or scientific applications, it makes sense to 
> support the BigInteger and BigDecimal types natively. In my opinion they 
> are as important as Date or Void and should be added as BasicTypes. The 
> Table API would also benefit from this.
> http://mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/%3c564cad71.8070...@apache.org%3E
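To illustrate what first-class support would enable (sketch only; today such a job would fall back to generic/Kryo serialization):

{code}
// with BigDecimal as a basic type, this DataSet would get a dedicated
// serializer and comparator instead of a generic fallback
DataSet<BigDecimal> amounts = env.fromElements(
    new BigDecimal("19.99"), new BigDecimal("0.01"));

DataSet<BigDecimal> total = amounts.reduce(BigDecimal::add);
{code}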



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247647#comment-15247647
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60220590
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import 
com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The FlinkKinesisProducer allows to produce from a Flink DataStream into 
Kinesis.
+ *
+ * @param  Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer extends RichSinkFunction {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+   /* AWS region of the stream */
+   private final String region;
+
+   /* Access and secret key of the user */
+   private final String accessKey;
+   private final String secretKey;
+
+   /* Flag controlling the error behavior of the producer */
+   private boolean failOnError = false;
+
+   /* Name of the default stream to produce to. Can be overwritten by the 
serialization schema */
+   private String defaultStream;
+
+   /* Default partition id. Can be overwritten by the serialization schema 
*/
+   private String defaultPartition;
+
+   /* Schema for turning the OUT type into a byte array. */
+   private final KinesisSerializationSchema schema;
+
+   /* Optional custom partitioner */
+   private KinesisPartitioner customPartitioner = null;
+
+
+   // --- Runtime fields 
---
+
+
+   /* Our Kinesis instance for each parallel Flink sink */
+   private transient KinesisProducer producer;
+
+   /* Callback handling failures */
+   private transient FutureCallback callback;
+
+   /* Field for async exception */
+   private transient Throwable thrownException;
+
+
+   // --- Initialization and configuration  
---
+
+
+   /**
+* Create a new FlinkKinesisProducer.
+* This is a constructor supporting Flink's {@see SerializationSchema}.
+*
+* @param region AWS region of the stream
+* @param accessKey Access key of a user with permission to access the 
stream (ideally also with access to Cloud Watch)
+* @param secretKey Secret key of the user
+* @param schema Serialization schema for the data type
+*/
+   public FlinkKinesisProducer(String region, String accessKey, String 
secretKey, final SerializationSchema schema) {
+   // create a simple wrapper for the serialization schema
+   

[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247649#comment-15247649
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60220641
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.examples;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
+import 
org.apache.flink.streaming.connectors.kinesis.KinesisSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This is an example on how to produce data into Kinesis
+ */
+public class ProduceIntoKinesis {
+
+   public static void main(String[] args) throws Exception {
+   ParameterTool pt = ParameterTool.fromArgs(args);
+
+   StreamExecutionEnvironment see = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   see.setParallelism(1);
+
+   DataStream simpleStringStream = see.addSource(new 
EventsGenerator());
+
+   FlinkKinesisProducer kinesis = new 
FlinkKinesisProducer<>(pt.getRequired("region"),
+   pt.getRequired("accessKey"),
+   pt.getRequired("secretKey"), new 
SimpleStringSchema());
+
+   kinesis.setFailOnError(true);
+   kinesis.setDefaultStream("test-flink");
+   kinesis.setDefaultPartition("0");
+
+   simpleStringStream.addSink(kinesis);
+
+   see.execute();
+   }
+
+   public static class EventsGenerator implements SourceFunction {
+   private boolean running = true;
+
+   @Override
+   public void run(SourceContext ctx) throws Exception {
+   long seq = 0;
+   while(running) {
+
--- End diff --

It's invisible code ;)


> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using the AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of the FlinkKinesisProducer are rather straightforward:
> 1. Partition put records based on the partition key.
> 2. Configurable put mode: bulk put for higher throughput vs. sequential 
> single-record puts. The size of a bulk put should also be configurable.
> 3. For bulk put, the user can also choose to enforce strict ordering of the 
> results, at the cost of higher put latency. Ref: 
> https://brandur.org/kinesis-order



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3230] Add producer for Amazon Kinesis S...

2016-04-19 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60220641
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.examples;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
+import org.apache.flink.streaming.connectors.kinesis.KinesisSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This is an example of how to produce data into Kinesis.
+ */
+public class ProduceIntoKinesis {
+
+   public static void main(String[] args) throws Exception {
+      ParameterTool pt = ParameterTool.fromArgs(args);
+
+      StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+      see.setParallelism(1);
+
+      DataStream<String> simpleStringStream = see.addSource(new EventsGenerator());
+
+      FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(pt.getRequired("region"),
+         pt.getRequired("accessKey"),
+         pt.getRequired("secretKey"), new SimpleStringSchema());
+
+      kinesis.setFailOnError(true);
+      kinesis.setDefaultStream("test-flink");
+      kinesis.setDefaultPartition("0");
+
+      simpleStringStream.addSink(kinesis);
+
+      see.execute();
+   }
+
+   public static class EventsGenerator implements SourceFunction<String> {
+      private boolean running = true;
+
+      @Override
+      public void run(SourceContext<String> ctx) throws Exception {
+         long seq = 0;
+         while(running) {
+
--- End diff --

It's invisible code ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3701) Cant call execute after first execution

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247645#comment-15247645
 ] 

ASF GitHub Bot commented on FLINK-3701:
---

GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/1913

[FLINK-3701] reuse serializer lists in ExecutionConfig

https://issues.apache.org/jira/browse/FLINK-3701

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-3701

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1913.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1913


commit 08921d47e72ce219246d68a63e77d0745c20420d
Author: Maximilian Michels 
Date:   2016-04-19T12:16:08Z

[FLINK-3701] reuse serializer lists in ExecutionConfig




> Cant call execute after first execution
> ---
>
> Key: FLINK-3701
> URL: https://issues.apache.org/jira/browse/FLINK-3701
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Reporter: Nikolaas Steenbergen
>Assignee: Maximilian Michels
>
> In the Scala shell (local mode, version 1.0) this works:
> {code}
> Scala-Flink> var b = env.fromElements("a","b")
> Scala-Flink> b.print
> Scala-Flink> var c = env.fromElements("c","d")
> Scala-Flink> c.print
> {code}
> In the current master (after c.print) this leads to:
> {code}
> java.lang.NullPointerException
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1031)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:961)
>   at 
> org.apache.flink.api.java.ScalaShellRemoteEnvironment.execute(ScalaShellRemoteEnvironment.java:70)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
>   at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1615)
>   at .<init>(<console>:56)
>   at .<clinit>(<console>)
>   at .<init>(<console>:7)
>   at .<clinit>(<console>)
>   at $print(<console>)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>   at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>   at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>   at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>   at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>   at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>   at 
> scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>   at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>   at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>   at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>   at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>   at 
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>   at 
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>   at 
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>   at 
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>   at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>   at 
> org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:199)
>   at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:127)
>   at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3230] Add producer for Amazon Kinesis S...

2016-04-19 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60220590
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The FlinkKinesisProducer allows producing data from a Flink DataStream into Kinesis.
+ *
+ * @param <OUT> Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+   /* AWS region of the stream */
+   private final String region;
+
+   /* Access and secret key of the user */
+   private final String accessKey;
+   private final String secretKey;
+
+   /* Flag controlling the error behavior of the producer */
+   private boolean failOnError = false;
+
+   /* Name of the default stream to produce to. Can be overwritten by the serialization schema */
+   private String defaultStream;
+
+   /* Default partition id. Can be overwritten by the serialization schema */
+   private String defaultPartition;
+
+   /* Schema for turning the OUT type into a byte array. */
+   private final KinesisSerializationSchema<OUT> schema;
+
+   /* Optional custom partitioner */
+   private KinesisPartitioner<OUT> customPartitioner = null;
+
+
+   // --- Runtime fields ---
+
+
+   /* Our Kinesis instance for each parallel Flink sink */
+   private transient KinesisProducer producer;
+
+   /* Callback handling failures */
+   private transient FutureCallback<UserRecordResult> callback;
+
+   /* Field for async exception */
+   private transient Throwable thrownException;
+
+
+   // --- Initialization and configuration ---
+
+
+   /**
+    * Create a new FlinkKinesisProducer.
+    * This is a constructor supporting Flink's {@link SerializationSchema}.
+    *
+    * @param region AWS region of the stream
+    * @param accessKey Access key of a user with permission to access the stream (ideally also with access to CloudWatch)
+    * @param secretKey Secret key of the user
+    * @param schema Serialization schema for the data type
+    */
+   public FlinkKinesisProducer(String region, String accessKey, String secretKey, final SerializationSchema<OUT> schema) {
+      // create a simple wrapper for the serialization schema
+      this(region, accessKey, secretKey, new KinesisSerializationSchema<OUT>() {
+         @Override
+         public ByteBuffer serialize(OUT element) {
+            // wrap into ByteBuffer
+

[GitHub] flink pull request: [FLINK-3701] reuse serializer lists in Executi...

2016-04-19 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/1913

[FLINK-3701] reuse serializer lists in ExecutionConfig

https://issues.apache.org/jira/browse/FLINK-3701

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-3701

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1913.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1913


commit 08921d47e72ce219246d68a63e77d0745c20420d
Author: Maximilian Michels 
Date:   2016-04-19T12:16:08Z

[FLINK-3701] reuse serializer lists in ExecutionConfig




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3701) Cant call execute after first execution

2016-04-19 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247644#comment-15247644
 ] 

Maximilian Michels commented on FLINK-3701:
---

The issue is that the serializer lists in the {{ExecutionConfig}} are set to 
null after they have been serialized for cluster shipping. A solution would be 
to simply clear those lists instead of setting them to null. This would reset 
them to their original state for further executions. Setting them to null can 
also cause other problems, e.g. when additional serializers are registered 
after the first execution. Perhaps you could also comment, [~till.rohrmann], on 
whether that works.
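
A minimal, self-contained sketch of that idea (the class and field names below 
are illustrative stand-ins, not the actual {{ExecutionConfig}} internals):

{code}
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for the ExecutionConfig registration state.
public class SerializerRegistry {

    private final List<Class<?>> registeredTypes = new ArrayList<>();

    public void register(Class<?> type) {
        registeredTypes.add(type);
    }

    // Called after the config has been serialized for cluster shipping.
    public void resetAfterShipping() {
        // Old behavior (conceptually): set the list to null, so a second
        // execute() dereferences null and fails with an NPE.
        registeredTypes.clear(); // proposed: reset to an empty, usable list
    }
}
{code}

Clearing also means that serializers registered after the first execution land 
in a live list instead of being lost on a null field.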



> Cant call execute after first execution
> ---
>
> Key: FLINK-3701
> URL: https://issues.apache.org/jira/browse/FLINK-3701
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Reporter: Nikolaas Steenbergen
>Assignee: Maximilian Michels
>
> In the Scala shell (local mode, version 1.0) this works:
> {code}
> Scala-Flink> var b = env.fromElements("a","b")
> Scala-Flink> b.print
> Scala-Flink> var c = env.fromElements("c","d")
> Scala-Flink> c.print
> {code}
> In the current master (after c.print) this leads to:
> {code}
> java.lang.NullPointerException
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1031)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:961)
>   at 
> org.apache.flink.api.java.ScalaShellRemoteEnvironment.execute(ScalaShellRemoteEnvironment.java:70)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
>   at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1615)
>   at .<init>(<console>:56)
>   at .<clinit>(<console>)
>   at .<init>(<console>:7)
>   at .<clinit>(<console>)
>   at $print(<console>)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>   at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>   at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>   at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>   at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>   at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>   at 
> scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>   at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>   at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>   at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>   at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>   at 
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>   at 
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>   at 
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>   at 
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>   at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>   at 
> org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:199)
>   at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:127)
>   at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3701) Cant call execute after first execution

2016-04-19 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reassigned FLINK-3701:
-

Assignee: Maximilian Michels

> Cant call execute after first execution
> ---
>
> Key: FLINK-3701
> URL: https://issues.apache.org/jira/browse/FLINK-3701
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Reporter: Nikolaas Steenbergen
>Assignee: Maximilian Michels
>
> In the Scala shell (local mode, version 1.0) this works:
> {code}
> Scala-Flink> var b = env.fromElements("a","b")
> Scala-Flink> b.print
> Scala-Flink> var c = env.fromElements("c","d")
> Scala-Flink> c.print
> {code}
> In the current master (after c.print) this leads to:
> {code}
> java.lang.NullPointerException
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1031)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:961)
>   at 
> org.apache.flink.api.java.ScalaShellRemoteEnvironment.execute(ScalaShellRemoteEnvironment.java:70)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
>   at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1615)
>   at .<init>(<console>:56)
>   at .<clinit>(<console>)
>   at .<init>(<console>:7)
>   at .<clinit>(<console>)
>   at $print(<console>)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>   at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>   at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>   at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>   at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>   at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>   at 
> scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>   at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>   at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>   at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>   at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>   at 
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>   at 
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>   at 
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>   at 
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>   at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>   at 
> org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:199)
>   at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:127)
>   at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247639#comment-15247639
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60219813
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The FlinkKinesisProducer allows producing data from a Flink DataStream into Kinesis.
+ *
+ * @param <OUT> Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+   /* AWS region of the stream */
+   private final String region;
+
+   /* Access and secret key of the user */
+   private final String accessKey;
+   private final String secretKey;
+
+   /* Flag controlling the error behavior of the producer */
+   private boolean failOnError = false;
+
+   /* Name of the default stream to produce to. Can be overwritten by the serialization schema */
+   private String defaultStream;
+
+   /* Default partition id. Can be overwritten by the serialization schema */
+   private String defaultPartition;
+
+   /* Schema for turning the OUT type into a byte array. */
+   private final KinesisSerializationSchema<OUT> schema;
+
+   /* Optional custom partitioner */
+   private KinesisPartitioner<OUT> customPartitioner = null;
+
+
+   // --- Runtime fields ---
+
+
+   /* Our Kinesis instance for each parallel Flink sink */
+   private transient KinesisProducer producer;
+
+   /* Callback handling failures */
+   private transient FutureCallback<UserRecordResult> callback;
+
+   /* Field for async exception */
+   private transient Throwable thrownException;
+
+
+   // --- Initialization and configuration ---
+
+
+   /**
+    * Create a new FlinkKinesisProducer.
+    * This is a constructor supporting Flink's {@link SerializationSchema}.
+    *
+    * @param region AWS region of the stream
+    * @param accessKey Access key of a user with permission to access the stream (ideally also with access to CloudWatch)
+    * @param secretKey Secret key of the user
+    * @param schema Serialization schema for the data type
+    */
+   public FlinkKinesisProducer(String region, String accessKey, String secretKey, final SerializationSchema<OUT> schema) {
+      // create a simple wrapper for the serialization schema
+

[GitHub] flink pull request: [FLINK-3230] Add producer for Amazon Kinesis S...

2016-04-19 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60219813
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The FlinkKinesisProducer allows producing data from a Flink DataStream into Kinesis.
+ *
+ * @param <OUT> Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+   /* AWS region of the stream */
+   private final String region;
+
+   /* Access and secret key of the user */
+   private final String accessKey;
+   private final String secretKey;
+
+   /* Flag controlling the error behavior of the producer */
+   private boolean failOnError = false;
+
+   /* Name of the default stream to produce to. Can be overwritten by the serialization schema */
+   private String defaultStream;
+
+   /* Default partition id. Can be overwritten by the serialization schema */
+   private String defaultPartition;
+
+   /* Schema for turning the OUT type into a byte array. */
+   private final KinesisSerializationSchema<OUT> schema;
+
+   /* Optional custom partitioner */
+   private KinesisPartitioner<OUT> customPartitioner = null;
+
+
+   // --- Runtime fields ---
+
+
+   /* Our Kinesis instance for each parallel Flink sink */
+   private transient KinesisProducer producer;
+
+   /* Callback handling failures */
+   private transient FutureCallback<UserRecordResult> callback;
+
+   /* Field for async exception */
+   private transient Throwable thrownException;
+
+
+   // --- Initialization and configuration ---
+
+
+   /**
+    * Create a new FlinkKinesisProducer.
+    * This is a constructor supporting Flink's {@link SerializationSchema}.
+    *
+    * @param region AWS region of the stream
+    * @param accessKey Access key of a user with permission to access the stream (ideally also with access to CloudWatch)
+    * @param secretKey Secret key of the user
+    * @param schema Serialization schema for the data type
+    */
+   public FlinkKinesisProducer(String region, String accessKey, String secretKey, final SerializationSchema<OUT> schema) {
+      // create a simple wrapper for the serialization schema
+      this(region, accessKey, secretKey, new KinesisSerializationSchema<OUT>() {
+         @Override
+         public ByteBuffer serialize(OUT element) {
+            // wrap into ByteBuffer
+

[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247636#comment-15247636
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60219525
  
--- Diff: flink-streaming-connectors/flink-connector-twitter/pom.xml ---
@@ -35,6 +35,11 @@ under the License.
 
jar
 
+   
+   2.2.0
--- End diff --

It's not related at all. I just came across it and fixed it.



> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using the AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of FlinkKinesisProducer are rather straightforward:
> 1. Partition put records based on partition key.
> 2. Configurable put mode: Bulk put for higher throughput vs. sequential 
> single record puts. Size of bulk should also be configurable.
> 3. For bulk put, user can also choose to enforce strict ordering of the 
> result with the tradeoff of higher put latency. Ref: 
> https://brandur.org/kinesis-order



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3230] Add producer for Amazon Kinesis S...

2016-04-19 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60219525
  
--- Diff: flink-streaming-connectors/flink-connector-twitter/pom.xml ---
@@ -35,6 +35,11 @@ under the License.
 
jar
 
+   
+   2.2.0
--- End diff --

It's not related at all. I just came across it and fixed it.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3749) Improve decimal handling

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247635#comment-15247635
 ] 

ASF GitHub Bot commented on FLINK-3749:
---

Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1881#issuecomment-211892621
  
@vasia and @fhueske: I have opened FLINK-3786 for a long-term solution ;-)
I will start working on this issue.


> Improve decimal handling
> 
>
> Key: FLINK-3749
> URL: https://issues.apache.org/jira/browse/FLINK-3749
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> The current decimal handling is too restrictive and does not allow literals 
> such as "11.2".



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3749] [table] Improve decimal handling

2016-04-19 Thread twalthr
Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1881#issuecomment-211892621
  
@vasia and @fhueske: I have opened FLINK-3786 for a long-term solution ;-)
I will start working on this issue.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3786) Add BigDecimal and BigInteger as Basic types

2016-04-19 Thread Timo Walther (JIRA)
Timo Walther created FLINK-3786:
---

 Summary: Add BigDecimal and BigInteger as Basic types
 Key: FLINK-3786
 URL: https://issues.apache.org/jira/browse/FLINK-3786
 Project: Flink
  Issue Type: New Feature
  Components: Core
Reporter: Timo Walther
Assignee: Timo Walther


We already had the discussion on the mailing list some months ago about adding 
BigDecimal and BigInteger as basic types.

Especially for business or scientific applications it 
makes sense to support the BigInteger and BigDecimal types natively. In 
my opinion they are as important as Date or Void and should be added as 
BasicTypes. The Table API would also benefit from it.

http://mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/%3c564cad71.8070...@apache.org%3E
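
For illustration, a sketch of the kind of program that motivates this (written 
against the DataSet API of this era; whether such types get native BasicType 
treatment or fall back to generic serialization is exactly what this issue 
proposes to change):

{code}
import java.math.BigDecimal;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class BigDecimalExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<BigDecimal> amounts = env.fromElements(
                new BigDecimal("11.2"), new BigDecimal("0.005"));

        // Exact decimal arithmetic, e.g. applying a 19% tax rate.
        amounts.map(new MapFunction<BigDecimal, BigDecimal>() {
            @Override
            public BigDecimal map(BigDecimal amount) {
                return amount.multiply(new BigDecimal("1.19"));
            }
        }).print();
    }
}
{code}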




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3774) Flink configuration is not correctly forwarded to PlanExecutor in ScalaShellRemoteEnvironment

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247621#comment-15247621
 ] 

ASF GitHub Bot commented on FLINK-3774:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1904#discussion_r60217702
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java ---
@@ -133,26 +137,31 @@ public RemoteEnvironment(String host, int port, Configuration clientConfig,
		this.port = port;
		this.clientConfiguration = clientConfig == null ? new Configuration() : clientConfig;
		if (jarFiles != null) {
-			this.jarFiles = new URL[jarFiles.length];
+			this.jarFiles = new ArrayList<URL>(jarFiles.length);
			for (int i = 0; i < jarFiles.length; i++) {
				try {
-					this.jarFiles[i] = new File(jarFiles[i]).getAbsoluteFile().toURI().toURL();
+					this.jarFiles.add(new File(jarFiles[i]).getAbsoluteFile().toURI().toURL());
				} catch (MalformedURLException e) {
					throw new IllegalArgumentException("JAR file path invalid", e);
				}
			}
		}
		else {
-			this.jarFiles = null;
+			this.jarFiles = Collections.emptyList();
+		}
+
+		if (globalClasspaths == null) {
+			this.globalClasspaths = Collections.emptyList();
+		} else {
+			this.globalClasspaths = Arrays.asList(globalClasspaths);
		}
-		this.globalClasspaths = globalClasspaths;
	}

	// ------------------------------------------------------------------------

	@Override
	public JobExecutionResult execute(String jobName) throws Exception {
-		ensureExecutorCreated();
+		PlanExecutor executor = getExecutor();
--- End diff --

Would it make sense to move the assignment to ```this.executor``` from 
```getExecutor()``` to this line? As it stands, the ScalaShellRemoteEnvironment 
executor is never closed.
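
A simplified, self-contained sketch of what the suggestion amounts to (the 
types below are stand-ins, not Flink's actual RemoteEnvironment/PlanExecutor 
classes):

```java
// Stand-in types; the point is only where the field assignment happens.
class RemoteEnv {
    private AutoCloseable executor;

    // May be overridden, e.g. by a ScalaShellRemoteEnvironment analogue.
    AutoCloseable getExecutor() {
        return () -> { /* release cluster resources */ };
    }

    void execute() throws Exception {
        AutoCloseable executor = getExecutor();
        this.executor = executor; // remembered here, so dispose() can reach it
        // ... run the plan with 'executor' ...
    }

    void dispose() throws Exception {
        if (executor != null) {
            executor.close(); // never reached if the field was never assigned
            executor = null;
        }
    }
}
```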


> Flink configuration is not correctly forwarded to PlanExecutor in 
> ScalaShellRemoteEnvironment
> -
>
> Key: FLINK-3774
> URL: https://issues.apache.org/jira/browse/FLINK-3774
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.1.0
>
>
> Currently, the {{ScalaShellRemoteEnvironment}} does not correctly forward 
> the Flink configuration to the {{PlanExecutor}}. Therefore, it is not 
> possible to use the Scala shell in combination with an HA cluster which needs 
> the configuration parameters set in the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3774] [shell] Forwards Flink configurat...

2016-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1904#discussion_r60217702
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java ---
@@ -133,26 +137,31 @@ public RemoteEnvironment(String host, int port, Configuration clientConfig,
		this.port = port;
		this.clientConfiguration = clientConfig == null ? new Configuration() : clientConfig;
		if (jarFiles != null) {
-			this.jarFiles = new URL[jarFiles.length];
+			this.jarFiles = new ArrayList<URL>(jarFiles.length);
			for (int i = 0; i < jarFiles.length; i++) {
				try {
-					this.jarFiles[i] = new File(jarFiles[i]).getAbsoluteFile().toURI().toURL();
+					this.jarFiles.add(new File(jarFiles[i]).getAbsoluteFile().toURI().toURL());
				} catch (MalformedURLException e) {
					throw new IllegalArgumentException("JAR file path invalid", e);
				}
			}
		}
		else {
-			this.jarFiles = null;
+			this.jarFiles = Collections.emptyList();
+		}
+
+		if (globalClasspaths == null) {
+			this.globalClasspaths = Collections.emptyList();
+		} else {
+			this.globalClasspaths = Arrays.asList(globalClasspaths);
		}
-		this.globalClasspaths = globalClasspaths;
	}

	// ------------------------------------------------------------------------

	@Override
	public JobExecutionResult execute(String jobName) throws Exception {
-		ensureExecutorCreated();
+		PlanExecutor executor = getExecutor();
--- End diff --

Would it make sense to move the assignment to ```this.executor``` from 
```getExecutor()``` to this line? As it stands, the ScalaShellRemoteEnvironment 
executor is never closed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-1427) Configuration through environment variables

2016-04-19 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels closed FLINK-1427.
-
Resolution: Won't Fix

Thanks, closing for now.

> Configuration through environment variables
> ---
>
> Key: FLINK-1427
> URL: https://issues.apache.org/jira/browse/FLINK-1427
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
> Environment: Deployment
>Reporter: Maximilian Michels
>Priority: Minor
>  Labels: configuration, deployment
>
> Like Hadoop or Spark, etc. Flink should support configuration via shell 
> environment variables. In cluster setups, this makes things a lot easier 
> because writing config files can be omitted. Many automation tools (e.g. 
> Google's bdutil) use (or abuse) this feature.
> For example, to set up the task manager heap size, we would run `export 
> FLINK_TASKMANAGER_HEAP=4096` before starting the task manager on a node to 
> set the heap memory size to 4096MB.
> Environment variables should overwrite the regular config entries.
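
For illustration only (no such mechanism exists in Flink; the variable name is 
taken from the proposal above), the precedence rule would look roughly like 
this:

{code}
import java.util.HashMap;
import java.util.Map;

public class EnvConfigOverride {

    // Environment variables take precedence over file-based config entries.
    public static int taskManagerHeapMb(Map<String, String> fileConfig) {
        String fromEnv = System.getenv("FLINK_TASKMANAGER_HEAP");
        if (fromEnv != null) {
            return Integer.parseInt(fromEnv);
        }
        return Integer.parseInt(fileConfig.getOrDefault("taskmanager.heap.mb", "512"));
    }

    public static void main(String[] args) {
        Map<String, String> fileConfig = new HashMap<>();
        fileConfig.put("taskmanager.heap.mb", "1024");
        System.out.println("TaskManager heap: " + taskManagerHeapMb(fileConfig) + " MB");
    }
}
{code}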



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-3764) LeaderChangeStateCleanupTest.testStateCleanupAfterListenerNotification fails on Travis

2016-04-19 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-3764.
---
Resolution: Fixed

Fixed in 716d832098b47ec19c6f5c675eb461d3ba96f60a

> LeaderChangeStateCleanupTest.testStateCleanupAfterListenerNotification fails 
> on Travis
> --
>
> Key: FLINK-3764
> URL: https://issues.apache.org/jira/browse/FLINK-3764
> Project: Flink
>  Issue Type: Bug
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The 
> {{LeaderChangeStateCleanupTest.testStateCleanupAfterListenerNotification}} 
> fails spuriously on Travis because of a {{NullPointerException}}. The reason 
> is that the test does not properly wait until the {{ResourceManager}} has 
> been started. Due to this, a leader notification message can be sent to a 
> {{LeaderRetrievalListener}} which has not yet been set by the 
> {{ResourceManager}}.
> [1] https://s3.amazonaws.com/archive.travis-ci.org/jobs/123271732/log.txt
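
A hedged sketch of the kind of synchronization the fix implies (names are 
illustrative, not the actual test code): the test blocks until the component 
signals that its listener is in place.

{code}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class StartupAwait {

    private final CountDownLatch started = new CountDownLatch(1);

    // Called by the component once its LeaderRetrievalListener is set.
    public void markStarted() {
        started.countDown();
    }

    // Called by the test before triggering leader change notifications.
    public void awaitStarted() throws InterruptedException {
        if (!started.await(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Component did not start in time");
        }
    }
}
{code}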



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3764] [tests] Improve LeaderChangeState...

2016-04-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1894


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3764) LeaderChangeStateCleanupTest.testStateCleanupAfterListenerNotification fails on Travis

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247613#comment-15247613
 ] 

ASF GitHub Bot commented on FLINK-3764:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1894


> LeaderChangeStateCleanupTest.testStateCleanupAfterListenerNotification fails 
> on Travis
> --
>
> Key: FLINK-3764
> URL: https://issues.apache.org/jira/browse/FLINK-3764
> Project: Flink
>  Issue Type: Bug
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The 
> {{LeaderChangeStateCleanupTest.testStateCleanupAfterListenerNotification}} 
> fails spuriously on Travis because of a {{NullPointerException}}. The reason 
> is that the test does not properly wait until the {{ResourceManager}} has 
> been started. Due to this, a leader notification message can be sent to a 
> {{LeaderRetrievalListener}} which has not yet been set by the 
> {{ResourceManager}}.
> [1] https://s3.amazonaws.com/archive.travis-ci.org/jobs/123271732/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3086) ExpressionParser does not support concatenation of suffix operations

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247608#comment-15247608
 ] 

ASF GitHub Bot commented on FLINK-3086:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/1912

[FLINK-3086] [table] ExpressionParser does not support concatenation of 
suffix operations

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

This PR reworks the Table API `ExpressionParser`. It eliminates 
inconsistencies, adds more complex tests, and ensures that everything from the 
Scala Table API is also supported in the Java Table API.
Even expressions like `a.abs() + a.abs()` or `((((true) === true) || 
false).cast(STRING) + 'X ').trim` should not be an issue anymore.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink ExpressionParserRework

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1912.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1912


commit 9e331bfc45989bef2de8601410a037738fad2d7e
Author: twalthr 
Date:   2016-04-18T14:28:19Z

[FLINK-3086] [table] ExpressionParser does not support concatenation of 
suffix operations




> ExpressionParser does not support concatenation of suffix operations
> 
>
> Key: FLINK-3086
> URL: https://issues.apache.org/jira/browse/FLINK-3086
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> The ExpressionParser of the Table API does not support concatenation of 
> suffix operations. e.g. 
> {code}table.select("field.cast(STRING).substring(2)"){code} throws  an 
> exception.
> {code}
> org.apache.flink.api.table.ExpressionException: Could not parse expression: 
> string matching regex `\z' expected but `.' found
>   at 
> org.apache.flink.api.table.parser.ExpressionParser$.parseExpressionList(ExpressionParser.scala:224)
> {code}
> However, the Scala implicit Table Expression API supports this.
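
A hedged illustration of the parsing idea (this is not Flink's 
ExpressionParser, which is built on Scala parser combinators): the fix amounts 
to parsing a primary expression followed by a repetition of suffix operations, 
instead of at most one.

{code}
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SuffixChainDemo {

    private static final Pattern SUFFIX = Pattern.compile("\\.(\\w+)\\(([^()]*)\\)");

    public static void main(String[] args) {
        String expr = "field.cast(STRING).substring(2)";
        System.out.println("primary: " + expr.substring(0, expr.indexOf('.')));

        Matcher m = SUFFIX.matcher(expr);
        while (m.find()) { // each iteration consumes one suffix operation
            System.out.println("suffix:  " + m.group(1) + "(" + m.group(2) + ")");
        }
    }
}
{code}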



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3086] [table] ExpressionParser does not...

2016-04-19 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/1912

[FLINK-3086] [table] ExpressionParser does not support concatenation of 
suffix operations

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

This PR reworks the Table API `ExpressionParser`. It eliminates 
inconsistencies, adds more complex tests, and ensures that everything from the 
Scala Table API is also supported in the Java Table API.
Even expressions like `a.abs() + a.abs()` or `((((true) === true) || 
false).cast(STRING) + 'X ').trim` should not be an issue anymore.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink ExpressionParserRework

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1912.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1912


commit 9e331bfc45989bef2de8601410a037738fad2d7e
Author: twalthr 
Date:   2016-04-18T14:28:19Z

[FLINK-3086] [table] ExpressionParser does not support concatenation of 
suffix operations




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3783) Support weighted random sampling with reservoir

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247597#comment-15247597
 ] 

ASF GitHub Bot commented on FLINK-3783:
---

Github user rawkintrevo commented on the pull request:

https://github.com/apache/flink/pull/1909#issuecomment-211873070
  
Nice.  If this gets merged before https://github.com/apache/flink/pull/1898 
I'll integrate it in. Otherwise I'll open a separate PR afterwards. 


> Support weighted random sampling with reservoir
> ---
>
> Key: FLINK-3783
> URL: https://issues.apache.org/jira/browse/FLINK-3783
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: GaoLun
>Assignee: GaoLun
>Priority: Minor
>
> In default random sampling, all items have the same probability of being 
> selected. But in weighted random sampling, the probability of each item being 
> selected is determined by its weight relative to the weights of the other 
> items.
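
One standard technique for this is A-Res (Efraimidis & Spirakis): each item 
gets the key u^(1/w) with u drawn uniformly from (0,1), and the k items with 
the largest keys form the sample. A hedged sketch, not necessarily the 
algorithm used in the PR:

{code}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Random;

public class WeightedReservoirSample {

    /** Returns the indices of the k sampled items. */
    public static List<Integer> sample(double[] weights, int k, Random rnd) {
        // Min-heap on key = u^(1/w); the k largest keys survive.
        PriorityQueue<double[]> heap =
                new PriorityQueue<>(Comparator.comparingDouble((double[] e) -> e[0]));
        for (int i = 0; i < weights.length; i++) {
            double key = Math.pow(rnd.nextDouble(), 1.0 / weights[i]);
            if (heap.size() < k) {
                heap.add(new double[]{key, i});
            } else if (key > heap.peek()[0]) {
                heap.poll();
                heap.add(new double[]{key, i});
            }
        }
        List<Integer> result = new ArrayList<>();
        for (double[] entry : heap) {
            result.add((int) entry[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        // Item 2 has ten times the weight of the others, so it is sampled often.
        double[] weights = {1.0, 1.0, 10.0, 1.0};
        System.out.println(sample(weights, 2, new Random()));
    }
}
{code}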



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3783] [core] Support weighted random sa...

2016-04-19 Thread rawkintrevo
Github user rawkintrevo commented on the pull request:

https://github.com/apache/flink/pull/1909#issuecomment-211873070
  
Nice.  If this gets merged before https://github.com/apache/flink/pull/1898 
I'll integrate it in. Otherwise I'll open a separate PR afterwards. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3764) LeaderChangeStateCleanupTest.testStateCleanupAfterListenerNotification fails on Travis

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247596#comment-15247596
 ] 

ASF GitHub Bot commented on FLINK-3764:
---

Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1894#issuecomment-211872883
  
Merging.


> LeaderChangeStateCleanupTest.testStateCleanupAfterListenerNotification fails 
> on Travis
> --
>
> Key: FLINK-3764
> URL: https://issues.apache.org/jira/browse/FLINK-3764
> Project: Flink
>  Issue Type: Bug
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The 
> {{LeaderChangeStateCleanupTest.testStateCleanupAfterListenerNotification}} 
> fails spuriously on Travis because of a {{NullPointerException}}. The reason 
> is that the test does not properly wait until the {{ResourceManager}} has 
> been started. Due to this, a leader notification message can be sent to a 
> {{LeaderRetrievalListener}} which has not yet been set by the 
> {{ResourceManager}}.
> [1] https://s3.amazonaws.com/archive.travis-ci.org/jobs/123271732/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3764] [tests] Improve LeaderChangeState...

2016-04-19 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1894#issuecomment-211872883
  
Merging.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-2848) Refactor Flink benchmarks with JMH and move to flink-benchmark module

2016-04-19 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-2848.
---
Resolution: Invalid

The JMH license is not ASL compliant. Further discussions regarding a 
flink-benchmark module should be taken to FLINK-2973.

> Refactor Flink benchmarks with JMH and move to flink-benchmark module
> -
>
> Key: FLINK-2848
> URL: https://issues.apache.org/jira/browse/FLINK-2848
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: Chengxiang Li
>Assignee: Chengxiang Li
>Priority: Minor
>
> There are many Flink-internal micro benchmarks in different modules, which 
> are coarsely measured (e.g. via System.nanoTime()), with no warmup or 
> multi-iteration testing. This is an umbrella JIRA to refactor these micro 
> benchmarks and move them to a flink-benchmark module for central management.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-2344) Deprecate/Remove the old Pact Pair type

2016-04-19 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler resolved FLINK-2344.
-
Resolution: Fixed

This class was removed in FLINK-3169.

> Deprecate/Remove the old Pact Pair type
> ---
>
> Key: FLINK-2344
> URL: https://issues.apache.org/jira/browse/FLINK-2344
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.0.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2265) Tachyon Tests sometimes fail with pre-mature process exists

2016-04-19 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247554#comment-15247554
 ] 

Chesnay Schepler commented on FLINK-2265:
-

The failure-causing test no longer exists, does it?

> Tachyon Tests sometimes fail with pre-mature process exists
> ---
>
> Key: FLINK-2265
> URL: https://issues.apache.org/jira/browse/FLINK-2265
> Project: Flink
>  Issue Type: Bug
>  Components: flink-contrib
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Priority: Minor
>
> Seems like Tachyon occasionally causes the JVM to exit/crash prematurely, 
> failing the tests.
> https://travis-ci.org/apache/flink/jobs/67863604
> Looks like a Tachyon issue at first glance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1427) Configuration through environment variables

2016-04-19 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247549#comment-15247549
 ] 

Chesnay Schepler commented on FLINK-1427:
-

Can we close this issue? It appears that the consensus was not to implement 
this.

> Configuration through environment variables
> ---
>
> Key: FLINK-1427
> URL: https://issues.apache.org/jira/browse/FLINK-1427
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
> Environment: Deployment
>Reporter: Maximilian Michels
>Priority: Minor
>  Labels: configuration, deployment
>
> Like Hadoop or Spark, etc. Flink should support configuration via shell 
> environment variables. In cluster setups, this makes things a lot easier 
> because writing config files can be omitted. Many automation tools (e.g. 
> Google's bdutil) use (or abuse) this feature.
> For example, to set up the task manager heap size, we would run `export 
> FLINK_TASKMANAGER_HEAP=4096` before starting the task manager on a node to 
> set the heap memory size to 4096MB.
> Environment variables should overwrite the regular config entries.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3785) Add REST resource to query information about checkpointing

2016-04-19 Thread Konstantin Knauf (JIRA)
Konstantin Knauf created FLINK-3785:
---

 Summary: Add REST resource to query information about checkpointing
 Key: FLINK-3785
 URL: https://issues.apache.org/jira/browse/FLINK-3785
 Project: Flink
  Issue Type: New Feature
  Components: Web Client
Reporter: Konstantin Knauf


For monitoring purposes it would be nice to be able to query information about 
the last n checkpoints via the REST API, i.e. ID, trigger time, state size and 
duration. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1394) TachyonFileSystemWrapperTest fails if the HADOOP_HOME variable is set

2016-04-19 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247548#comment-15247548
 ] 

Chesnay Schepler commented on FLINK-1394:
-

Does this test still exist?

> TachyonFileSystemWrapperTest fails if the HADOOP_HOME variable is set
> -
>
> Key: FLINK-1394
> URL: https://issues.apache.org/jira/browse/FLINK-1394
> Project: Flink
>  Issue Type: Bug
>Reporter: Till Rohrmann
>Assignee: Robert Metzger
>
> The TachyonFileSystemWrapperTest fails if the HADOOP_HOME variable is set, 
> because the test case tries to connect to the hadoop server specified in the 
> config under HADOOP_HOME. If Hadoop is not running, then the client will 
> block the complete test case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1146) Add a way to provide types for generic type variables

2016-04-19 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247540#comment-15247540
 ] 

Chesnay Schepler commented on FLINK-1146:
-

Was this issue resolved with the implementation of TypeHints?

> Add a way to provide types for generic type variables
> -
>
> Key: FLINK-1146
> URL: https://issues.apache.org/jira/browse/FLINK-1146
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.7.0-incubating
>Reporter: Stephan Ewen
>
> For cases where the type inference fails, we should be able to provide type 
> variable information manually.
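
For context, a minimal sketch of the TypeHint workaround the comment above refers to; 
it assumes Flink's returns(...) method on operator results as available in recent 
releases:

{code}
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class TypeHintExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		DataSet<String> words = env.fromElements("flink", "kinesis");

		// When type inference fails (e.g. for generic lambdas), the type
		// variable information can be supplied manually via a TypeHint:
		DataSet<Tuple2<String, Integer>> counts = words
			.map(w -> new Tuple2<>(w, 1))
			.returns(new TypeHint<Tuple2<String, Integer>>() {});

		counts.print();
	}
}
{code}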



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247514#comment-15247514
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1910#issuecomment-211847621
  
Great addition to have a producer for Kinesis. The changes look good 
overall; I had some minor inline comments.

The main concern is that we can only test this with a real Kinesis setup, 
which makes it fragile with respect to future changes. Did you consider trying 
to mock the `KinesisProducer` to test that the expected calls are issued to it?
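
A sketch of what such a mock-based test might look like, assuming Mockito and 
assuming the sink gains a seam for injecting the KPL client (which the PR as 
reviewed does not necessarily expose); the direct call below stands in for the 
sink's invoke() path:

{code}
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import java.nio.ByteBuffer;
import org.junit.Test;

public class FlinkKinesisProducerMockTest {

	@Test
	public void sinkShouldForwardRecordsToKinesisProducer() {
		KinesisProducer kpl = mock(KinesisProducer.class);

		// In a real test, the mocked client would be injected into the
		// FlinkKinesisProducer and the sink's invoke() would be called;
		// here the call is issued directly for illustration.
		ByteBuffer payload = ByteBuffer.wrap("hello".getBytes());
		kpl.addUserRecord("test-flink", "0", payload);

		// Verify that the expected call was issued with the expected arguments.
		verify(kpl).addUserRecord("test-flink", "0", payload);
	}
}
{code}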

PS: The titles of the JIRA issue and this PR are confusing (the trailing `... 
into Flink` part).


> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using the AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of the FlinkKinesisProducer are rather straightforward:
> 1. Partition put records based on the partition key.
> 2. Configurable put mode: bulk put for higher throughput vs. sequential 
> single-record puts. The bulk size should also be configurable.
> 3. For bulk put, the user can also choose to enforce strict ordering of the 
> results at the tradeoff of higher put latency. Ref: 
> https://brandur.org/kinesis-order
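
A sketch of how these features might surface on the producer; setFailOnError, 
setDefaultStream and setDefaultPartition appear in the PR's example elsewhere in 
this thread, while the bulk/ordering setters are purely hypothetical 
illustrations of items 2 and 3:

{code}
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ProducerConfigSketch {

	public static FlinkKinesisProducer<String> configure(String region, String accessKey, String secretKey) {
		FlinkKinesisProducer<String> kinesis =
			new FlinkKinesisProducer<>(region, accessKey, secretKey, new SimpleStringSchema());

		// 1. Records are routed by partition key; "0" is used as the default here.
		kinesis.setDefaultStream("test-flink");
		kinesis.setDefaultPartition("0");
		kinesis.setFailOnError(true);

		// 2./3. Hypothetical knobs for the proposed put modes (not in the PR):
		// kinesis.setBulkPut(true);          // bulk put vs. sequential single-record puts
		// kinesis.setBulkSize(500);          // configurable bulk size
		// kinesis.setStrictOrdering(true);   // strict ordering at the cost of latency

		return kinesis;
	}
}
{code}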



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3230] Add producer for Amazon Kinesis S...

2016-04-19 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1910#issuecomment-211847621
  
Great addition to have a producer for Kinesis. The changes look good 
overall; I had some minor inline comments.

The main concern is that we can only test this with a real Kinesis setup, 
which makes it fragile with respect to future changes. Did you consider trying 
to mock the `KinesisProducer` to test that the expected calls are issued to it?

PS: The titles of the JIRA issue and this PR are confusing (the trailing `... 
into Flink` part).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247512#comment-15247512
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-211847420
  
Cool. You don't need to resubmit a new PR. By pushing new commits to your 
`FLINK-3229` branch, the pull request will update automatically.


> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> --
>
> Key: FLINK-3229
> URL: https://issues.apache.org/jira/browse/FLINK-3229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Opening a sub-task to implement the data source consumer for the Kinesis 
> streaming connector (https://issues.apache.org/jira/browse/FLINK-3211).
> An example of the planned user API for Flink Kinesis Consumer:
> {code}
> Properties kinesisConfig = new Properties();
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, 
> "BASIC");
> kinesisConfig.put(
> KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, 
> "aws_access_key_id_here");
> kinesisConfig.put(
> KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
> "aws_secret_key_here");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, 
> "LATEST"); // or TRIM_HORIZON
> DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
> "kinesis_stream_name",
> new SimpleStringSchema(),
> kinesisConfig));
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247511#comment-15247511
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60207331
  
--- Diff: flink-streaming-connectors/flink-connector-twitter/pom.xml ---
@@ -35,6 +35,11 @@ under the License.
 
jar
 
+   
+   2.2.0
--- End diff --

How is this change related to the Kinesis connector?


> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using the AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of the FlinkKinesisProducer are rather straightforward:
> 1. Partition put records based on the partition key.
> 2. Configurable put mode: bulk put for higher throughput vs. sequential 
> single-record puts. The bulk size should also be configurable.
> 3. For bulk put, the user can also choose to enforce strict ordering of the 
> results at the tradeoff of higher put latency. Ref: 
> https://brandur.org/kinesis-order



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...

2016-04-19 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-211847420
  
Cool. You don't need to resubmit a new PR. By pushing new commits to your 
`FLINK-3229` branch, the pull request will update automatically.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3230] Add producer for Amazon Kinesis S...

2016-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60207331
  
--- Diff: flink-streaming-connectors/flink-connector-twitter/pom.xml ---
@@ -35,6 +35,11 @@ under the License.
 
jar
 
+   
+   2.2.0
--- End diff --

How is this change related to the Kinesis connector?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247509#comment-15247509
 ] 

ASF GitHub Bot commented on FLINK-3229:
---

Github user tzulitai commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-211847140
  
@rmetzger Sure, that seems reasonable. I'll wait until the producer is 
merged and resubmit a new PR for the integrated consumer.


> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> --
>
> Key: FLINK-3229
> URL: https://issues.apache.org/jira/browse/FLINK-3229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Opening a sub-task to implement the data source consumer for the Kinesis 
> streaming connector (https://issues.apache.org/jira/browse/FLINK-3211).
> An example of the planned user API for Flink Kinesis Consumer:
> {code}
> Properties kinesisConfig = new Properties();
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, 
> "BASIC");
> kinesisConfig.put(
> KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, 
> "aws_access_key_id_here");
> kinesisConfig.put(
> KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
> "aws_secret_key_here");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, 
> "LATEST"); // or TRIM_HORIZON
> DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
> "kinesis_stream_name",
> new SimpleStringSchema(),
> kinesisConfig));
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...

2016-04-19 Thread tzulitai
Github user tzulitai commented on the pull request:

https://github.com/apache/flink/pull/1911#issuecomment-211847140
  
@rmetzger Sure, that seems reasonable. I'll wait until the producer is 
merged and resubmit a new PR for the integrated consumer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3230) Kinesis streaming producer

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247508#comment-15247508
 ] 

ASF GitHub Bot commented on FLINK-3230:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60207207
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.examples;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
+import 
org.apache.flink.streaming.connectors.kinesis.KinesisSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This is an example on how to produce data into Kinesis
+ */
+public class ProduceIntoKinesis {
+
+   public static void main(String[] args) throws Exception {
+   ParameterTool pt = ParameterTool.fromArgs(args);
+
+   StreamExecutionEnvironment see = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   see.setParallelism(1);
+
+   DataStream<String> simpleStringStream = see.addSource(new 
EventsGenerator());
+
+   FlinkKinesisProducer<String> kinesis = new 
FlinkKinesisProducer<>(pt.getRequired("region"),
+   pt.getRequired("accessKey"),
+   pt.getRequired("secretKey"), new 
SimpleStringSchema());
+
+   kinesis.setFailOnError(true);
+   kinesis.setDefaultStream("test-flink");
+   kinesis.setDefaultPartition("0");
+
+   simpleStringStream.addSink(kinesis);
+
+   see.execute();
+   }
+
+   public static class EventsGenerator implements SourceFunction<String> {
+   private boolean running = true;
+
+   @Override
+   public void run(SourceContext<String> ctx) throws Exception {
+   long seq = 0;
+   while(running) {
+
--- End diff --

This empty line seems odd.


> Kinesis streaming producer
> --
>
> Key: FLINK-3230
> URL: https://issues.apache.org/jira/browse/FLINK-3230
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of FlinkKinesisProducer is rather straightforward:
> 1. Partition put records based on partition key.
> 2. Configurable put mode: Bulk put for higher throughput vs. sequential 
> single record puts. Size of bulk should also be configurable.
> 3. For bulk put, user can also choose to enforce strict ordering of the 
> result with the tradeoff of higher put latency. Ref: 
> https://brandur.org/kinesis-order



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3230] Add producer for Amazon Kinesis S...

2016-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60207207
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.examples;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
+import 
org.apache.flink.streaming.connectors.kinesis.KinesisSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This is an example on how to produce data into Kinesis
+ */
+public class ProduceIntoKinesis {
+
+   public static void main(String[] args) throws Exception {
+   ParameterTool pt = ParameterTool.fromArgs(args);
+
+   StreamExecutionEnvironment see = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   see.setParallelism(1);
+
+   DataStream<String> simpleStringStream = see.addSource(new 
EventsGenerator());
+
+   FlinkKinesisProducer<String> kinesis = new 
FlinkKinesisProducer<>(pt.getRequired("region"),
+   pt.getRequired("accessKey"),
+   pt.getRequired("secretKey"), new 
SimpleStringSchema());
+
+   kinesis.setFailOnError(true);
+   kinesis.setDefaultStream("test-flink");
+   kinesis.setDefaultPartition("0");
+
+   simpleStringStream.addSink(kinesis);
+
+   see.execute();
+   }
+
+   public static class EventsGenerator implements SourceFunction<String> {
+   private boolean running = true;
+
+   @Override
+   public void run(SourceContext<String> ctx) throws Exception {
+   long seq = 0;
+   while(running) {
+
--- End diff --

This empty line seems odd.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3230] Add producer for Amazon Kinesis S...

2016-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1910#discussion_r60207039
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import 
com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The FlinkKinesisProducer allows to produce from a Flink DataStream into 
Kinesis.
+ *
+ * @param <OUT> Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+   /* AWS region of the stream */
+   private final String region;
+
+   /* Access and secret key of the user */
+   private final String accessKey;
+   private final String secretKey;
+
+   /* Flag controlling the error behavior of the producer */
+   private boolean failOnError = false;
+
+   /* Name of the default stream to produce to. Can be overwritten by the 
serialization schema */
+   private String defaultStream;
+
+   /* Default partition id. Can be overwritten by the serialization schema 
*/
+   private String defaultPartition;
+
+   /* Schema for turning the OUT type into a byte array. */
+   private final KinesisSerializationSchema<OUT> schema;
+
+   /* Optional custom partitioner */
+   private KinesisPartitioner<OUT> customPartitioner = null;
+
+
+   // --- Runtime fields 
---
+
+
+   /* Our Kinesis instance for each parallel Flink sink */
+   private transient KinesisProducer producer;
+
+   /* Callback handling failures */
+   private transient FutureCallback<UserRecordResult> callback;
+
+   /* Field for async exception */
+   private transient Throwable thrownException;
+
+
+   // --- Initialization and configuration  
---
+
+
+   /**
+* Create a new FlinkKinesisProducer.
+* This is a constructor supporting Flink's {@see SerializationSchema}.
+*
+* @param region AWS region of the stream
+* @param accessKey Access key of a user with permission to access the 
stream (ideally also with access to Cloud Watch)
+* @param secretKey Secret key of the user
+* @param schema Serialization schema for the data type
+*/
+   public FlinkKinesisProducer(String region, String accessKey, String 
secretKey, final SerializationSchema<OUT> schema) {
+   // create a simple wrapper for the serialization schema
+   this(region, accessKey, secretKey, new 
KinesisSerializationSchema<OUT>() {
+   @Override
+   public ByteBuffer serialize(OUT element) {
+   // wrap into ByteBuffer
+
