[GitHub] flink pull request #5171: [FLINK-8271][Kinesis connector] upgrade deprecated...

2018-01-02 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5171#discussion_r159362925
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java ---
@@ -30,37 +30,44 @@
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
 import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.regions.Region;
+import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.regions.Regions;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
 
 import java.util.Properties;
 
 /**
  * Some utilities specific to Amazon Web Service.
  */
 public class AWSUtil {
+    /** Used for formatting Flink-specific user agent string when creating Kinesis client. */
+    private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) Kinesis Connector";
 
     /**
-     * Creates an Amazon Kinesis Client.
+     * Creates an AmazonKinesis client.
      * @param configProps configuration properties containing the access key, secret key, and region
-     * @return a new Amazon Kinesis Client
+     * @return a new AmazonKinesis client
      */
-    public static AmazonKinesisClient createKinesisClient(Properties configProps) {
+    public static AmazonKinesis createKinesisClient(Properties configProps) {
        // set a Flink-specific user agent
-       ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
-       awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
-           " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
+       ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig()
+           .withUserAgentPrefix(String.format(USER_AGENT_FORMAT,
--- End diff --

@tzulitai Check out the [AWS source code here](https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-core/src/main/java/com/amazonaws/ClientConfiguration.java#L467): `setUserAgent` and `withUserAgentPrefix` both call `setUserAgentPrefix`.
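
For reference, a minimal, self-contained sketch (not from the PR; the version/commit strings are made-up placeholders) showing that the two calls end up with the same prefix:

```java
import com.amazonaws.ClientConfiguration;
import com.amazonaws.ClientConfigurationFactory;

public class UserAgentEquivalence {
    public static void main(String[] args) {
        // Deprecated setter: delegates to setUserAgentPrefix in current SDK versions.
        ClientConfiguration viaSetter = new ClientConfigurationFactory().getConfig();
        viaSetter.setUserAgent("Apache Flink 1.5-SNAPSHOT (abc1234) Kinesis Connector");

        // Fluent variant used in this PR: also ends up in setUserAgentPrefix.
        ClientConfiguration viaWither = new ClientConfigurationFactory().getConfig()
            .withUserAgentPrefix("Apache Flink 1.5-SNAPSHOT (abc1234) Kinesis Connector");

        // Both configurations therefore report the same user agent prefix.
        System.out.println(viaSetter.getUserAgentPrefix().equals(viaWither.getUserAgentPrefix())); // true
    }
}
```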


---


[jira] [Commented] (FLINK-8271) upgrade from deprecated classes to AmazonKinesis

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309184#comment-16309184 ]

ASF GitHub Bot commented on FLINK-8271:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5171#discussion_r159363827
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java ---
@@ -30,37 +30,44 @@
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
 import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.regions.Region;
+import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.regions.Regions;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
 
 import java.util.Properties;
 
 /**
  * Some utilities specific to Amazon Web Service.
  */
 public class AWSUtil {
+    /** Used for formatting Flink-specific user agent string when creating Kinesis client. */
+    private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) Kinesis Connector";
 
     /**
-     * Creates an Amazon Kinesis Client.
+     * Creates an AmazonKinesis client.
      * @param configProps configuration properties containing the access key, secret key, and region
-     * @return a new Amazon Kinesis Client
+     * @return a new AmazonKinesis client
      */
-    public static AmazonKinesisClient createKinesisClient(Properties configProps) {
+    public static AmazonKinesis createKinesisClient(Properties configProps) {
        // set a Flink-specific user agent
-       ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
-       awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
-           " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
+       ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig()
+           .withUserAgentPrefix(String.format(USER_AGENT_FORMAT,
+               EnvironmentInformation.getVersion(),
+               EnvironmentInformation.getRevisionInformation().commitId));
 
        // utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
-       AmazonKinesisClient client = new AmazonKinesisClient(
-           AWSUtil.getCredentialsProvider(configProps), awsClientConfig);
+       AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
+           .withCredentials(AWSUtil.getCredentialsProvider(configProps))
+           .withClientConfiguration(awsClientConfig)
+           .withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));
 
-       client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION))));
        if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
-           client.setEndpoint(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
+           builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
+               configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT),
+               configProps.getProperty(AWSConfigConstants.AWS_REGION)));
--- End diff --

@tzulitai You are right. After some research I found the `region` field in `EndpointConfiguration` ([here](https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-core/src/main/java/com/amazonaws/client/builder/AwsClientBuilder.java#L363)) is used the same way as in the [old code here](https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-core/src/main/java/com/amazonaws/AmazonWebServiceClient.java#L368). I will set it to null.
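
For anyone reading along, a minimal sketch (not the PR code; the endpoint argument is a placeholder) of what passing a null signing region looks like:

{code:java}
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;

public class KinesisEndpointSketch {
    public static AmazonKinesis createForCustomEndpoint(String endpoint) {
        // Per the discussion above, a null signing region is handled the same
        // way the old setEndpoint() path handled regions.
        return AmazonKinesisClientBuilder.standard()
            .withEndpointConfiguration(
                new AwsClientBuilder.EndpointConfiguration(endpoint, null))
            .build();
    }
}
{code}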

To take it a step further: if what you described is a frequent use case, we should add a unit test for it for future validation. Would you like to create a ticket and a PR for it?


> upgrade from deprecated classes to AmazonKinesis
> ------------------------------------------------
>
> Key: FLINK-8271
> URL: https://issues.apache.org/jira/browse/FLINK-8271
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.5.0, 1.4.1
>


[jira] [Commented] (FLINK-8175) remove flink-streaming-contrib and migrate its classes to flink-streaming-java/scala

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309169#comment-16309169 ]

ASF GitHub Bot commented on FLINK-8175:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5112
  
@StephanEwen let me know if you have more concerns


> remove flink-streaming-contrib and migrate its classes to 
> flink-streaming-java/scala
> 
>
> Key: FLINK-8175
> URL: https://issues.apache.org/jira/browse/FLINK-8175
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.5.0
>
>
> I propose removing flink-streaming-contrib from flink-contrib, and migrating
> its classes to flink-streaming-java/scala, for the following reasons:
> - flink-streaming-contrib is so small that it only has 4 classes (3 Java and
> 1 Scala); they don't need a dedicated jar for Flink to distribute and
> maintain, nor for users to deal with the overhead of dependency management
> - the 4 classes in flink-streaming-contrib are logically more tied to
> flink-streaming-java/scala, and thus can be easily migrated
> - flink-contrib is already too crowded and noisy. It contains lots of
> submodules with different purposes, which confuses developers and users, and
> they lack a proper project hierarchy
> To take it even a step further, I would argue that flink-contrib itself should
> be removed and all its submodules migrated to other top-level modules, for the
> following reasons: 1) the whole Apache Flink project is itself a result of
> contributions from many developers, so there's no reason to highlight some
> contributions in a dedicated module named 'contrib'; 2) flink-contrib
> inherently doesn't have a good hierarchy to hold submodules





[jira] [Commented] (FLINK-8175) remove flink-streaming-contrib and migrate its classes to flink-streaming-java/scala

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309170#comment-16309170 ]

ASF GitHub Bot commented on FLINK-8175:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5112
  
@StephanEwen @tillrohrmann  let me know if you have more concerns





[jira] [Commented] (FLINK-8217) Properly annotate APIs of flink-connector-kinesis

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309168#comment-16309168 ]

ASF GitHub Bot commented on FLINK-8217:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5138
  
@tzulitai  The PR has been updated


> Properly annotate APIs of flink-connector-kinesis
> -
>
> Key: FLINK-8217
> URL: https://issues.apache.org/jira/browse/FLINK-8217
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.5.0
>
>







[jira] [Commented] (FLINK-7475) support update() in ListState

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309167#comment-16309167 ]

ASF GitHub Bot commented on FLINK-7475:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4963
  
@yunfan123 @aljoscha @StefanRRichter feedback is appreciated


> support update() in ListState
> -
>
> Key: FLINK-7475
> URL: https://issues.apache.org/jira/browse/FLINK-7475
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, DataStream API, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: yf
>Assignee: Bowen Li
> Fix For: 1.5.0
>
>
> If I want to update the list, I have to do two steps:
> listState.clear();
> for (Element e : myList) {
>     listState.add(e);
> }
> Why not update the state with:
> listState.update(myList)?
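
A minimal sketch in Java contrasting the two approaches ({{update()}} being the method this ticket proposes; the helper names are placeholders):

{code:java}
import java.util.List;

import org.apache.flink.api.common.state.ListState;

public class ListStateReplaceSketch {
    // Today's two-step replacement, as described above.
    static void replaceViaClearAndAdd(ListState<String> listState, List<String> myList) throws Exception {
        listState.clear();
        for (String e : myList) {
            listState.add(e);
        }
    }

    // What this ticket proposes: replace the whole list in one call.
    static void replaceViaUpdate(ListState<String> listState, List<String> myList) throws Exception {
        listState.update(myList);
    }
}
{code}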







[jira] [Commented] (FLINK-7935) Metrics with user supplied scope variables

2018-01-02 Thread Wei-Che Wei (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309048#comment-16309048 ]

Wei-Che Wei commented on FLINK-7935:


Hi [~elevy],
Since FLINK-7692 was resolved, users can use {{AbstractMetricGroup#getLogicalScope(CharacterFilter)}} and {{MetricGroup#getAllVariables()}} to get the same metric name with different tags for user-defined variables. Does that solve what you need in this issue, or is there any improvement we can still make here? Thank you.
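
For reference, a minimal sketch (assuming the {{addGroup(key, value)}} API added by FLINK-7692; the group key/value and metric names are placeholders) of how a user-defined variable surfaces as a tag:

{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class TaggedEventCounter extends RichMapFunction<String, String> {
    private transient Counter events;

    @Override
    public void open(Configuration parameters) {
        // addGroup(key, value) registers "eventType" as a user-defined variable,
        // so a tag-based reporter (like the DataDog one) can report a single
        // metric name ("events") with an eventType tag instead of many names.
        events = getRuntimeContext()
            .getMetricGroup()
            .addGroup("eventType", "login")
            .counter("events");
    }

    @Override
    public String map(String value) {
        events.inc();
        return value;
    }
}
{code}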

> Metrics with user supplied scope variables
> --
>
> Key: FLINK-7935
> URL: https://issues.apache.org/jira/browse/FLINK-7935
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.2
>Reporter: Elias Levy
>
> We use DataDog for metrics.  DD and Flink differ somewhat in how they track 
> metrics.
> Flink names and scopes metrics together, at least by default. E.g. by default
> the system scope for operator metrics is
> {{<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>}}.
> The scope variables become part of the metric's full name.
> In DD the metric would be named something generic, e.g. 
> {{taskmanager.job.operator}}, and they would be distinguished by their tag 
> values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}.
> Flink allows you to configure the format string for system scopes, so it is 
> possible to set the operator scope format to {{taskmanager.job.operator}}.  
> We do this for all scopes:
> {code}
> metrics.scope.jm: jobmanager
> metrics.scope.jm.job: jobmanager.job
> metrics.scope.tm: taskmanager
> metrics.scope.tm.job: taskmanager.job
> metrics.scope.task: taskmanager.job.task
> metrics.scope.operator: taskmanager.job.operator
> {code}
> This seems to work.  The DataDog Flink metric's plugin submits all scope 
> variables as tags, even if they are not used within the scope format.  And it 
> appears internally this does not lead to metrics conflicting with each other.
> We would like to extend this to user-defined metrics, but there is currently
> no way to define variables/scopes when adding a metric group or metric with
> the user API, so that in DD we could have a single metric with a tag taking
> many different values, rather than hundreds of metrics for just the one value
> we want to measure across different event types.





[jira] [Created] (FLINK-8352) Flink UI Reports No Error on Job Submission Failures

2018-01-02 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8352:
-

 Summary: Flink UI Reports No Error on Job Submission Failures
 Key: FLINK-8352
 URL: https://issues.apache.org/jira/browse/FLINK-8352
 Project: Flink
  Issue Type: Bug
  Components: Web Client
Affects Versions: 1.4.0
Reporter: Elias Levy


If you submit a job jar via the web UI and it raises an exception when started, the UI will report no error and will continue to show the animated image that makes it seem as if it is working. In addition, no error is printed in the logs unless the level is increased to at least DEBUG:

{noformat}
@40005a4c399202b87ebc DEBUG org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler - Error while handling request.
@40005a4c399202b8868c java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramInvocationException: The program caused an error:
@40005a4c399202b88a74   at org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleJsonRequest$0(JarPlanHandler.java:68)
@40005a4c399202b88e5c   at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
@40005a4c399202b8e44c   at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
@40005a4c399202b8e44c   at java.util.concurrent.FutureTask.run(Unknown Source)
@40005a4c399202b8e834   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
@40005a4c399202b8e834   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
@40005a4c399202b8f3ec   at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
@40005a4c399202b8f7d4   at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
@40005a4c399202b8f7d4   at java.lang.Thread.run(Unknown Source)
@40005a4c399202b8fbbc Caused by: org.apache.flink.client.program.ProgramInvocationException: The program caused an error:
@40005a4c399202b90b5c   at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
@40005a4c399202b90f44   at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
@40005a4c399202b90f44   at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:76)
@40005a4c399202b91afc   at org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleJsonRequest$0(JarPlanHandler.java:57)
@40005a4c399202b91afc   ... 8 more
@40005a4c399202b91ee4 Caused by: java.lang.ExceptionInInitializerError
@40005a4c399202b91ee4   at com.cisco.sbg.amp.flink.ioc_engine.IocEngine.main(IocEngine.scala)
@40005a4c399202b922cc   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
@40005a4c399202b92a9c   at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
@40005a4c399202b92a9c   at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
@40005a4c399202b92e84   at java.lang.reflect.Method.invoke(Unknown Source)
@40005a4c399202b92e84   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
@40005a4c399202b9326c   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
@40005a4c399202b93a3c   at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
@40005a4c399202b949dc   ... 11 more
@40005a4c399202b949dc Caused by: java.io.FileNotFoundException: /data/jenkins/jobs/XXX/workspace/target/scala-2.11/scoverage-data/scoverage.measurements.55 (No such file or directory)
@40005a4c399202b951ac   at java.io.FileOutputStream.open0(Native Method)
@40005a4c399202b951ac   at java.io.FileOutputStream.open(Unknown Source)
@40005a4c399202b9597c   at java.io.FileOutputStream.<init>(Unknown Source)
@40005a4c399202b9597c   at java.io.FileWriter.<init>(Unknown Source)
@40005a4c399202b95d64   at scoverage.Invoker$$anonfun$1.apply(Invoker.scala:42)
@40005a4c399202b95d64   at scoverage.Invoker$$anonfun$1.apply(Invoker.scala:42)
@40005a4c399202b9614c   at scala.collection.concurrent.TrieMap.getOrElseUpdate(TrieMap.scala:901)
@40005a4c399202b9614c   at scoverage.Invoker$.invoked(Invoker.scala:42)
@40005a4c399202b9691c   at com.XXX$.<init>(IocEngine.scala:28)
@40005a4c399202b9691c   at com.XXX$.<clinit>(IocEngine.scala)
{noformat}





[jira] [Created] (FLINK-8351) Canonical way to extract FoldingState QueryableStateStream

2018-01-02 Thread Raymond Tay (JIRA)
Raymond Tay created FLINK-8351:
--

 Summary: Canonical way to extract FoldingState QueryableStateStream
 Key: FLINK-8351
 URL: https://issues.apache.org/jira/browse/FLINK-8351
 Project: Flink
  Issue Type: Bug
  Components: Queryable State
Affects Versions: 1.4.0
 Environment: h3. Environment
* Local flink cluster version 1.4.0
* {{classloader.resolve-order: child-first}} in {{conf/flink-conf.yaml}}.
* scala 2.11.11
* oracle jdk 1.8.0

h3. Library version
* akka actors 2.4.20
* akka http 10.0.10
Reporter: Raymond Tay


I started experimenting with {{QueryableStateStream}} by referencing this site 
which is linked by Flink's website ⇒ 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html
 

I think the example given is perfectly fine and runs without much issue. However, I started extrapolating the situation to a {{FoldingState}} where the application continuously updates the state and another part of the application retrieves it, and I ran into the following situation:

* Is there a canonical way or API that allows the app to extract the submitted job-id in order to query the current state?
** As a workaround, I'm querying the url {{localhost:8081/jobs}}, but this does not work when there are other jobs running
* Is there another way to query the state other than {{polling}}? Is it possible to return the final result as part of the {{JobExecutionResult}}?
** At the moment, the app is polling the url {{localhost:8081/jobs}} and using that {{JobID}} to query the state, but when the job completes the job's state is not persisted and I lose the _final_ state value

I realized {{QueryableStateStream}} is in {{beta}} as of the time of this issue, so there are things I might have missed. Any help or tip is greatly appreciated ☺
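
For reference, a minimal sketch in Java (the proxy host/port, state name, and key are placeholders) of the polling-based query the workaround describes:

{code:java}
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class QueryStateSketch {
    public static void main(String[] args) throws Exception {
        // The job id still has to be obtained out of band, e.g. from the
        // REST API (localhost:8081/jobs), as described above.
        JobID jobId = JobID.fromHexString(args[0]);

        QueryableStateClient client = new QueryableStateClient("localhost", 9069);
        CompletableFuture<ValueState<String>> future = client.getKvState(
            jobId,
            "my-queryable-state",            // name passed to asQueryableState(...)
            "some-key",
            BasicTypeInfo.STRING_TYPE_INFO,
            new ValueStateDescriptor<>("state", String.class));

        // The state only exists while the job is running; once the job
        // completes, this query fails and the final value is lost.
        System.out.println(future.get().value());
    }
}
{code}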





[jira] [Commented] (FLINK-8327) ClassLoader resolution of child-first does not appear to work

2018-01-02 Thread Raymond Tay (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308983#comment-16308983 ]

Raymond Tay commented on FLINK-8327:


Yes, the fat-jar does contain Akka classes, and like yourself, I naturally assumed that the configuration file was not present; but it turns out it was present (I could run it via {{run}} while in {{sbt}} just to print out the settings belonging to {{akka.http}}), and hence I came to the conclusion that it _might_ be a class-loading problem. I'll need to investigate how Akka classloading works in this situation and I'll come back a little later this week.

Thanks for the suggestion; I'll try it out and see how things go, and I'll come back to this issue later today.

Sbt is complicated in that there are multiple classloaders, and I don't know the full story about how they work cohesively.
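
One thing that may be worth trying (a sketch only, assuming the Typesafe Config and Akka 2.4 APIs; not a confirmed fix) is forcing the config to load through the classloader that actually holds the fat-jar's resources:

{code:java}
import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class UserClassLoaderConfig {
    public static ActorSystem create() {
        // Load reference.conf/application.conf via the user-code classloader
        // instead of whatever context classloader the runtime has installed,
        // and hand the same loader to the ActorSystem.
        ClassLoader userLoader = UserClassLoaderConfig.class.getClassLoader();
        Config config = ConfigFactory.load(userLoader);
        return ActorSystem.create("user-system", config, userLoader);
    }
}
{code}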

> ClassLoader resolution of child-first does not appear to work
> -
>
> Key: FLINK-8327
> URL: https://issues.apache.org/jira/browse/FLINK-8327
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.4.0
> Environment: h3. Environment
> * Local flink cluster version 1.4.0
> * {{classloader.resolve-order: child-first}} in {{conf/flink-conf.yaml}}.
> * scala 2.11.11
> * oracle jdk 1.8.0
> h3. Library version
> * akka actors 2.4.20
> * akka http 10.0.10
>Reporter: Raymond Tay
>
> h2. Description
> I was trying out the {{Queryable State}} and ran into a problem where the
> submitted job starts regular Akka actors and makes external HTTP calls via
> the {{akka-http}} libraries, and the Flink runtime was complaining that it
> was not able to read the key {{akka.http}} (this key is held in the
> configuration file for {{akka-http}}).
> When I ran our app in the {{sbt}} shell locally, it was able to see the key
> {{akka.http}}, but when we submitted the fat-jar (via {{sbt-assembly}}) to
> Flink, it threw the error message below. Looks like a classloader issue, but
> I'm not sure.
> Any help is much appreciated!
> h2. Error message
> {noformat}
> Caused by: com.typesafe.config.ConfigException$Missing: No configuration 
> setting found for key 'akka.http'
>   at 
> com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:152)
>   at 
> com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:170)
>   at 
> com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:176)
>   at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:184)
>   at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:189)
>   at 
> com.typesafe.config.impl.SimpleConfig.getObject(SimpleConfig.java:258)
>   at 
> com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:264)
>   at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:37)
>   at akka.http.scaladsl.Http$.createExtension(Http.scala:843)
>   at akka.http.scaladsl.Http$.createExtension(Http.scala:719)
>   at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:917)
>   at akka.actor.ExtensionId$class.apply(Extension.scala:79)
>   at akka.http.scaladsl.Http$.apply(Http.scala:838)
>   at akka.http.scaladsl.Http$.apply(Http.scala:719)
>   at org.example.state.A.<init>(Simple.scala:158)
>   ... 18 more
> {noformat}





[jira] [Updated] (FLINK-8322) support getting number of existing timers in TimerService

2018-01-02 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-8322:

Fix Version/s: 1.5.0

> support getting number of existing timers in TimerService
> -
>
> Key: FLINK-8322
> URL: https://issues.apache.org/jira/browse/FLINK-8322
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.5.0
>
>
> We have use cases where we want to use timers as scheduled threads - e.g. add
> a timer to wake up x hours later and do something (reap old data, usually)
> only if there are no existing timers; basically we want at most one timer to
> exist for the key at all times
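
A minimal sketch (applied on a keyed stream; names and the one-hour delay are placeholders) of the workaround this implies today: tracking in keyed state whether a timer is already registered, so each key holds at most one pending timer:

{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class SingleReaperTimer extends ProcessFunction<String, String> {
    private transient ValueState<Boolean> timerRegistered;

    @Override
    public void open(Configuration parameters) {
        timerRegistered = getRuntimeContext().getState(
            new ValueStateDescriptor<>("timer-registered", Boolean.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        if (timerRegistered.value() == null) {
            // Only register the wake-up timer if none is pending for this key.
            ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 60 * 60 * 1000L);
            timerRegistered.update(true);
        }
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        timerRegistered.clear();  // the next element may schedule a new timer
        // ... reap old data here ...
    }
}
{code}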





[jira] [Commented] (FLINK-8265) Missing jackson dependency for flink-mesos

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308673#comment-16308673 ]

ASF GitHub Bot commented on FLINK-8265:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5208
  
@aljoscha please review


> Missing jackson dependency for flink-mesos
> --
>
> Key: FLINK-8265
> URL: https://issues.apache.org/jira/browse/FLINK-8265
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 1.4.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> The Jackson library that is required by Fenzo is missing from the Flink 
> distribution jar-file.
> This manifests as an exception in certain circumstances when a hard 
> constraint is configured ("mesos.constraints.hard.hostattribute").
> {code}
> NoClassDefFoundError: 
> org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper
> at com.netflix.fenzo.ConstraintFailure.<init>(ConstraintFailure.java:35)
> at 
> com.netflix.fenzo.AssignableVirtualMachine.findFailedHardConstraints(AssignableVirtualMachine.java:784)
> at 
> com.netflix.fenzo.AssignableVirtualMachine.tryRequest(AssignableVirtualMachine.java:581)
> at com.netflix.fenzo.TaskScheduler.evalAssignments(TaskScheduler.java:796)
> at com.netflix.fenzo.TaskScheduler.access$1500(TaskScheduler.java:70)
> {code}






[jira] [Commented] (FLINK-8314) Add support for Kinesis Firehose

2018-01-02 Thread Bowen Li (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308524#comment-16308524 ]

Bowen Li commented on FLINK-8314:
-

[~aljoscha] I believe this is unnecessary.

First, Firehose (FH) is just a wrapper with Kinesis inside, meant to better integrate with other AWS services, e.g. S3. Second, users can create a FH and set Kinesis as its source, and Kinesis's data will automatically flow into FH. Based on these two reasons, I believe there's no need to create an extra FH connector and maintain it in Flink.

> Add support for Kinesis Firehose
> 
>
> Key: FLINK-8314
> URL: https://issues.apache.org/jira/browse/FLINK-8314
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector
>Reporter: Ivan Mushketyk
>Priority: Minor
>
> [Kinesis 
> Firehose|https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html]
>  is another product in Kinesis Family that allows zero-ops scalable stream 
> for delivering streaming data to S3, RedShift, Elasticsearch, etc. It has a 
> different 
> [API|http://docs.aws.amazon.com/firehose/latest/APIReference/Welcome.html].
> I can contribute this myself, but I wonder if this feature makes sense to you.





[jira] [Commented] (FLINK-8305) Docker port mapping doesn't work for JobManager RPC port on Official Flink image

2018-01-02 Thread Thomas Wozniakowski (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308452#comment-16308452 ]

Thomas Wozniakowski commented on FLINK-8305:


Hi Aljoscha,

Thanks for responding! I actually ended up working this out by banging my head against it for another day or so. You're right, using 1:1 port mappings does work, but the Docker images don't allow a deterministic way to override most internal port configuration in Local Cluster mode, so I've raised a GitHub PR to allow this:

https://github.com/docker-flink/docker-flink/pull/33



> Docker port mapping doesn't work for JobManager RPC port on Official Flink 
> image
> 
>
> Key: FLINK-8305
> URL: https://issues.apache.org/jira/browse/FLINK-8305
> Project: Flink
>  Issue Type: Bug
>  Components: Docker
>Affects Versions: 1.4.0, 1.3.2
> Environment: Mac OSX, Docker 17.09.1-ce-mac42 (21090)
>Reporter: Thomas Wozniakowski
>
> With the images at: https://hub.docker.com/r/_/flink/. The JobManager RPC 
> port does not appear to be properly exposed via a standard docker port 
> mapping.
> Starting a local cluster with:
> {{docker run  --name flink_local -p 32789:6123 -t -d flink local}}
> gives you: 
> ||CONTAINER ID||IMAGE||COMMAND||CREATED||STATUS||PORTS||NAMES||
> |e2dd5f668f4f|flink|"/docker-entrypoin..."|6 minutes ago|Up 6 
> minutes|8081/tcp, 0.0.0.0:32789->6123/tcp|flink_local|
> Should allow submission of a job via command line with:
> {{flink run -m 0.0.0.0:32789 
> /usr/local/Cellar/apache-flink/1.3.2/libexec/examples/streaming/WordCount.jar}}
> But this fails with "Couldn't retrieve the JobExecutionResult from the 
> JobManager".
> Logging directly into the container with:
> {{sudo docker exec -it <container_id> /bin/bash}}
> allows you to successfully start and complete job with:
> {{flink run -m localhost:6123 /opt/flink/examples/streaming/WordCount.jar}}
> What am I missing here? If the job can be successfully submitted to 6123 
> locally, and that port is mapped to an external port on 0.0.0.0, then I 
> should be able to submit jobs to that port? I can't find any way to make this 
> work though. I've tried all variants of host name (localhost, 127.0.0.1, 
> 0.0.0.0) and none work.
> I would hugely appreciate any help.
> Thanks guys.





[jira] [Commented] (FLINK-8279) Use Mesos/YARN temp directories as fallback for BlobServer/Cache temp directories

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308448#comment-16308448 ]

ASF GitHub Bot commented on FLINK-8279:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/5176
  
For the change regarding the JobManager use of a TaskManager configuration parameter, I created a separate issue (FLINK-8250) and included the appropriate commits here, as this commit probably does not make much sense without them. The commits contain some high-level descriptions of the changes.

> Use Mesos/YARN temp directories as fallback for BlobServer/Cache temp 
> directories
> -
>
> Key: FLINK-8279
> URL: https://issues.apache.org/jira/browse/FLINK-8279
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Currently, the BLOB server and cache processes (temporarily) stash incoming 
> files into their local file system in the directory given by the 
> {{blob.storage.directory}} configuration property. If this property is not 
> set or empty, it will fall back to {{java.io.tmpdir}}.
> Instead, in a Mesos/YARN environment, we could use the temporary directories 
> they assigned to the Flink job which are not only the proper folder to use, 
> but may also offer some more space.
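
For context, a minimal sketch (the config key is from the description above; this is an illustration, not the actual patch) of the fallback being discussed:

{code:java}
import org.apache.flink.configuration.Configuration;

public class BlobStorageDirFallback {
    static String resolveStorageDir(Configuration config) {
        String dir = config.getString("blob.storage.directory", null);
        if (dir == null || dir.trim().isEmpty()) {
            // Current behavior: fall back to the JVM temp dir. The proposal is
            // to prefer the Mesos/YARN-provided temp directories at this point.
            dir = System.getProperty("java.io.tmpdir");
        }
        return dir;
    }
}
{code}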






[jira] [Created] (FLINK-8350) replace "taskmanager.tmp.dirs" with "env.io.tmp.dirs" for all components

2018-01-02 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-8350:
--

 Summary: replace "taskmanager.tmp.dirs" with "env.io.tmp.dirs" for 
all components
 Key: FLINK-8350
 URL: https://issues.apache.org/jira/browse/FLINK-8350
 Project: Flink
  Issue Type: Improvement
  Components: Cluster Management, Configuration
Affects Versions: 1.4.0
Reporter: Nico Kruber
Assignee: Nico Kruber


Currently, there is only a {{taskmanager.tmp.dirs}} configuration parameter, which (if unset) is set to YARN/Mesos' application environment paths (the latter not quite yet). With FLINK-8279, we also used this as a fall-back for the BLOB caches and would like to use it for the BLOB server as well. The BLOB server, however, does not reside on the TaskManager, and it only makes sense to have a single temporary-directory configuration parameter (if desired, this could be extended).

I propose to change this to a more generic {{env.io.tmp.dirs}} used by all components, i.e. JobManager, JobMaster, Dispatcher, and all the TaskManager-related instances, for both YARN and Mesos.

TODO: set this value to the appropriate folders for the JobManager code paths during cluster deployment (this exists only for the TaskManager for now)





[jira] [Commented] (FLINK-8334) Elasticsearch Connector throwing java.lang.ClassNotFoundException: org.jboss.netty.channel.socket.nio.SocketSendBufferPool$GatheringSendBuffer

2018-01-02 Thread Bhaskar Divya (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308330#comment-16308330 ]

Bhaskar Divya commented on FLINK-8334:
--

Thanks [~aljoscha].

I think I got it working now (no more failures on start of the job).
My POM was messed up from trying to fix this issue, so I re-created a fresh POM from the maven archetype of 1.4.0 and put in the dependencies I required one by one.
There were also issues with the Elasticsearch docker environment. For anybody looking at this, please go through [this link|https://stackoverflow.com/questions/41192680/update-max-map-count-for-elasticsearch-docker-container-mac-host#41251595] to set vm.max_map_count, and set the following environment variables in the docker container:
http.host = 0.0.0.0
transport.host = 0.0.0.0
xpack.security.enabled = false

Also, the POM which is working for me is:
1) Add the dependency in the outer {{<dependencies>}} block:
{noformat}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
{noformat}

2) Follow the guidelines from [here|https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/linking.html]. Add the provided {{maven-dependency-plugin}} configuration. Inside the {{<artifactItems>}} I added the following for elasticsearch:

{noformat}
<artifactItem>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>
    <version>1.4.0</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>org/apache/flink/**</includes>
</artifactItem>
<artifactItem>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>5.1.2</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>**</includes>
</artifactItem>
<artifactItem>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>transport-netty3-client</artifactId>
    <version>5.1.2</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>**</includes>
</artifactItem>
<artifactItem>
    <groupId>io.netty</groupId>
    <artifactId>netty</artifactId>
    <version>3.10.6.Final</version>
    <type>jar</type>
    <overWrite>false</overWrite>
    <outputDirectory>${project.build.directory}/classes</outputDirectory>
    <includes>**</includes>
</artifactItem>
{noformat}

A few of them are definitely extras and may not actually be required.
Hope this helps anyone trying the Elasticsearch connector with Docker.

> Elasticsearch Connector throwing java.lang.ClassNotFoundException: 
> org.jboss.netty.channel.socket.nio.SocketSendBufferPool$GatheringSendBuffer
> --
>
> Key: FLINK-8334
> URL: https://issues.apache.org/jira/browse/FLINK-8334
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector
>Affects Versions: 1.4.0
> Environment: Using Elasticsearch 5.1.2 in a docker environment
> Flink is deployed on a different docker
>Reporter: Bhaskar Divya
>  Labels: elasticsearch, netty
>
> I have a Elasticsearch sink configured. When a job is submitted, It goes into 
> fail status in a few seconds. 
> Following is the Exception from the Job screen:
> {code:java}
> java.lang.RuntimeException: Elasticsearch client is not connected to any 
> Elasticsearch nodes!
>   at 
> org.apache.flink.streaming.connectors.elasticsearch5.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:80)
>   at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:281)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> 

[jira] [Commented] (FLINK-8265) Missing jackson dependency for flink-mesos

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308328#comment-16308328 ]

ASF GitHub Bot commented on FLINK-8265:
---

Github user jaredstehler commented on the issue:

https://github.com/apache/flink/pull/5208
  
looks good to me, thanks!


> Missing jackson dependency for flink-mesos
> --
>
> Key: FLINK-8265
> URL: https://issues.apache.org/jira/browse/FLINK-8265
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 1.4.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> The Jackson library that is required by Fenzo is missing from the Flink 
> distribution jar-file.
> This manifests as an exception in certain circumstances when a hard 
> constraint is configured ("mesos.constraints.hard.hostattribute").
> {code}
> NoClassDefFoundError: 
> org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper
> at com.netflix.fenzo.ConstraintFailure.<init>(ConstraintFailure.java:35)
> at 
> com.netflix.fenzo.AssignableVirtualMachine.findFailedHardConstraints(AssignableVirtualMachine.java:784)
> at 
> com.netflix.fenzo.AssignableVirtualMachine.tryRequest(AssignableVirtualMachine.java:581)
> at com.netflix.fenzo.TaskScheduler.evalAssignments(TaskScheduler.java:796)
> at com.netflix.fenzo.TaskScheduler.access$1500(TaskScheduler.java:70)
> {code}







[jira] [Commented] (FLINK-6206) As an Engineer, I want task state transition log to be warn/error for FAILURE scenarios

2018-01-02 Thread Dan Bress (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-6206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308303#comment-16308303 ]

Dan Bress commented on FLINK-6206:
--

This is definitely relevant in 1.2.0; without it, it is very hard to detect situations where the system restarts due to an exception.

> As an Engineer, I want task state transition log to be warn/error for FAILURE 
> scenarios
> ---
>
> Key: FLINK-6206
> URL: https://issues.apache.org/jira/browse/FLINK-6206
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Dan Bress
>Priority: Critical
>
> If a task fails due to an exception, I would like that to be logged at a warn
> or an error level. Currently it's info:
> {code}
> private boolean transitionState(ExecutionState currentState, ExecutionState newState, Throwable cause) {
>     if (STATE_UPDATER.compareAndSet(this, currentState, newState)) {
>         if (cause == null) {
>             LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState);
>         } else {
>             LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState, cause);
>         }
>         return true;
>     } else {
>         return false;
>     }
> }
> {code}
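
A minimal sketch (an illustration, not the actual patch) of the requested change, escalating the log level when a failure cause is present:

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransitionLoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(TransitionLoggingSketch.class);

    static void logTransition(String taskNameWithSubtask, String executionId,
            String currentState, String newState, Throwable cause) {
        if (cause == null) {
            LOG.info("{} ({}) switched from {} to {}.",
                taskNameWithSubtask, executionId, currentState, newState);
        } else {
            // Failure transitions become visible at default log levels,
            // with the cause's stack trace attached.
            LOG.error("{} ({}) switched from {} to {}.",
                taskNameWithSubtask, executionId, currentState, newState, cause);
        }
    }
}
{code}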





[jira] [Closed] (FLINK-8272) 1.4 & Scala: Exception when querying the state

2018-01-02 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-8272.
---
   Resolution: Not A Problem
Fix Version/s: (was: 1.4.0)

> 1.4 & Scala: Exception when querying the state
> --
>
> Key: FLINK-8272
> URL: https://issues.apache.org/jira/browse/FLINK-8272
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.4.0
> Environment: mac
>Reporter: Juan Miguel Cejuela
>
> With Flink 1.3.2 and basically the same code, except for the changes in the
> API that happened in 1.4.0, I could query the state fine.
> With 1.4.0 (& scala), I get the exception written below. Most important line:
> {{java.util.concurrent.CompletionException:
> java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds
> writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)}}
> Somehow it seems that the connection to the underlying buffer fails. Perhaps
> most informative is that, if I give a wrong {{JobId}} (i.e. one that is not
> running), I still get the same error. So I guess no connection succeeds.
> Also, to avoid possible serialization errors, I am just querying with a key
> of type {{String}} and, for testing, a dummy & trivial {{case class}}.
> I create the necessary types such as:
> {code}
> implicit val typeInformationString = createTypeInformation[String]
> implicit val typeInformationDummy = createTypeInformation[Dummy]
> implicit val valueStateDescriptorDummy =
>     new ValueStateDescriptor("state", typeInformationDummy)
> {code}
> And then query it like so:
> {code}
> val javaCompletableFuture = client
>     .getKvState(this.flinkJobIdObject, this.queryableState, key,
>         typeInformationString, valueStateDescriptorDummy)
> {code}
> Does anybody have any clue on this? Let me know what other information I
> should provide.
> ---
> Full stack trace:
> {code}
> java.util.concurrent.CompletionException: 
> java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds 
> writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)
>   at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>   at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>   at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>   at 
> org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:289)
>   at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>   at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>   at 
> org.apache.flink.queryablestate.network.Client$EstablishedConnection.close(Client.java:432)
>   at 
> org.apache.flink.queryablestate.network.Client$EstablishedConnection.onFailure(Client.java:505)
>   at 
> org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:92)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at 
> 

[jira] [Reopened] (FLINK-8272) 1.4 & Scala: Exception when querying the state

2018-01-02 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened FLINK-8272:
-

Reopen to change resolution.

> 1.4 & Scala: Exception when querying the state
> --
>
> Key: FLINK-8272
> URL: https://issues.apache.org/jira/browse/FLINK-8272
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.4.0
> Environment: mac
>Reporter: Juan Miguel Cejuela
> Fix For: 1.4.0
>
>
> With Flink 1.3.2, and basically the same code except for the API changes 
> that happened in 1.4.0, I could query the state fine.
> With 1.4.0 (& scala), I get the exception written below. The most important 
> line: 
> {{java.util.concurrent.CompletionException: 
> java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds 
> writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)}}
> Somehow it seems that the connection to the underlying buffer fails. Perhaps 
> most telling is that, if I give a wrong {{JobId}} (i.e. one that is not 
> running), I still get the same error, so I guess no connection succeeds. 
> Also, to avoid possible serialization errors, I am just querying with a key 
> of type {{String}} and, for testing, a dummy & trivial {{case class}}.
> I create the necessary types such as:
> {code}
> implicit val typeInformationString = createTypeInformation[String]
> implicit val typeInformationDummy = createTypeInformation[Dummy]
> implicit val valueStateDescriptorDummy =
> new ValueStateDescriptor("state", typeInformationDummy)
> {code}
> And then query the state like so:
> {code}
> val javaCompletableFuture = client
> .getKvState(this.flinkJobIdObject, this.queryableState, key, 
> typeInformationString, valueStateDescriptorDummy)
> {code}
> Does anybody have any clue on this? Let me know what other information I 
> should provide.
> ---
> Full stack trace:
> {code}
> java.util.concurrent.CompletionException: 
> java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds 
> writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)
>   at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>   at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>   at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>   at 
> org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:289)
>   at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>   at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>   at 
> org.apache.flink.queryablestate.network.Client$EstablishedConnection.close(Client.java:432)
>   at 
> org.apache.flink.queryablestate.network.Client$EstablishedConnection.onFailure(Client.java:505)
>   at 
> org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:92)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>   at 
> 

[jira] [Commented] (FLINK-8282) Transformation with TwoInputStreamOperator fails

2018-01-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308293#comment-16308293
 ] 

Aljoscha Krettek commented on FLINK-8282:
-

In my opinion, there is no good separation of concerns between the 
{{StreamTask}} and the {{StreamOperator}}. {{AbstractStreamOperator}} does a 
lot of things that a stream operator shouldn't do, and it does them in a very 
specific way that {{StreamTask}} and other components rely on; things go 
haywire if an operator doesn't behave that way.

Off the top of my head this includes everything that happens in {{setup()}}, 
i.e. metrics setup and configuration stuff, everything that happens in the 
various state initialisation methods and snapshot/restore methods, and the 
latency marker stuff.

There is actually this (somewhat old) issue: FLINK-4859.
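
For illustration, a minimal sketch of the workaround hinted at in the 
description, i.e. extending {{AbstractStreamOperator}} so that {{setup()}}, 
state initialisation and the latency-marker handling are inherited rather 
than stubbed out (untested, assuming the 1.4 APIs; the class name is made up):

{code:java}
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Only the element-processing methods need to be implemented; watermark and
// latency-marker handling come from AbstractStreamOperator.
public class PassThroughTwoInputOperator
		extends AbstractStreamOperator<Integer>
		implements TwoInputStreamOperator<Integer, Integer, Integer> {

	@Override
	public void processElement1(StreamRecord<Integer> element) throws Exception {
		output.collect(element); // 'output' is the field set up by AbstractStreamOperator
	}

	@Override
	public void processElement2(StreamRecord<Integer> element) throws Exception {
		output.collect(element);
	}
}
{code}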

> Transformation with TwoInputStreamOperator fails
> 
>
> Key: FLINK-8282
> URL: https://issues.apache.org/jira/browse/FLINK-8282
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Timo Walther
>
> The following program fails for multiple reasons (see the exceptions below). 
> The transformation with a {{TwoInputStreamOperator}} does not extend 
> {{AbstractStreamOperator}}. I think this is the main cause of the failure. 
> Either we fix the exceptions or we check for {{AbstractStreamOperator}} first.
> {code}
>   final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>   DataStream<Integer> ds1 = env.addSource(new SourceFunction<Integer>() {
>       @Override
>       public void run(SourceContext<Integer> ctx) throws Exception {
>           ctx.emitWatermark(new Watermark(100L));
>           ctx.collect(12);
>           while (true) Thread.yield();
>       }
>       @Override
>       public void cancel() {
>       }
>   });
>   DataStream<Integer> ds2 = env.addSource(new SourceFunction<Integer>() {
>       @Override
>       public void run(SourceContext<Integer> ctx) throws Exception {
>           ctx.emitWatermark(new Watermark(200L));
>           ctx.collect(12);
>           while (true) Thread.yield();
>       }
>       @Override
>       public void cancel() {
>       }
>   });
>   ds1.connect(ds2.broadcast()).transform("test", Types.INT, new TwoInputStreamOperator<Integer, Integer, Integer>() {
>       @Override
>       public void processElement1(StreamRecord<Integer> element) throws Exception {
>           System.out.println();
>       }
>       @Override
>       public void processElement2(StreamRecord<Integer> element) throws Exception {
>           System.out.println();
>       }
>       @Override
>       public void processWatermark1(Watermark mark) throws Exception {
>           System.out.println();
>       }
>       @Override
>       public void processWatermark2(Watermark mark) throws Exception {
>           System.out.println();
>       }
>       @Override
>       public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
>       }
>       @Override
>       public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
>       }
>       @Override
>       public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Integer>> output) {
>       }
>       @Override
>       public void open() throws Exception {
>       }
>       @Override
>       public void close() throws Exception {
>       }
>       @Override
>       public void dispose() throws Exception {
>       }
>       @Override
>       public OperatorSnapshotResult snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
>           return null;
>       }
>  

[jira] [Commented] (FLINK-8284) Custom metrics not being exposed for Prometheus

2018-01-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308280#comment-16308280
 ] 

Aljoscha Krettek commented on FLINK-8284:
-

[~Zentol] Could you please have a look at this?

> Custom metrics not being exposed for Prometheus
> ---
>
> Key: FLINK-8284
> URL: https://issues.apache.org/jira/browse/FLINK-8284
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Metrics
>Affects Versions: 1.4.0
> Environment: Linux/CentOS 7
>Reporter: Julio Biason
>
> Following the documentation, we changed our filter that removes events with 
> missing fields to a RichFilterFunction, so we can capture metrics about such 
> events:
> {code:java}
> public class MissingClientFilter extends RichFilterFunction<LineData> {
>   private transient Counter counter;
>   @Override
>   public void open(Configuration config) {
>   this.counter = getRuntimeContext()
>   .getMetricGroup()
>   .addGroup("events")
>   .counter("missingClient");
>   }
>   @Override
>   public boolean filter(LineData line) {
>   String client = line.get("client").toString();
>   boolean missing = client.trim().equals("");
>   if (!missing) {
>   this.count();
>   }
>   return !missing;
>   }
>   private void count() {
>   if (this.counter != null) {
>   this.counter.inc();
>   }
>   }
> }
> {code}
> We also added Prometheus as our reporter:
> {noformat}
> metrics.reporters: prom
> metrics.reporter.prom.port: 9105
> metrics.reporter.prom.class: 
> org.apache.flink.metrics.prometheus.PrometheusReporter
> {noformat}
> The problem is that accessing port 9105 displays all Flink metrics, but not 
> ours.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8305) Docker port mapping doesn't work for JobManager RPC port on Official Flink image

2018-01-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308269#comment-16308269
 ] 

Aljoscha Krettek commented on FLINK-8305:
-

This is a known issue based on a "feature" of Akka: Akka connections only work 
if the client connects to the server with exactly the same address that the 
server "thinks" it has. I think you can work around this by adding {{EXPOSE 
6123}} to your Dockerfile, using exactly that port, and submitting from an 
environment that can resolve the hostname of the JobManager.
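
Something along these lines should work (untested sketch; {{jobmanager}} is an 
assumed hostname that has to resolve to the container from wherever the 
client runs):

{noformat}
# publish the RPC port 1:1 instead of remapping it
docker run --name flink_local --hostname jobmanager -p 6123:6123 -p 8081:8081 -t -d flink local

# submit from an environment that resolves 'jobmanager' to the container,
# using exactly the port the JobManager thinks it has
flink run -m jobmanager:6123 /path/to/WordCount.jar
{noformat}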

> Docker port mapping doesn't work for JobManager RPC port on Official Flink 
> image
> 
>
> Key: FLINK-8305
> URL: https://issues.apache.org/jira/browse/FLINK-8305
> Project: Flink
>  Issue Type: Bug
>  Components: Docker
>Affects Versions: 1.4.0, 1.3.2
> Environment: Mac OSX, Docker 17.09.1-ce-mac42 (21090)
>Reporter: Thomas Wozniakowski
>
> With the images at https://hub.docker.com/r/_/flink/, the JobManager RPC 
> port does not appear to be properly exposed via a standard docker port 
> mapping.
> Starting a local cluster with:
> {{docker run --name flink_local -p 32789:6123 -t -d flink local}}
> gives you: 
> ||CONTAINER ID||IMAGE||COMMAND||CREATED||STATUS||PORTS||NAMES||
> |e2dd5f668f4f|flink|"/docker-entrypoin..."|6 minutes ago|Up 6 
> minutes|8081/tcp, 0.0.0.0:32789->6123/tcp|flink_local|
> This should allow submission of a job via the command line with:
> {{flink run -m 0.0.0.0:32789 
> /usr/local/Cellar/apache-flink/1.3.2/libexec/examples/streaming/WordCount.jar}}
> But this fails with "Couldn't retrieve the JobExecutionResult from the 
> JobManager".
> Logging directly into the container with:
> {{sudo docker exec -it <container_id> /bin/bash}}
> allows you to successfully start and complete job with:
> {{flink run -m localhost:6123 /opt/flink/examples/streaming/WordCount.jar}}
> What am I missing here? If the job can be successfully submitted to 6123 
> locally, and that port is mapped to an external port on 0.0.0.0, then I 
> should be able to submit jobs to that port? I can't find any way to make this 
> work though. I've tried all variants of host name (localhost, 127.0.0.1, 
> 0.0.0.0) and none work.
> I would hugely appreciate any help.
> Thanks guys.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8332) Move dispose savepoint into ClusterClient

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308243#comment-16308243
 ] 

ASF GitHub Bot commented on FLINK-8332:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5219#discussion_r159249258
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -659,128 +655,107 @@ protected int savepoint(String[] args) {
return 0;
}
 
-   if (options.isDispose()) {
-   // Discard
-   return disposeSavepoint(options);
-   } else {
-   // Trigger
-   String[] cleanedArgs = options.getArgs();
-   JobID jobId;
+   CustomCommandLine customCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
 
-   if (cleanedArgs.length >= 1) {
-   String jobIdString = cleanedArgs[0];
-   try {
-   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
-   } catch (Exception e) {
-   return handleArgException(new 
IllegalArgumentException(
-   "Error: The value for 
the Job ID is not a valid ID."));
-   }
+   ClusterClient clusterClient = 
customCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
+
+   try {
+   if (options.isDispose()) {
+   // Discard
+   return disposeSavepoint(clusterClient, 
options.getSavepointPath());
} else {
-   return handleArgException(new 
IllegalArgumentException(
+   // Trigger
+   String[] cleanedArgs = options.getArgs();
+   JobID jobId;
+
+   if (cleanedArgs.length >= 1) {
+   String jobIdString = cleanedArgs[0];
+   try {
+   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
--- End diff --

nit: `JobID.fromHexString(jobIdString)`


> Move dispose savepoint into ClusterClient
> -
>
> Key: FLINK-8332
> URL: https://issues.apache.org/jira/browse/FLINK-8332
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently, the {{CliFrontend}} sends the command for disposing a savepoint. 
> In order to better abstract this functionality we should move it to the 
> {{ClusterClient}}. That way we can have different implementations of the 
> {{ClusterClient}} (Flip-6 and old code) which are used by the same 
> {{CliFrontend}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...

2018-01-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5219#discussion_r159249258
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -659,128 +655,107 @@ protected int savepoint(String[] args) {
return 0;
}
 
-   if (options.isDispose()) {
-   // Discard
-   return disposeSavepoint(options);
-   } else {
-   // Trigger
-   String[] cleanedArgs = options.getArgs();
-   JobID jobId;
+   CustomCommandLine customCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
 
-   if (cleanedArgs.length >= 1) {
-   String jobIdString = cleanedArgs[0];
-   try {
-   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
-   } catch (Exception e) {
-   return handleArgException(new 
IllegalArgumentException(
-   "Error: The value for 
the Job ID is not a valid ID."));
-   }
+   ClusterClient clusterClient = 
customCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
+
+   try {
+   if (options.isDispose()) {
+   // Discard
+   return disposeSavepoint(clusterClient, 
options.getSavepointPath());
} else {
-   return handleArgException(new 
IllegalArgumentException(
+   // Trigger
+   String[] cleanedArgs = options.getArgs();
+   JobID jobId;
+
+   if (cleanedArgs.length >= 1) {
+   String jobIdString = cleanedArgs[0];
+   try {
+   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
--- End diff --

nit: `JobID.fromHexString(jobIdString)`


---


[jira] [Commented] (FLINK-8332) Move dispose savepoint into ClusterClient

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308233#comment-16308233
 ] 

ASF GitHub Bot commented on FLINK-8332:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5219#discussion_r159248959
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -659,128 +655,107 @@ protected int savepoint(String[] args) {
return 0;
}
 
-   if (options.isDispose()) {
-   // Discard
-   return disposeSavepoint(options);
-   } else {
-   // Trigger
-   String[] cleanedArgs = options.getArgs();
-   JobID jobId;
+   CustomCommandLine customCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
 
-   if (cleanedArgs.length >= 1) {
-   String jobIdString = cleanedArgs[0];
-   try {
-   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
-   } catch (Exception e) {
-   return handleArgException(new 
IllegalArgumentException(
-   "Error: The value for 
the Job ID is not a valid ID."));
-   }
+   ClusterClient clusterClient = 
customCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
+
+   try {
+   if (options.isDispose()) {
+   // Discard
+   return disposeSavepoint(clusterClient, 
options.getSavepointPath());
} else {
-   return handleArgException(new 
IllegalArgumentException(
+   // Trigger
+   String[] cleanedArgs = options.getArgs();
+   JobID jobId;
+
+   if (cleanedArgs.length >= 1) {
+   String jobIdString = cleanedArgs[0];
+   try {
+   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
--- End diff --

`JobID.fromHexString`


> Move dispose savepoint into ClusterClient
> -
>
> Key: FLINK-8332
> URL: https://issues.apache.org/jira/browse/FLINK-8332
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently, the {{CliFrontend}} sends the command for disposing a savepoint. 
> In order to better abstract this functionality we should move it to the 
> {{ClusterClient}}. That way we can have different implementations of the 
> {{ClusterClient}} (Flip-6 and old code) which are used by the same 
> {{CliFrontend}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...

2018-01-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5219#discussion_r159248959
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -659,128 +655,107 @@ protected int savepoint(String[] args) {
return 0;
}
 
-   if (options.isDispose()) {
-   // Discard
-   return disposeSavepoint(options);
-   } else {
-   // Trigger
-   String[] cleanedArgs = options.getArgs();
-   JobID jobId;
+   CustomCommandLine customCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
 
-   if (cleanedArgs.length >= 1) {
-   String jobIdString = cleanedArgs[0];
-   try {
-   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
-   } catch (Exception e) {
-   return handleArgException(new 
IllegalArgumentException(
-   "Error: The value for 
the Job ID is not a valid ID."));
-   }
+   ClusterClient clusterClient = 
customCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
+
+   try {
+   if (options.isDispose()) {
+   // Discard
+   return disposeSavepoint(clusterClient, 
options.getSavepointPath());
} else {
-   return handleArgException(new 
IllegalArgumentException(
+   // Trigger
+   String[] cleanedArgs = options.getArgs();
+   JobID jobId;
+
+   if (cleanedArgs.length >= 1) {
+   String jobIdString = cleanedArgs[0];
+   try {
+   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
--- End diff --

`JobID.fromHexString`


---


[jira] [Commented] (FLINK-8314) Add support for Kinesis Firehose

2018-01-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308230#comment-16308230
 ] 

Aljoscha Krettek commented on FLINK-8314:
-

So this would essentially be another connector that is not exactly like Kinesis 
but in the same ecosystem? [~tzulitai] and [~phoenixjiangnan], what do you 
think?

> Add support for Kinesis Firehose
> 
>
> Key: FLINK-8314
> URL: https://issues.apache.org/jira/browse/FLINK-8314
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector
>Reporter: Ivan Mushketyk
>Priority: Minor
>
> [Kinesis 
> Firehose|https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html]
>  is another product in the Kinesis family that provides a zero-ops, scalable 
> stream for delivering streaming data to S3, Redshift, Elasticsearch, etc. It 
> has a different 
> [API|http://docs.aws.amazon.com/firehose/latest/APIReference/Welcome.html].
> I can contribute this myself, but I wonder if this feature makes sense to you.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8315) Use AWSCredentialsProvider to get AWS credentials

2018-01-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308225#comment-16308225
 ] 

Aljoscha Krettek commented on FLINK-8315:
-

I think this makes sense, but I'm not too familiar with the Kinesis connector. 
[~tzulitai] and [~phoenixjiangnan], what do you think about this?
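
For illustration, the proposal would presumably let users hand a provider to 
the connector instead of raw key strings, along these lines (a sketch only; 
how the provider is wired into the consumer config is exactly the open design 
question):

{code:java}
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.util.Properties;

// any implementation of the interface would do, e.g. a profile-based one
AWSCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("my-profile");

Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
// hypothetical: pass 'credentialsProvider' to the consumer instead of
// AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY entries
{code}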

> Use AWSCredentialsProvider to get AWS credentials
> -
>
> Key: FLINK-8315
> URL: https://issues.apache.org/jira/browse/FLINK-8315
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector
>Reporter: Ivan Mushketyk
>Priority: Minor
>
> Instead of providing credentials like:
> {code:java}
> Properties consumerConfig = new Properties();
> consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
> consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "...");
> consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, 
> "...");
> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
> "TRIM_HORIZON");
> {code}
> Kinesis connector could use the AWSCredentialsProvider interface.
> I can contribute it myself, but I wonder if this feature makes sense to you.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5192: [FLINK-8257] [conf] Unify the value checks for set...

2018-01-02 Thread xccui
Github user xccui closed the pull request at:

https://github.com/apache/flink/pull/5192


---


[jira] [Commented] (FLINK-8257) Unify the value checks for setParallelism()

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308219#comment-16308219
 ] 

ASF GitHub Bot commented on FLINK-8257:
---

Github user xccui closed the pull request at:

https://github.com/apache/flink/pull/5192


> Unify the value checks for setParallelism()
> ---
>
> Key: FLINK-8257
> URL: https://issues.apache.org/jira/browse/FLINK-8257
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
> Fix For: 1.5.0
>
>
> The {{setParallelism()}} method exists in many components from different 
> levels. Some of the methods require the input value to be greater than {{1}} 
> (e.g., {{StreamTransformation.setParallelism()}}), while some of them also 
> allow the value to be {{ExecutionConfig.PARALLELISM_DEFAULT}}, which is 
> {{-1}} by default (e.g., {{DataSink.setParallelism()}}). We need to unify the 
> value checks for these methods.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-8318) Conflict jackson library with ElasticSearch connector

2018-01-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308217#comment-16308217
 ] 

Aljoscha Krettek edited comment on FLINK-8318 at 1/2/18 3:28 PM:
-

Do you have a specific reason for using {{parent-first}}? I think your case 
should work if you include ES and Jackson in your user jar and use 
{{child-first}}, which was introduced for exactly such cases of dependency 
clashes.
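
That is, in {{flink-conf.yaml}}:

{noformat}
classloader.resolve-order: child-first
{noformat}

which, as far as I know, is the default in 1.4.0 anyway.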


was (Author: aljoscha):
Do you have a specific reason for using `parent-first`. I think you case should 
work if you include ES and Jackson in your user jar and use `child-first`, 
which was introduced for exactly such cases of dependency clashes.

> Conflict jackson library with ElasticSearch connector
> -
>
> Key: FLINK-8318
> URL: https://issues.apache.org/jira/browse/FLINK-8318
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector, Startup Shell Scripts
>Affects Versions: 1.4.0
>Reporter: Jihyun Cho
>
> My flink job fails after updating the flink version to 1.4.0. It uses the 
> ElasticSearch connector.
> I'm using CDH Hadoop with Flink option "classloader.resolve-order: 
> parent-first" 
> The failure log is below.
> {noformat}
> Using the result of 'hadoop classpath' to augment the Hadoop classpath: 
> /etc/hadoop/conf:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/./:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//*
> 2017-12-26 14:13:21,160 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> 
> 2017-12-26 14:13:21,161 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Starting 
> TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
> 2017-12-26 14:13:21,161 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  OS current 
> user: www
> 2017-12-26 14:13:21,446 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Current 
> Hadoop/Kerberos user: www
> 2017-12-26 14:13:21,446 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JVM: Java 
> HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
> 2017-12-26 14:13:21,447 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Maximum heap 
> size: 31403 MiBytes
> 2017-12-26 14:13:21,447 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JAVA_HOME: 
> (not set)
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Hadoop 
> version: 2.6.5
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JVM Options:
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - -Xms32768M
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - -Xmx32768M
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -XX:MaxDirectMemorySize=8388607T
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Djava.library.path=/home/cloudera/parcels/CDH/lib/hadoop/lib/native/
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlog4j.configuration=file:/home/www/service/flink-1.4.0/conf/log4j-console.properties
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlogback.configurationFile=file:/home/www/service/flink-1.4.0/conf/logback-console.xml
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Program 
> Arguments:
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> --configDir
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> /home/www/service/flink-1.4.0/conf
> 2017-12-26 14:13:21,449 INFO  
> 

[jira] [Commented] (FLINK-8257) Unify the value checks for setParallelism()

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308218#comment-16308218
 ] 

ASF GitHub Bot commented on FLINK-8257:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5192
  
Thanks for the review, @aljoscha. I'll close this PR.


> Unify the value checks for setParallelism()
> ---
>
> Key: FLINK-8257
> URL: https://issues.apache.org/jira/browse/FLINK-8257
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
> Fix For: 1.5.0
>
>
> The {{setParallelism()}} method exists in many components from different 
> levels. Some of the methods require the input value to be greater than {{1}} 
> (e.g., {{StreamTransformation.setParallelism()}}), while some of them also 
> allow the value to be {{ExecutionConfig.PARALLELISM_DEFAULT}}, which is 
> {{-1}} by default (e.g., {{DataSink.setParallelism()}}). We need to unify the 
> value checks for these methods.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8318) Conflict jackson library with ElasticSearch connector

2018-01-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308217#comment-16308217
 ] 

Aljoscha Krettek commented on FLINK-8318:
-

Do you have a specific reason for using `parent-first`. I think you case should 
work if you include ES and Jackson in your user jar and use `child-first`, 
which was introduced for exactly such cases of dependency clashes.

> Conflict jackson library with ElasticSearch connector
> -
>
> Key: FLINK-8318
> URL: https://issues.apache.org/jira/browse/FLINK-8318
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector, Startup Shell Scripts
>Affects Versions: 1.4.0
>Reporter: Jihyun Cho
>
> My flink job fails after updating the flink version to 1.4.0. It uses the 
> ElasticSearch connector.
> I'm using CDH Hadoop with Flink option "classloader.resolve-order: 
> parent-first" 
> The failure log is below.
> {noformat}
> Using the result of 'hadoop classpath' to augment the Hadoop classpath: 
> /etc/hadoop/conf:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/./:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//*
> 2017-12-26 14:13:21,160 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> 
> 2017-12-26 14:13:21,161 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Starting 
> TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
> 2017-12-26 14:13:21,161 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  OS current 
> user: www
> 2017-12-26 14:13:21,446 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Current 
> Hadoop/Kerberos user: www
> 2017-12-26 14:13:21,446 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JVM: Java 
> HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
> 2017-12-26 14:13:21,447 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Maximum heap 
> size: 31403 MiBytes
> 2017-12-26 14:13:21,447 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JAVA_HOME: 
> (not set)
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Hadoop 
> version: 2.6.5
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JVM Options:
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - -Xms32768M
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - -Xmx32768M
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -XX:MaxDirectMemorySize=8388607T
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Djava.library.path=/home/cloudera/parcels/CDH/lib/hadoop/lib/native/
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlog4j.configuration=file:/home/www/service/flink-1.4.0/conf/log4j-console.properties
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlogback.configurationFile=file:/home/www/service/flink-1.4.0/conf/logback-console.xml
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Program 
> Arguments:
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> --configDir
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> /home/www/service/flink-1.4.0/conf
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Classpath:
> ...:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//jackson-core-2.2.3.jar:...
> 
> 2017-12-26 14:14:01,393 INFO  org.apache.flink.runtime.taskmanager.Task   
> 

[GitHub] flink issue #5192: [FLINK-8257] [conf] Unify the value checks for setParalle...

2018-01-02 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5192
  
Thanks for the review, @aljoscha 😄 . I'll close this PR.


---


[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308212#comment-16308212
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r159247245
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java ---
@@ -27,11 +27,16 @@
 import java.util.concurrent.TimeoutException;
 
 /**
- * Utility functions for Flink's RPC implementation
+ * Utility functions for Flink's RPC implementation.
  */
 public class RpcUtils {
 
-   public static final Time INF_TIMEOUT = 
Time.milliseconds(Long.MAX_VALUE);
+   /**
+* HACK: Set to 21474835 seconds, Akka's maximum delay (Akka 
2.4.20). The value cannot be
+* higher or an {@link IllegalArgumentException} will be thrown during 
an RPC. Check the private
+* method {@code checkMaxDelay()} in {@link 
akka.actor.LightArrayRevolverScheduler}.
+*/
+   public static final Time INF_TIMEOUT = 
Time.milliseconds(TimeUnit.SECONDS.toMillis(21474835));
--- End diff --

`Time.seconds(21474835)`


> Enable Triggering of Savepoints via RestfulGateway
> --
>
> Key: FLINK-8317
> URL: https://issues.apache.org/jira/browse/FLINK-8317
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Enable triggering of savepoints in FLIP-6 mode via RestfulGateway:
> * Add method to {{CompletableFuture 
> triggerSavepoint(long timestamp, String targetDirectory)}} to interface
> * Implement method in {{Dispatcher}} and {{JobMaster}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5223: [FLINK-8317][flip6] Implement Triggering of Savepo...

2018-01-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r159247245
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java ---
@@ -27,11 +27,16 @@
 import java.util.concurrent.TimeoutException;
 
 /**
- * Utility functions for Flink's RPC implementation
+ * Utility functions for Flink's RPC implementation.
  */
 public class RpcUtils {
 
-   public static final Time INF_TIMEOUT = 
Time.milliseconds(Long.MAX_VALUE);
+   /**
+* HACK: Set to 21474835 seconds, Akka's maximum delay (Akka 
2.4.20). The value cannot be
+* higher or an {@link IllegalArgumentException} will be thrown during 
an RPC. Check the private
+* method {@code checkMaxDelay()} in {@link 
akka.actor.LightArrayRevolverScheduler}.
+*/
+   public static final Time INF_TIMEOUT = 
Time.milliseconds(TimeUnit.SECONDS.toMillis(21474835));
--- End diff --

`Time.seconds(21474835)`


---


[jira] [Commented] (FLINK-8322) support getting number of existing timers in TimerService

2018-01-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308201#comment-16308201
 ] 

Aljoscha Krettek commented on FLINK-8322:
-

Yes, I think one (maybe the best) solution is to keep an extra bit of state to 
keep track of the number of timers. {{num*Timers()}} will probably get more 
expensive once we store timers not only on the heap but, for example, in 
RocksDB (see FLINK-5544).
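
A sketch of what that could look like in user code today, assuming a keyed 
{{ProcessFunction}} (the names and the reap delay are made up):

{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class SingleTimerFunction extends ProcessFunction<String, String> {

	// hypothetical delay before reaping old data
	private static final long REAP_DELAY_MS = 6 * 60 * 60 * 1000;

	// per-key flag tracking whether a timer is currently registered
	private transient ValueState<Boolean> timerRegistered;

	@Override
	public void open(Configuration parameters) {
		timerRegistered = getRuntimeContext().getState(
			new ValueStateDescriptor<>("timer-registered", Boolean.class));
	}

	@Override
	public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
		// register a timer only if none exists yet for this key
		if (timerRegistered.value() == null) {
			long now = ctx.timerService().currentProcessingTime();
			ctx.timerService().registerProcessingTimeTimer(now + REAP_DELAY_MS);
			timerRegistered.update(true);
		}
		out.collect(value);
	}

	@Override
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
		timerRegistered.clear(); // the next element may register a new timer
		// ... reap old data here ...
	}
}
{code}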

> support getting number of existing timers in TimerService
> -
>
> Key: FLINK-8322
> URL: https://issues.apache.org/jira/browse/FLINK-8322
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>
> We have use cases where we want to use timers as scheduled threads - e.g. add 
> a timer to wake up x hours later and do something (usually reap old data) 
> only if there are no existing timers; basically, we want at most one timer 
> to exist per key at any time.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8326) CheckpointCoordinatorTest#testRestoreLatestCheckpointedStateScaleOut() didn't use the correct parameter to trigger test function

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308198#comment-16308198
 ] 

ASF GitHub Bot commented on FLINK-8326:
---

Github user tony810430 closed the pull request at:

https://github.com/apache/flink/pull/5211


> CheckpointCoordinatorTest#testRestoreLatestCheckpointedStateScaleOut() didn't 
> use the correct parameter to trigger test function
> 
>
> Key: FLINK-8326
> URL: https://issues.apache.org/jira/browse/FLINK-8326
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Minor
> Fix For: 1.5.0
>
>
> {{CheckpointCoordinatorTest#testRestoreLatestCheckpointedStateScaleOut()}} 
> didn't use the correct parameter, so it only tested scale-in.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8332) Move dispose savepoint into ClusterClient

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308196#comment-16308196
 ] 

ASF GitHub Bot commented on FLINK-8332:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5219
  
This PR is based on #5216 


> Move dispose savepoint into ClusterClient
> -
>
> Key: FLINK-8332
> URL: https://issues.apache.org/jira/browse/FLINK-8332
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently, the {{CliFrontend}} sends the command for disposing a savepoint. 
> In order to better abstract this functionality we should move it to the 
> {{ClusterClient}}. That way we can have different implementations of the 
> {{ClusterClient}} (Flip-6 and old code) which are used by the same 
> {{CliFrontend}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5211: [FLINK-8326] [checkpoints] CheckpointCoordinatorTe...

2018-01-02 Thread tony810430
Github user tony810430 closed the pull request at:

https://github.com/apache/flink/pull/5211


---


[GitHub] flink issue #5219: [FLINK-8332] [flip6] Move savepoint dispose into ClusterC...

2018-01-02 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5219
  
This PR is based on #5216 


---


[jira] [Commented] (FLINK-8326) CheckpointCoordinatorTest#testRestoreLatestCheckpointedStateScaleOut() didn't use the correct parameter to trigger test function

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308191#comment-16308191
 ] 

ASF GitHub Bot commented on FLINK-8326:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5211
  
Thanks a lot for catching this!  

I merged, could you please close this PR?


> CheckpointCoordinatorTest#testRestoreLatestCheckpointedStateScaleOut() didn't 
> use the correct parameter to trigger test function
> 
>
> Key: FLINK-8326
> URL: https://issues.apache.org/jira/browse/FLINK-8326
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Minor
> Fix For: 1.5.0
>
>
> {{CheckpointCoordinatorTest#testRestoreLatestCheckpointedStateScaleOut()}} 
> didn't use the correct parameter, so it only tested scale-in.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-8326) CheckpointCoordinatorTest#testRestoreLatestCheckpointedStateScaleOut() didn't use the correct parameter to trigger test function

2018-01-02 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-8326.
---
   Resolution: Fixed
Fix Version/s: 1.5.0

Fixed on master in
e20536586aa21ad7cfdb805b259827b2a15e3c92

> CheckpointCoordinatorTest#testRestoreLatestCheckpointedStateScaleOut() didn't 
> use the correct parameter to trigger test function
> 
>
> Key: FLINK-8326
> URL: https://issues.apache.org/jira/browse/FLINK-8326
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Minor
> Fix For: 1.5.0
>
>
> {{CheckpointCoordinatorTest#testRestoreLatestCheckpointedStateScaleOut()}} 
> didn't use the correct parameter, so it only tested scale-in.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5211: [FLINK-8326] [checkpoints] CheckpointCoordinatorTest#test...

2018-01-02 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5211
  
Thanks a lot for catching this! 👍 

I merged, could you please close this PR?


---


[jira] [Commented] (FLINK-8329) Move YarnClient out of YarnClusterClient

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308185#comment-16308185
 ] 

ASF GitHub Bot commented on FLINK-8329:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5216#discussion_r159243765
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -143,31 +138,48 @@
private YarnConfigOptions.UserJarInclusion userJarInclusion;
 
public AbstractYarnClusterDescriptor(
-   org.apache.flink.configuration.Configuration flinkConfiguration,
-   String configurationDirectory) {
+   Configuration flinkConfiguration,
+   String configurationDirectory,
+   YarnClient yarnClient) {
+
+   yarnConfiguration = new YarnConfiguration();
+
// for unit tests only
if (System.getenv("IN_TESTS") != null) {
try {
-   conf.addResource(new 
File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL());
+   yarnConfiguration.addResource(new 
File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL());
--- End diff --

I think it's safer to use `new File(System.getenv("YARN_CONF_DIR"),  
"yarn-site.xml")`


> Move YarnClient out of YarnClusterClient
> 
>
> Key: FLINK-8329
> URL: https://issues.apache.org/jira/browse/FLINK-8329
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Move the {{YarnClient}} from the {{YarnClusterClient}} to the 
> {{AbstractYarnClusterDescriptor}} which will be responsible for the lifecycle 
> management of the {{YarnClient}}. This change is a clean up task which will 
> better structure the client code.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8329) Move YarnClient out of YarnClusterClient

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308184#comment-16308184
 ] 

ASF GitHub Bot commented on FLINK-8329:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5216#discussion_r159242946
  
--- Diff: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java ---
@@ -78,6 +83,9 @@ public void testPerJobMode() {
jobGraph.addJar(new 
org.apache.flink.core.fs.Path(testingJar.toURI()));
 
YarnClusterClient clusterClient = 
yarnClusterDescriptorV2.deployJobCluster(clusterSpecification, jobGraph);
+
+   clusterClient.shutdown();
+   yarnClusterDescriptorV2.close();
--- End diff --

Closing in finally or try-with-resource not needed?


> Move YarnClient out of YarnClusterClient
> 
>
> Key: FLINK-8329
> URL: https://issues.apache.org/jira/browse/FLINK-8329
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Move the {{YarnClient}} from the {{YarnClusterClient}} to the 
> {{AbstractYarnClusterDescriptor}} which will be responsible for the lifecycle 
> management of the {{YarnClient}}. This change is a clean up task which will 
> better structure the client code.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8329) Move YarnClient out of YarnClusterClient

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308186#comment-16308186
 ] 

ASF GitHub Bot commented on FLINK-8329:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5216#discussion_r159243077
  
--- Diff: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java 
---
@@ -272,6 +275,7 @@ public void testJavaAPI() throws Exception {
LOG.info("Shutting down cluster. All tests passed");
// shutdown cluster
yarnCluster.shutdown();
+   clusterDescriptor.close();
--- End diff --

Closing in finally or try-with-resource not needed?


> Move YarnClient out of YarnClusterClient
> 
>
> Key: FLINK-8329
> URL: https://issues.apache.org/jira/browse/FLINK-8329
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Move the {{YarnClient}} from the {{YarnClusterClient}} to the 
> {{AbstractYarnClusterDescriptor}} which will be responsible for the lifecycle 
> management of the {{YarnClient}}. This change is a clean up task which will 
> better structure the client code.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5216: [FLINK-8329] [flip6] Move YarnClient to AbstractYa...

2018-01-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5216#discussion_r159243077
  
--- Diff: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java 
---
@@ -272,6 +275,7 @@ public void testJavaAPI() throws Exception {
LOG.info("Shutting down cluster. All tests passed");
// shutdown cluster
yarnCluster.shutdown();
+   clusterDescriptor.close();
--- End diff --

Closing in finally or try-with-resource not needed?


---


[GitHub] flink pull request #5216: [FLINK-8329] [flip6] Move YarnClient to AbstractYa...

2018-01-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5216#discussion_r159243765
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -143,31 +138,48 @@
private YarnConfigOptions.UserJarInclusion userJarInclusion;
 
public AbstractYarnClusterDescriptor(
-   org.apache.flink.configuration.Configuration flinkConfiguration,
-   String configurationDirectory) {
+   Configuration flinkConfiguration,
+   String configurationDirectory,
+   YarnClient yarnClient) {
+
+   yarnConfiguration = new YarnConfiguration();
+
// for unit tests only
if (System.getenv("IN_TESTS") != null) {
try {
-   conf.addResource(new 
File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL());
+   yarnConfiguration.addResource(new 
File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL());
--- End diff --

I think it's safer to use `new File(System.getenv("YARN_CONF_DIR"),  
"yarn-site.xml")`


---


[GitHub] flink pull request #5216: [FLINK-8329] [flip6] Move YarnClient to AbstractYa...

2018-01-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5216#discussion_r159242946
  
--- Diff: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java ---
@@ -78,6 +83,9 @@ public void testPerJobMode() {
jobGraph.addJar(new 
org.apache.flink.core.fs.Path(testingJar.toURI()));
 
YarnClusterClient clusterClient = 
yarnClusterDescriptorV2.deployJobCluster(clusterSpecification, jobGraph);
+
+   clusterClient.shutdown();
+   yarnClusterDescriptorV2.close();
--- End diff --

Closing in finally or try-with-resource not needed?


---


[jira] [Commented] (FLINK-8327) ClassLoader resolution of child-first does not appear to work

2018-01-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308180#comment-16308180
 ] 

Aljoscha Krettek commented on FLINK-8327:
-

So the fat-jar contains the Akka classes?

I think the problem is that the configuration file is not included in the 
fat-jar. For Akka these files should be called {{reference.conf}} and I think 
you can include them in your fat-jar via this (Maven) configuration:
{code}
<transformers>
  <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    <resource>reference.conf</resource>
  </transformer>
</transformers>
{code}

Unfortunately, I don't know how to do that using sbt but you can try manually 
adding the {{reference.conf}} file from the {{akka-http}} jar to your jar by 
unzipping and re-zipping it with that file.

> ClassLoader resolution of child-first does not appear to work
> -
>
> Key: FLINK-8327
> URL: https://issues.apache.org/jira/browse/FLINK-8327
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.4.0
> Environment: h3. Environment
> * Local flink cluster version 1.4.0
> * {{classloader.resolve-order: child-first}} in {{conf/flink-conf.yaml}}.
> * scala 2.11.11
> * oracle jdk 1.8.0
> h3. Library version
> * akka actors 2.4.20
> * akka http 10.0.10
>Reporter: Raymond Tay
>
> h2. Description
> Was trying out the {{Queryable State}} and ran into a problem where the 
> submitted job starts regular Akka actors and makes external HTTP calls via 
> {{akka-http}} libraries, and the flink runtime was complaining that it was 
> not able to read the key {{akka.http}} (this key is held in the 
> configuration file for {{akka-http}}).
> When I ran our app in the {{sbt}} shell locally, it was able to see the key 
> {{akka.http}}, but when we submitted the fatjar (via {{sbt-assembly}}) to 
> flink, it was throwing the error message (see below). Looks like a class 
> loader issue but I'm not sure.
> Any help is much appreciated!
> h2. Error message
> {noformat}
> Caused by: com.typesafe.config.ConfigException$Missing: No configuration 
> setting found for key 'akka.http'
>   at 
> com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:152)
>   at 
> com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:170)
>   at 
> com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:176)
>   at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:184)
>   at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:189)
>   at 
> com.typesafe.config.impl.SimpleConfig.getObject(SimpleConfig.java:258)
>   at 
> com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:264)
>   at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:37)
>   at akka.http.scaladsl.Http$.createExtension(Http.scala:843)
>   at akka.http.scaladsl.Http$.createExtension(Http.scala:719)
>   at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:917)
>   at akka.actor.ExtensionId$class.apply(Extension.scala:79)
>   at akka.http.scaladsl.Http$.apply(Http.scala:838)
>   at akka.http.scaladsl.Http$.apply(Http.scala:719)
>   at org.example.state.A.<init>(Simple.scala:158)
>   ... 18 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8334) Elasticsearch Connector throwing java.lang.ClassNotFoundException: org.jboss.netty.channel.socket.nio.SocketSendBufferPool$GatheringSendBuffer

2018-01-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308168#comment-16308168
 ] 

Aljoscha Krettek commented on FLINK-8334:
-

Could you check if your jar files contain any netty classes? This could be a 
clash in dependencies or Maven might be excluding stuff since it "thinks" it 
comes provided with Flink.
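
A quick way to check (the jar path is just an example):

{noformat}
jar tf your-job-fatjar.jar | grep -i netty
{noformat}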

> Elasticsearch Connector throwing java.lang.ClassNotFoundException: 
> org.jboss.netty.channel.socket.nio.SocketSendBufferPool$GatheringSendBuffer
> --
>
> Key: FLINK-8334
> URL: https://issues.apache.org/jira/browse/FLINK-8334
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector
>Affects Versions: 1.4.0
> Environment: Using Elasticsearch 5.1.2 in a Docker environment
> Flink is deployed in a separate Docker container
>Reporter: Bhaskar Divya
>  Labels: elasticsearch, netty
>
> I have an Elasticsearch sink configured. When a job is submitted, it goes into 
> failed status within a few seconds. 
> The following exception is shown on the job screen:
> {code:java}
> java.lang.RuntimeException: Elasticsearch client is not connected to any 
> Elasticsearch nodes!
>   at 
> org.apache.flink.streaming.connectors.elasticsearch5.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:80)
>   at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:281)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
> In the logs, the following stack trace is shown.
> {code}
> 2018-01-01 12:15:14,432 INFO  
> org.elasticsearch.client.transport.TransportClientNodesService  - failed to 
> get node info for 
> {#transport#-1}{8IZTMPcSRCyKRynhfyN2fA}{192.168.99.100}{192.168.99.100:9300}, 
> disconnecting...
> NodeDisconnectedException[[][192.168.99.100:9300][cluster:monitor/nodes/liveness]
>  disconnected]
> 2018-01-01 12:15:19,433 ERROR org.elasticsearch.transport.netty3.Netty3Utils  
>   - fatal error on the network layer
>   at 
> org.elasticsearch.transport.netty3.Netty3Utils.maybeDie(Netty3Utils.java:195)
>   at 
> org.elasticsearch.transport.netty3.Netty3MessageChannelHandler.exceptionCaught(Netty3MessageChannelHandler.java:82)
>   at 
> org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112)
>   at 
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>   at 
> org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
>   at 
> org.jboss.netty.handler.codec.frame.FrameDecoder.exceptionCaught(FrameDecoder.java:377)
>   at 
> org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112)
>   at 
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>   at 
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
>   at 
> org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:525)
>   at 
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:291)
>   at 
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:151)
>   at 
> org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:292)
>   at 
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:391)
>   at 
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:315)
>   at 
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
>   at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
>   at 
> org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
>   at 
> org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   

[jira] [Created] (FLINK-8349) Remove Yarn specific commands from YarnClusterClient

2018-01-02 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8349:


 Summary: Remove Yarn specific commands from YarnClusterClient
 Key: FLINK-8349
 URL: https://issues.apache.org/jira/browse/FLINK-8349
 Project: Flink
  Issue Type: Sub-task
  Components: Client
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


The {{YarnClusterClient}} should no longer use Yarn-specific commands. This is 
necessary to make the {{FlinkYarnSessionCli}} work with {{ClusterClient}} 
implementations other than the {{YarnClusterClient}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability

2018-01-02 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5193#discussion_r159238089
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 ---
@@ -98,7 +99,13 @@
 	// late arriving event OutputTag
 	private static final OutputTag<Tuple2<String, Integer>> lateOutputTag =
 		new OutputTag<Tuple2<String, Integer>>("late-output") {};
 
-	private void testSlidingEventTimeWindows(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness) throws Exception {
+	private void testSlidingEventTimeWindows(OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator) throws Exception {
--- End diff --
--- End diff --

Yes. Previously the restoring tests were working on a `closed` harness, which 
worked only because the test harness itself wasn't properly closing itself 🙄 


---


[jira] [Commented] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308151#comment-16308151
 ] 

ASF GitHub Bot commented on FLINK-8268:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5193#discussion_r159238089
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 ---
@@ -98,7 +99,13 @@
 	// late arriving event OutputTag
 	private static final OutputTag<Tuple2<String, Integer>> lateOutputTag =
 		new OutputTag<Tuple2<String, Integer>>("late-output") {};
 
-	private void testSlidingEventTimeWindows(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness) throws Exception {
+	private void testSlidingEventTimeWindows(OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator) throws Exception {
--- End diff --
--- End diff --

Yes. Previously the restoring tests were working on a `closed` harness, which 
worked only because the test harness itself wasn't properly closing itself.


> Test instability for 'TwoPhaseCommitSinkFunctionTest'
> -
>
> Key: FLINK-8268
> URL: https://issues.apache.org/jira/browse/FLINK-8268
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Stephan Ewen
>Assignee: Piotr Nowojski
>Priority: Critical
>  Labels: test-stability
>
> The following exception / failure message occurs.
> {code}
> Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.824 sec <<< 
> FAILURE! - in 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest
> testIgnoreCommitExceptionDuringRecovery(org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest)
>   Time elapsed: 0.068 sec  <<< ERROR!
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask 
> (1/1).
>   at java.io.FileOutputStream.writeBytes(Native Method)
>   at java.io.FileOutputStream.write(FileOutputStream.java:326)
>   at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
>   at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)
>   at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295)
>   at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141)
>   at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229)
>   at java.io.BufferedWriter.flush(BufferedWriter.java:254)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:313)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:288)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:290)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>   at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest.testIgnoreCommitExceptionDuringRecovery(TwoPhaseCommitSinkFunctionTest.java:208)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8119) Cannot submit jobs to YARN Session in FLIP-6 mode

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308146#comment-16308146
 ] 

ASF GitHub Bot commented on FLINK-8119:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5234

[FLINK-8119] [flip6] Wire correct Flip6 components in 
Flip6YarnClusterDescriptor

## What is the purpose of the change

Let the Flip6YarnClusterDescriptor create a RestClusterClient as 
ClusterClient.
Moreover, this commit makes the YarnResourceManager register under the REST 
port
at Yarn.

This PR is based on #5233.

## Brief change log

- Implement unsupported methods in `RestClusterClient`
- Register `YarnResourceManager` under REST port at Yarn
- Let `Flip6YarnClusterDescriptor` return a `RestClusterClient`

## Verifying this change

- Tested manually

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
implementRestClusterClientMethods

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5234.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5234


commit c48324702b0ebff3fd7a16e480ead90fc4c9a30b
Author: Till Rohrmann 
Date:   2017-12-07T12:57:24Z

[FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of 
YarnClusterClient

Introduce YarnApplicationStatusMonitor which does the Yarn 
ApplicationStatus polling in
the FlinkYarnSessionCli. This decouples the YarnClusterClient from the 
actual communication
with Yarn and, thus, gives a better separation of concerns.

commit 783bd1daf36b34a74cbe718a52a1043ba38d5a44
Author: Till Rohrmann 
Date:   2017-12-20T15:43:21Z

[FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor

Moves the YarnClient from the YarnClusterClient to the 
AbstractYarnClusterDescriptor.
This makes the latter responsible for the lifecycle management of the 
client and gives
a better separation of concerns.

commit 192adb786a48d19b71e797355500652d51de6296
Author: Till Rohrmann 
Date:   2017-12-18T17:59:30Z

[FLINK-8332] [flip6] Move savepoint dispose into ClusterClient

Move the savepoint disposal logic from the CliFrontend into the 
ClusterClient. This gives
a better separation of concerns and allows the CliFrontend to be used with 
different
ClusterClient implementations.

commit a73b6e2850d4b2445a835914ba570d1057e59dfb
Author: Till Rohrmann 
Date:   2018-01-02T06:42:18Z

[FLINK-8333] [flip6] Separate deployment options from command options

This commit separates the parsing of command options and deployment options 
into two
steps. This makes it easier to make the CustomCommandLines non-static.

Moreover, this commit moves the CliFrontend into the cli sub package.

commit 57df8f9e1c5f4015cd6543ded43849a71003ef36
Author: Till Rohrmann 
Date:   2018-01-02T06:59:34Z

[FLINK-8338] [flip6] Make CustomCommandLines non static in CliFrontend

This commit changes how CustomCommandLines are registered at the 
CliFrontend.
Henceforth, the CliFrontend is initialized with the set of 
CustomCommandLines
instead of registering them statically. This improves maintainability and
testability.

commit 2ee8059b949b4a9c2e165f3e78716ae531e5c0e7
Author: Till Rohrmann 
Date:   2018-01-02T08:22:12Z

[FLINK-8339] [flip6] Let CustomCommandLine return ClusterDescriptor

Instead of directly retrieving or deploying a Flink cluster, the
CustomCommandLine now only returns a ClusterDescriptor which can be used
for these operations. This disentangles the ClusterDescriptor and the
CustomCommandLine a bit better supporting a proper lifecycle management
of the former.

commit 9ff62c7cfadc977d02f414ee26fae6b76a568eda
Author: Till Rohrmann 
Date:   2018-01-02T08:55:13Z

[FLINK-8340] [flip6] Remove passing of Configuration to CustomCommandLine

Since the Configuration does not change over the lifetime of a 
CustomCommandLine,
we can safely pass it as a constructor argument instead of method argument.

[GitHub] flink pull request #5234: [FLINK-8119] [flip6] Wire correct Flip6 components...

2018-01-02 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5234

[FLINK-8119] [flip6] Wire correct Flip6 components in 
Flip6YarnClusterDescriptor

## What is the purpose of the change

Let the Flip6YarnClusterDescriptor create a RestClusterClient as 
ClusterClient.
Moreover, this commit makes the YarnResourceManager register under the REST 
port
at Yarn.

This PR is based on #5233.

## Brief change log

- Implement unsupported methods in `RestClusterClient`
- Register `YarnResourceManager` under REST port at Yarn
- Let `Flip6YarnClusterDescriptor` return a `RestClusterClient`

## Verifying this change

- Tested manually

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
implementRestClusterClientMethods

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5234.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5234


commit c48324702b0ebff3fd7a16e480ead90fc4c9a30b
Author: Till Rohrmann 
Date:   2017-12-07T12:57:24Z

[FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of 
YarnClusterClient

Introduce YarnApplicationStatusMonitor which does the Yarn 
ApplicationStatus polling in
the FlinkYarnSessionCli. This decouples the YarnClusterClient from the 
actual communication
with Yarn and, thus, gives a better separation of concerns.

commit 783bd1daf36b34a74cbe718a52a1043ba38d5a44
Author: Till Rohrmann 
Date:   2017-12-20T15:43:21Z

[FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor

Moves the YarnClient from the YarnClusterClient to the 
AbstractYarnClusterDescriptor.
This makes the latter responsible for the lifecycle management of the 
client and gives
a better separation of concerns.

commit 192adb786a48d19b71e797355500652d51de6296
Author: Till Rohrmann 
Date:   2017-12-18T17:59:30Z

[FLINK-8332] [flip6] Move savepoint dispose into ClusterClient

Move the savepoint disposal logic from the CliFrontend into the 
ClusterClient. This gives
a better separation of concerns and allows the CliFrontend to be used with 
different
ClusterClient implementations.

commit a73b6e2850d4b2445a835914ba570d1057e59dfb
Author: Till Rohrmann 
Date:   2018-01-02T06:42:18Z

[FLINK-8333] [flip6] Separate deployment options from command options

This commit separates the parsing of command options and deployment options 
into two
steps. This makes it easier to make the CustomCommandLines non-static.

Moreover, this commit moves the CliFrontend into the cli sub package.

commit 57df8f9e1c5f4015cd6543ded43849a71003ef36
Author: Till Rohrmann 
Date:   2018-01-02T06:59:34Z

[FLINK-8338] [flip6] Make CustomCommandLines non static in CliFrontend

This commit changes how CustomCommandLines are registered at the 
CliFrontend.
Henceforth, the CliFrontend is initialized with the set of 
CustomCommandLines
instead of registering them statically. This improves maintainability and
testability.

commit 2ee8059b949b4a9c2e165f3e78716ae531e5c0e7
Author: Till Rohrmann 
Date:   2018-01-02T08:22:12Z

[FLINK-8339] [flip6] Let CustomCommandLine return ClusterDescriptor

Instead of directly retrieving or deploying a Flink cluster, the
CustomCommandLine now only returns a ClusterDescriptor which can be used
for these operations. This disentangles the ClusterDescriptor and the
CustomCommandLine a bit better supporting a proper lifecycle management
of the former.

commit 9ff62c7cfadc977d02f414ee26fae6b76a568eda
Author: Till Rohrmann 
Date:   2018-01-02T08:55:13Z

[FLINK-8340] [flip6] Remove passing of Configuration to CustomCommandLine

Since the Configuration does not change over the lifetime of a 
CustomCommandLine,
we can safely pass it as a constructor argument instead of method argument.

commit 2b6fda525fb737a76b055510f8081003010237ed
Author: Till Rohrmann 

[jira] [Commented] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308134#comment-16308134
 ] 

ASF GitHub Bot commented on FLINK-8268:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5193#discussion_r159221102
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/ContentDump.java
 ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Utility class to simulate in memory file like writes, flushes and closing.
+ */
+public class ContentDump {
+    private boolean writable = true;
+    private Map<String, List<String>> filesContent = new HashMap<>();
+
+    public Set<String> listFiles() {
+        return filesContent.keySet();
+    }
+
+    public void setWritable(boolean writable) {
+        this.writable = writable;
+    }
+
+    /**
+     * Creates an empty file.
+     */
+    public ContentWriter createWriter(String name) {
+        checkArgument(!filesContent.containsKey(name), "File [%s] already exists", name);
+        filesContent.put(name, new ArrayList<>());
+        return new ContentWriter(name, this);
+    }
+
+    public static void move(String name, ContentDump source, ContentDump target) {
+        Collection<String> content = source.read(name);
+        try (ContentWriter contentWriter = target.createWriter(name)) {
+            contentWriter.write(content).flush();
+        }
+        source.delete(name);
+    }
+
+    public void delete(String name) {
+        filesContent.remove(name);
+    }
+
+    public Collection<String> read(String name) {
+        List<String> content = filesContent.get(name);
+        checkState(content != null, "Unknown file [%s]", name);
+        List<String> result = new ArrayList<>(content);
+        content.clear();
+        return result;
+    }
+
+    private void putContent(String name, List<String> values) {
+        List<String> content = filesContent.get(name);
+        checkState(content != null, "Unknown file [%s]", name);
+        if (!writable) {
+            throw new NotWritableException(name);
+        }
+        content.addAll(values);
+    }
+
+    /**
+     * {@link ContentWriter} represents an abstraction that allows to putContent to the {@link ContentDump}.
+     */
+    public static class ContentWriter implements AutoCloseable {
+        private final ContentDump contentDump;
+        private final String name;
+        private final List<String> buffer = new ArrayList<>();
+        private boolean closed = false;
+
+        private ContentWriter(String name, ContentDump contentDump) {
+            this.name = checkNotNull(name);
+            this.contentDump = checkNotNull(contentDump);
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public ContentWriter write(String value) {
+            if (closed) {
+                throw new IllegalStateException();
+            }
+            buffer.add(value);
+            return this;
+        }
+
+        public ContentWriter write(Collection<String> values) {
+            values.forEach(this::write);
+            return this;
+  

[GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability

2018-01-02 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5193#discussion_r159229650
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 ---
@@ -492,6 +503,10 @@ public void close() throws Exception {
processingTimeService.shutdownService();
}
setupCalled = false;
+
+   if (internalEnvironment.isPresent()) {
--- End diff --

@tzulitai, @GJL is correct, `Environment` is not 
`Closeable`/`AutoCloseable`. I was also afraid that someone might be reusing 
some external environment, but I think that's not the case.

Using an `environmentIsInternal` flag would require some casting, which is why 
I chose `Optional`.

+1 for `internalEnvironment.ifPresent(MockEnvironment::close);`
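
For illustration, the agreed variant would look roughly like this (a sketch; 
the field name and the surrounding `close()` body are assumptions based on 
this discussion, not the final code):

```
private Optional<MockEnvironment> internalEnvironment = Optional.empty();

public void close() throws Exception {
    // ... existing harness shutdown ...
    // Close the environment only if the harness created it itself.
    internalEnvironment.ifPresent(MockEnvironment::close);
}
```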


---


[GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability

2018-01-02 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5193#discussion_r159235730
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
 ---
@@ -42,22 +41,32 @@
 public class SourceFunctionUtil {
 
 	public static <T extends Serializable> List<T> runSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
-		final List<T> outputs = new ArrayList<T>();
-
 		if (sourceFunction instanceof RichFunction) {
+			return runRichSourceFunction(sourceFunction);
+		}
+		else {
+			return runNonRichSourceFunction(sourceFunction);
+		}
+	}
 
+	private static <T extends Serializable> List<T> runRichSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
+		try (MockEnvironment environment = new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024)) {
 			AbstractStreamOperator<?> operator = mock(AbstractStreamOperator.class);
 			when(operator.getExecutionConfig()).thenReturn(new ExecutionConfig());
 
-			RuntimeContext runtimeContext =  new StreamingRuntimeContext(
-				operator,
-				new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
-				new HashMap<String, Accumulator<?, ?>>());
-
+			RuntimeContext runtimeContext = new StreamingRuntimeContext(
+				operator,
+				environment,
+				new HashMap<>());
 			((RichFunction) sourceFunction).setRuntimeContext(runtimeContext);
-
 			((RichFunction) sourceFunction).open(new Configuration());
+
+			return runNonRichSourceFunction(sourceFunction);
--- End diff --

It wouldn't suffice, because the rich function's `MockEnvironment` must be 
closed. It would have to be something like:

```
Optional<MockEnvironment> context = Optional.empty();
try {
    if (sourceFunction instanceof RichFunction) {
        context = setupRichSourceFunction(sourceFunction);
    }
    return runSourceFunction(sourceFunction);
}
finally {
    context.ifPresent(MockEnvironment::close);
}
```

Which in my opinion is uglier :(


---


[jira] [Commented] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308135#comment-16308135
 ] 

ASF GitHub Bot commented on FLINK-8268:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5193#discussion_r159221837
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/ContentDump.java
 ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Utility class to simulate in memory file like writes, flushes and closing.
+ */
+public class ContentDump {
+    private boolean writable = true;
+    private Map<String, List<String>> filesContent = new HashMap<>();
+
+    public Set<String> listFiles() {
+        return filesContent.keySet();
+    }
+
+    public void setWritable(boolean writable) {
+        this.writable = writable;
+    }
+
+    /**
+     * Creates an empty file.
+     */
+    public ContentWriter createWriter(String name) {
+        checkArgument(!filesContent.containsKey(name), "File [%s] already exists", name);
+        filesContent.put(name, new ArrayList<>());
+        return new ContentWriter(name, this);
+    }
+
+    public static void move(String name, ContentDump source, ContentDump target) {
+        Collection<String> content = source.read(name);
+        try (ContentWriter contentWriter = target.createWriter(name)) {
+            contentWriter.write(content).flush();
+        }
+        source.delete(name);
+    }
+
+    public void delete(String name) {
+        filesContent.remove(name);
+    }
+
+    public Collection<String> read(String name) {
+        List<String> content = filesContent.get(name);
+        checkState(content != null, "Unknown file [%s]", name);
+        List<String> result = new ArrayList<>(content);
+        content.clear();
+        return result;
+    }
+
+    private void putContent(String name, List<String> values) {
+        List<String> content = filesContent.get(name);
+        checkState(content != null, "Unknown file [%s]", name);
+        if (!writable) {
+            throw new NotWritableException(name);
+        }
+        content.addAll(values);
+    }
+
+    /**
+     * {@link ContentWriter} represents an abstraction that allows to putContent to the {@link ContentDump}.
+     */
+    public static class ContentWriter implements AutoCloseable {
+        private final ContentDump contentDump;
+        private final String name;
+        private final List<String> buffer = new ArrayList<>();
+        private boolean closed = false;
+
+        private ContentWriter(String name, ContentDump contentDump) {
+            this.name = checkNotNull(name);
+            this.contentDump = checkNotNull(contentDump);
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public ContentWriter write(String value) {
+            if (closed) {
+                throw new IllegalStateException();
+            }
+            buffer.add(value);
+            return this;
+        }
+
+        public ContentWriter write(Collection<String> values) {
+            values.forEach(this::write);
+            return this;
+  

[jira] [Commented] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308132#comment-16308132
 ] 

ASF GitHub Bot commented on FLINK-8268:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5193#discussion_r15950
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
 ---
@@ -145,19 +145,7 @@ public MemoryManager getMemoryManager() {
}
 
@After
-   public void shutdownIOManager() throws Exception {
-   this.mockEnv.getIOManager().shutdown();
-   Assert.assertTrue("IO Manager has not properly shut down.", 
this.mockEnv.getIOManager().isProperlyShutDown());
-   }
-
-   @After
-   public void shutdownMemoryManager() throws Exception {
-   if (this.memorySize > 0) {
--- End diff --

No, it didn't matter, so I allowed myself to simplify this shutdown. But 
good catch. I was thinking about doing this simplification in a separate commit 
so that there would be no need for "catching" this behaviour change.
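
The consolidated cleanup would then look roughly like this (a sketch; the 
method name is an assumption, the point being that closing the 
`MockEnvironment` performs the IOManager/MemoryManager shutdown checks in one 
place):

```
@After
public void shutdownEnvironment() throws Exception {
    // MockEnvironment is AutoCloseable in this PR; closing it shuts down
    // its IOManager and verifies the MemoryManager state internally.
    this.mockEnv.close();
}
```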


> Test instability for 'TwoPhaseCommitSinkFunctionTest'
> -
>
> Key: FLINK-8268
> URL: https://issues.apache.org/jira/browse/FLINK-8268
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Stephan Ewen
>Assignee: Piotr Nowojski
>Priority: Critical
>  Labels: test-stability
>
> The following exception / failure message occurs.
> {code}
> Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.824 sec <<< 
> FAILURE! - in 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest
> testIgnoreCommitExceptionDuringRecovery(org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest)
>   Time elapsed: 0.068 sec  <<< ERROR!
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask 
> (1/1).
>   at java.io.FileOutputStream.writeBytes(Native Method)
>   at java.io.FileOutputStream.write(FileOutputStream.java:326)
>   at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
>   at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)
>   at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295)
>   at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141)
>   at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229)
>   at java.io.BufferedWriter.flush(BufferedWriter.java:254)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:313)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:288)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:290)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>   at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest.testIgnoreCommitExceptionDuringRecovery(TwoPhaseCommitSinkFunctionTest.java:208)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308137#comment-16308137
 ] 

ASF GitHub Bot commented on FLINK-8268:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5193#discussion_r159220879
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/ContentDump.java
 ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Utility class to simulate in memory file like writes, flushes and closing.
+ */
+public class ContentDump {
+    private boolean writable = true;
+    private Map<String, List<String>> filesContent = new HashMap<>();
+
+    public Set<String> listFiles() {
+        return filesContent.keySet();
+    }
+
+    public void setWritable(boolean writable) {
+        this.writable = writable;
+    }
+
+    /**
+     * Creates an empty file.
+     */
+    public ContentWriter createWriter(String name) {
+        checkArgument(!filesContent.containsKey(name), "File [%s] already exists", name);
+        filesContent.put(name, new ArrayList<>());
+        return new ContentWriter(name, this);
+    }
+
+    public static void move(String name, ContentDump source, ContentDump target) {
+        Collection<String> content = source.read(name);
+        try (ContentWriter contentWriter = target.createWriter(name)) {
+            contentWriter.write(content).flush();
+        }
+        source.delete(name);
+    }
+
+    public void delete(String name) {
+        filesContent.remove(name);
+    }
+
+    public Collection<String> read(String name) {
+        List<String> content = filesContent.get(name);
+        checkState(content != null, "Unknown file [%s]", name);
+        List<String> result = new ArrayList<>(content);
+        content.clear();
--- End diff --

Hmm, I was thinking more along the lines of reading from a file descriptor, 
where subsequent reads return only new data. But now that I think about it, 
that would be the expected behaviour of some `ContentReader` class, so 
dropping `content.clear()` here makes more sense.
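
To make the two semantics concrete (hypothetical usage; a `ContentReader` does 
not exist in this PR):

```
ContentDump dump = new ContentDump();
try (ContentDump.ContentWriter writer = dump.createWriter("out")) {
    writer.write("a").write("b").flush();
}

// Current semantics: read() consumes, so a second read returns nothing.
Collection<String> first = dump.read("out");   // ["a", "b"]
Collection<String> second = dump.read("out");  // []

// Without content.clear(), read() would be a plain snapshot, and a consuming
// cursor would belong in a separate, hypothetical ContentReader class.
```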


> Test instability for 'TwoPhaseCommitSinkFunctionTest'
> -
>
> Key: FLINK-8268
> URL: https://issues.apache.org/jira/browse/FLINK-8268
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Stephan Ewen
>Assignee: Piotr Nowojski
>Priority: Critical
>  Labels: test-stability
>
> The following exception / failure message occurs.
> {code}
> Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.824 sec <<< 
> FAILURE! - in 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest
> testIgnoreCommitExceptionDuringRecovery(org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest)
>   Time elapsed: 0.068 sec  <<< ERROR!
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask 
> (1/1).
>   at java.io.FileOutputStream.writeBytes(Native Method)
>   at java.io.FileOutputStream.write(FileOutputStream.java:326)
>   at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
>   at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)
>   at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295)
>  

[jira] [Commented] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308133#comment-16308133
 ] 

ASF GitHub Bot commented on FLINK-8268:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5193#discussion_r159229650
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 ---
@@ -492,6 +503,10 @@ public void close() throws Exception {
processingTimeService.shutdownService();
}
setupCalled = false;
+
+   if (internalEnvironment.isPresent()) {
--- End diff --

@tzulitai, @GJL is correct, `Environment` is not 
`Closeable`/`AutoCloseable`. I was also afraid that someone might be reusing 
some external environment, but I think that's not the case.

Using an `environmentIsInternal` flag would require some casting, which is why 
I chose `Optional`.

+1 for `internalEnvironment.ifPresent(MockEnvironment::close);`


> Test instability for 'TwoPhaseCommitSinkFunctionTest'
> -
>
> Key: FLINK-8268
> URL: https://issues.apache.org/jira/browse/FLINK-8268
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Stephan Ewen
>Assignee: Piotr Nowojski
>Priority: Critical
>  Labels: test-stability
>
> The following exception / failure message occurs.
> {code}
> Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.824 sec <<< 
> FAILURE! - in 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest
> testIgnoreCommitExceptionDuringRecovery(org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest)
>   Time elapsed: 0.068 sec  <<< ERROR!
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask 
> (1/1).
>   at java.io.FileOutputStream.writeBytes(Native Method)
>   at java.io.FileOutputStream.write(FileOutputStream.java:326)
>   at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
>   at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)
>   at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295)
>   at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141)
>   at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229)
>   at java.io.BufferedWriter.flush(BufferedWriter.java:254)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:313)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:288)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:290)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>   at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest.testIgnoreCommitExceptionDuringRecovery(TwoPhaseCommitSinkFunctionTest.java:208)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability

2018-01-02 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5193#discussion_r159220879
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/ContentDump.java
 ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Utility class to simulate in memory file like writes, flushes and 
closing.
+ */
+public class ContentDump {
+   private boolean writable = true;
+   private Map filesContent = new HashMap<>();
+
+   public Set listFiles() {
+   return filesContent.keySet();
+   }
+
+   public void setWritable(boolean writable) {
+   this.writable = writable;
+   }
+
+   /**
+* Creates an empty file.
+*/
+   public ContentWriter createWriter(String name) {
+   checkArgument(!filesContent.containsKey(name), "File [%s] 
already exists", name);
+   filesContent.put(name, new ArrayList<>());
+   return new ContentWriter(name, this);
+   }
+
+   public static void move(String name, ContentDump source, ContentDump 
target) {
+   Collection content = source.read(name);
+   try (ContentWriter contentWriter = target.createWriter(name)) {
+   contentWriter.write(content).flush();
+   }
+   source.delete(name);
+   }
+
+   public void delete(String name) {
+   filesContent.remove(name);
+   }
+
+   public Collection read(String name) {
+   List content = filesContent.get(name);
+   checkState(content != null, "Unknown file [%s]", name);
+   List result = new ArrayList<>(content);
+   content.clear();
--- End diff --

Hmm, I was thinking more along the lines of reading from a file descriptor, 
where subsequent reads return only new data. But now that I think about it, 
that would be the expected behaviour of some `ContentReader` class, so 
dropping `content.clear()` here makes more sense.


---


[jira] [Commented] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308136#comment-16308136
 ] 

ASF GitHub Bot commented on FLINK-8268:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5193#discussion_r159235730
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
 ---
@@ -42,22 +41,32 @@
 public class SourceFunctionUtil {
 
 	public static <T extends Serializable> List<T> runSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
-		final List<T> outputs = new ArrayList<T>();
-
 		if (sourceFunction instanceof RichFunction) {
+			return runRichSourceFunction(sourceFunction);
+		}
+		else {
+			return runNonRichSourceFunction(sourceFunction);
+		}
+	}
 
+	private static <T extends Serializable> List<T> runRichSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
+		try (MockEnvironment environment = new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024)) {
 			AbstractStreamOperator<?> operator = mock(AbstractStreamOperator.class);
 			when(operator.getExecutionConfig()).thenReturn(new ExecutionConfig());
 
-			RuntimeContext runtimeContext =  new StreamingRuntimeContext(
-				operator,
-				new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
-				new HashMap<String, Accumulator<?, ?>>());
-
+			RuntimeContext runtimeContext = new StreamingRuntimeContext(
+				operator,
+				environment,
+				new HashMap<>());
 			((RichFunction) sourceFunction).setRuntimeContext(runtimeContext);
-
 			((RichFunction) sourceFunction).open(new Configuration());
+
+			return runNonRichSourceFunction(sourceFunction);
--- End diff --

It wouldn't suffice, because the rich function's `MockEnvironment` must be 
closed. It would have to be something like:

```
Optional<MockEnvironment> context = Optional.empty();
try {
    if (sourceFunction instanceof RichFunction) {
        context = setupRichSourceFunction(sourceFunction);
    }
    return runSourceFunction(sourceFunction);
}
finally {
    context.ifPresent(MockEnvironment::close);
}
```

Which in my opinion is uglier :(


> Test instability for 'TwoPhaseCommitSinkFunctionTest'
> -
>
> Key: FLINK-8268
> URL: https://issues.apache.org/jira/browse/FLINK-8268
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Stephan Ewen
>Assignee: Piotr Nowojski
>Priority: Critical
>  Labels: test-stability
>
> The following exception / failure message occurs.
> {code}
> Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.824 sec <<< 
> FAILURE! - in 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest
> testIgnoreCommitExceptionDuringRecovery(org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest)
>   Time elapsed: 0.068 sec  <<< ERROR!
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask 
> (1/1).
>   at java.io.FileOutputStream.writeBytes(Native Method)
>   at java.io.FileOutputStream.write(FileOutputStream.java:326)
>   at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
>   at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)
>   at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295)
>   at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141)
>   at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229)
>   at java.io.BufferedWriter.flush(BufferedWriter.java:254)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:313)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:288)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:290)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>   at 
> 

[jira] [Commented] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308131#comment-16308131
 ] 

ASF GitHub Bot commented on FLINK-8268:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5193#discussion_r159224671
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 ---
@@ -179,9 +179,6 @@ public void testFailBeforeNotify() throws Exception {
harness.initializeState(snapshot);
 
assertExactlyOnce(Arrays.asList("42", "43"));
-   closeTestHarness();
-
-   assertEquals(0, tmpDirectory.listFiles().size());
--- End diff --

restored. 


> Test instability for 'TwoPhaseCommitSinkFunctionTest'
> -
>
> Key: FLINK-8268
> URL: https://issues.apache.org/jira/browse/FLINK-8268
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Stephan Ewen
>Assignee: Piotr Nowojski
>Priority: Critical
>  Labels: test-stability
>
> The following exception / failure message occurs.
> {code}
> Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.824 sec <<< 
> FAILURE! - in 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest
> testIgnoreCommitExceptionDuringRecovery(org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest)
>   Time elapsed: 0.068 sec  <<< ERROR!
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask 
> (1/1).
>   at java.io.FileOutputStream.writeBytes(Native Method)
>   at java.io.FileOutputStream.write(FileOutputStream.java:326)
>   at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
>   at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)
>   at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295)
>   at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141)
>   at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229)
>   at java.io.BufferedWriter.flush(BufferedWriter.java:254)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:313)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:288)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:290)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>   at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest.testIgnoreCommitExceptionDuringRecovery(TwoPhaseCommitSinkFunctionTest.java:208)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability

2018-01-02 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5193#discussion_r15950
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
 ---
@@ -145,19 +145,7 @@ public MemoryManager getMemoryManager() {
}
 
@After
-   public void shutdownIOManager() throws Exception {
-   this.mockEnv.getIOManager().shutdown();
-   Assert.assertTrue("IO Manager has not properly shut down.", 
this.mockEnv.getIOManager().isProperlyShutDown());
-   }
-
-   @After
-   public void shutdownMemoryManager() throws Exception {
-   if (this.memorySize > 0) {
--- End diff --

No, it didn't matter, so I allowed myself to simplify this shutdown. But 
good catch. I was thinking about doing this simplification in a separate commit 
so that there would be no need for "catching" this behaviour change.


---


[GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability

2018-01-02 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5193#discussion_r159221102
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/ContentDump.java
 ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Utility class to simulate in memory file like writes, flushes and 
closing.
+ */
+public class ContentDump {
+   private boolean writable = true;
+   private Map filesContent = new HashMap<>();
+
+   public Set listFiles() {
+   return filesContent.keySet();
+   }
+
+   public void setWritable(boolean writable) {
+   this.writable = writable;
+   }
+
+   /**
+* Creates an empty file.
+*/
+   public ContentWriter createWriter(String name) {
+   checkArgument(!filesContent.containsKey(name), "File [%s] 
already exists", name);
+   filesContent.put(name, new ArrayList<>());
+   return new ContentWriter(name, this);
+   }
+
+   public static void move(String name, ContentDump source, ContentDump 
target) {
+   Collection content = source.read(name);
+   try (ContentWriter contentWriter = target.createWriter(name)) {
+   contentWriter.write(content).flush();
+   }
+   source.delete(name);
+   }
+
+   public void delete(String name) {
+   filesContent.remove(name);
+   }
+
+   public Collection read(String name) {
+   List content = filesContent.get(name);
+   checkState(content != null, "Unknown file [%s]", name);
+   List result = new ArrayList<>(content);
+   content.clear();
+   return result;
+   }
+
+   private void putContent(String name, List values) {
+   List content = filesContent.get(name);
+   checkState(content != null, "Unknown file [%s]", name);
+   if (!writable) {
+   throw new NotWritableException(name);
+   }
+   content.addAll(values);
+   }
+
+   /**
+* {@link ContentWriter} represents an abstraction that allows writing 
content to the {@link ContentDump}.
+*/
+   public static class ContentWriter implements AutoCloseable {
+   private final ContentDump contentDump;
+   private final String name;
+   private final List<String> buffer = new ArrayList<>();
+   private boolean closed = false;
+
+   private ContentWriter(String name, ContentDump contentDump) {
+   this.name = checkNotNull(name);
+   this.contentDump = checkNotNull(contentDump);
+   }
+
+   public String getName() {
+   return name;
+   }
+
+   public ContentWriter write(String value) {
+   if (closed) {
+   throw new IllegalStateException();
+   }
+   buffer.add(value);
+   return this;
+   }
+
+   public ContentWriter write(Collection<String> values) {
+   values.forEach(this::write);
+   return this;
+   }
+
+   public ContentWriter flush() {
+   contentDump.putContent(name, buffer);
+   return this;
+   }
+
+   public void close() {
--- End diff --
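
For context, a minimal usage sketch of the ContentDump/ContentWriter API 
quoted above. The file name and values are made up, and flush() is called 
explicitly (mirroring move() above), since the body of close() is cut off 
in this quote:

    import java.util.Arrays;

    public class ContentDumpExample {
        public static void main(String[] args) {
            ContentDump dump = new ContentDump();

            // createWriter is the only way to obtain a ContentWriter,
            // since the ContentWriter constructor is private.
            try (ContentDump.ContentWriter writer = dump.createWriter("part-0")) {
                writer.write("42")
                      .write(Arrays.asList("43", "44"))
                      .flush();
            }

            // read() drains the stored content, so a second read("part-0")
            // would return an empty collection. Expected output: [42, 43, 44]
            // (assuming close() does not flush the buffer a second time).
            System.out.println(dump.read("part-0"));
        }
    }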
  

[GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability

2018-01-02 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5193#discussion_r159221837
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/ContentDump.java
 ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Utility class to simulate in-memory, file-like writes, flushes, and 
closing.
+ */
+public class ContentDump {
+   private boolean writable = true;
+   private Map<String, List<String>> filesContent = new HashMap<>();
+
+   public Set<String> listFiles() {
+   return filesContent.keySet();
+   }
+
+   public void setWritable(boolean writable) {
+   this.writable = writable;
+   }
+
+   /**
+* Creates an empty file.
+*/
+   public ContentWriter createWriter(String name) {
+   checkArgument(!filesContent.containsKey(name), "File [%s] 
already exists", name);
+   filesContent.put(name, new ArrayList<>());
+   return new ContentWriter(name, this);
+   }
+
+   public static void move(String name, ContentDump source, ContentDump 
target) {
+   Collection<String> content = source.read(name);
+   try (ContentWriter contentWriter = target.createWriter(name)) {
+   contentWriter.write(content).flush();
+   }
+   source.delete(name);
+   }
+
+   public void delete(String name) {
+   filesContent.remove(name);
+   }
+
+   public Collection<String> read(String name) {
+   List<String> content = filesContent.get(name);
+   checkState(content != null, "Unknown file [%s]", name);
+   List<String> result = new ArrayList<>(content);
+   content.clear();
+   return result;
+   }
+
+   private void putContent(String name, List<String> values) {
+   List<String> content = filesContent.get(name);
+   checkState(content != null, "Unknown file [%s]", name);
+   if (!writable) {
+   throw new NotWritableException(name);
+   }
+   content.addAll(values);
+   }
+
+   /**
+* {@link ContentWriter} represents an abstraction that allows writing 
content to the {@link ContentDump}.
+*/
+   public static class ContentWriter implements AutoCloseable {
+   private final ContentDump contentDump;
+   private final String name;
+   private final List<String> buffer = new ArrayList<>();
+   private boolean closed = false;
+
+   private ContentWriter(String name, ContentDump contentDump) {
+   this.name = checkNotNull(name);
+   this.contentDump = checkNotNull(contentDump);
+   }
+
+   public String getName() {
+   return name;
+   }
+
+   public ContentWriter write(String value) {
+   if (closed) {
+   throw new IllegalStateException();
+   }
+   buffer.add(value);
+   return this;
+   }
+
+   public ContentWriter write(Collection<String> values) {
+   values.forEach(this::write);
+   return this;
+   }
+
+   public ContentWriter flush() {
+   contentDump.putContent(name, buffer);
+   return this;
+   }
+
+   public void close() {
+ 

[GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability

2018-01-02 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5193#discussion_r159224671
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 ---
@@ -179,9 +179,6 @@ public void testFailBeforeNotify() throws Exception {
harness.initializeState(snapshot);
 
assertExactlyOnce(Arrays.asList("42", "43"));
-   closeTestHarness();
-
-   assertEquals(0, tmpDirectory.listFiles().size());
--- End diff --

restored. 


---


[jira] [Updated] (FLINK-8119) Cannot submit jobs to YARN Session in FLIP-6 mode

2018-01-02 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-8119:
-
Issue Type: Sub-task  (was: Bug)
Parent: FLINK-4352

> Cannot submit jobs to YARN Session in FLIP-6 mode
> -
>
> Key: FLINK-8119
> URL: https://issues.apache.org/jira/browse/FLINK-8119
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Cannot submit jobs to YARN Session in FLIP-6 mode because 
> {{FlinkYarnSessionCli}} becomes the _active_ CLI (should be 
> {{Flip6DefaultCLI}}).
> *Steps to reproduce*
> # Build Flink 1.5 {{101fef7397128b0aba23221481ab86f62b18118f}}
> # {{bin/yarn-session.sh -flip6 -d -n 1 -s 1 -jm 1024 -tm 1024}}
> # {{bin/flink run -flip6 ./examples/streaming/WordCount.jar}}
> # Verify that the job will not run.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8328) Pull Yarn ApplicationStatus polling out of YarnClusterClient

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308116#comment-16308116
 ] 

ASF GitHub Bot commented on FLINK-8328:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5215#discussion_r159227955
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -660,7 +570,25 @@ public int run(
"yarn application -kill " + 
applicationId.getOpt());
yarnCluster.disconnect();
} else {
-   runInteractiveCli(yarnCluster, true);
+   ScheduledThreadPoolExecutor 
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
--- End diff --

I think the executor could as well be in the Monitor. If needed in the 
future, one could provide a constructor that accepts an external executor 
(e.g., for unit tests).
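
A rough sketch of that suggestion (illustrative names, not the actual Flink 
classes): the monitor owns a default single-threaded executor, while tests 
can inject an external one and keep responsibility for shutting it down.

    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledThreadPoolExecutor;

    public class StatusMonitor implements AutoCloseable {
        private final ScheduledExecutorService executor;
        private final boolean ownsExecutor;

        // Production path: the monitor creates and owns its executor.
        public StatusMonitor() {
            this(new ScheduledThreadPoolExecutor(1), true);
        }

        // Test path: the caller supplies (and later shuts down) the executor.
        StatusMonitor(ScheduledExecutorService executor, boolean ownsExecutor) {
            this.executor = executor;
            this.ownsExecutor = ownsExecutor;
        }

        @Override
        public void close() {
            if (ownsExecutor) {
                executor.shutdownNow();
            }
        }
    }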


> Pull Yarn ApplicationStatus polling out of YarnClusterClient
> 
>
> Key: FLINK-8328
> URL: https://issues.apache.org/jira/browse/FLINK-8328
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to make the {{FlinkYarnSessionCli}} work with Flip-6, we have to 
> pull the Yarn {{ApplicationStatus}} polling out of the {{YarnClusterClient}}. 
> I propose to introduce a dedicated {{YarnApplicationStatusMonitor}}. This has 
> also the benefit of separating concerns better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8328) Pull Yarn ApplicationStatus polling out of YarnClusterClient

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308122#comment-16308122
 ] 

ASF GitHub Bot commented on FLINK-8328:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5215#discussion_r159228367
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -717,7 +645,26 @@ public int run(
yarnCluster.waitForClusterToBeReady();
yarnCluster.disconnect();
} else {
-   runInteractiveCli(yarnCluster, 
acceptInteractiveInput);
+
+   ScheduledThreadPoolExecutor 
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+
+   try (YarnApplicationStatusMonitor 
yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
+   yarnDescriptor.getYarnClient(),
+   yarnCluster.getApplicationId(),
+   new 
ScheduledExecutorServiceAdapter(scheduledExecutorService))){
+   runInteractiveCli(
+   yarnCluster,
+   yarnApplicationStatusMonitor,
+   acceptInteractiveInput);
--- End diff --

The code block looks duplicated except for this flag.
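
One possible shape of that deduplication (hypothetical helper; the monitor 
setup from the quoted diff would move into it, parameterized by the flag):

    import java.util.concurrent.ScheduledThreadPoolExecutor;

    public class DedupSketch {
        // Both call sites in the quoted diff differ only in this flag,
        // so the executor and monitor setup can live in one place.
        static void runMonitoredCli(boolean acceptInteractiveInput) {
            ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
            try {
                // ... create the status monitor and run the interactive
                // CLI here, passing acceptInteractiveInput through ...
                System.out.println("interactive=" + acceptInteractiveInput);
            } finally {
                executor.shutdownNow();
            }
        }

        public static void main(String[] args) {
            runMonitoredCli(true);   // first branch in FlinkYarnSessionCli
            runMonitoredCli(false);  // second branch
        }
    }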


> Pull Yarn ApplicationStatus polling out of YarnClusterClient
> 
>
> Key: FLINK-8328
> URL: https://issues.apache.org/jira/browse/FLINK-8328
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to make the {{FlinkYarnSessionCli}} work with Flip-6, we have to 
> pull the Yarn {{ApplicationStatus}} polling out of the {{YarnClusterClient}}. 
> I propose to introduce a dedicated {{YarnApplicationStatusMonitor}}. This has 
> also the benefit of separating concerns better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...

2018-01-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5215#discussion_r159227619
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -717,7 +645,26 @@ public int run(
yarnCluster.waitForClusterToBeReady();
yarnCluster.disconnect();
} else {
-   runInteractiveCli(yarnCluster, 
acceptInteractiveInput);
+
+   ScheduledThreadPoolExecutor 
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+
+   try (YarnApplicationStatusMonitor 
yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
+   yarnDescriptor.getYarnClient(),
+   yarnCluster.getApplicationId(),
+   new 
ScheduledExecutorServiceAdapter(scheduledExecutorService))){
+   runInteractiveCli(
+   yarnCluster,
+   yarnApplicationStatusMonitor,
+   acceptInteractiveInput);
+   } catch (Exception e) {
+   LOG.info("Could not properly close the 
Yarn application status monitor.", e);
--- End diff --

Same here. Catch block could be avoided.


---


[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...

2018-01-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5215#discussion_r159226314
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -660,7 +570,25 @@ public int run(
"yarn application -kill " + 
applicationId.getOpt());
yarnCluster.disconnect();
} else {
-   runInteractiveCli(yarnCluster, true);
+   ScheduledThreadPoolExecutor 
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+
+   try (YarnApplicationStatusMonitor 
yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
+   yarnDescriptor.getYarnClient(),
+   yarnCluster.getApplicationId(),
+   new 
ScheduledExecutorServiceAdapter(scheduledExecutorService))) {
+   runInteractiveCli(
+   yarnCluster,
+   yarnApplicationStatusMonitor,
+   true);
+   } catch (Exception e) {
--- End diff --

Closing `YarnApplicationStatusMonitor` should not throw any checked 
exceptions. If you change the signature, this catch block won't be needed.


---


[jira] [Commented] (FLINK-8328) Pull Yarn ApplicationStatus polling out of YarnClusterClient

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308119#comment-16308119
 ] 

ASF GitHub Bot commented on FLINK-8328:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5215#discussion_r159225871
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.cli;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utility class which monitors the specified yarn application status 
periodically.
+ */
+public class YarnApplicationStatusMonitor implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(YarnApplicationStatusMonitor.class);
+
+   private static final long UPDATE_INTERVAL = 1000L;
+
+   private final YarnClient yarnClient;
+
+   private final ApplicationId yarnApplicationId;
+
+   private final ScheduledFuture<?> applicationStatusUpdateFuture;
+
+   private volatile ApplicationStatus applicationStatus;
+
+   public YarnApplicationStatusMonitor(
+   YarnClient yarnClient,
+   ApplicationId yarnApplicationId,
+   ScheduledExecutor scheduledExecutor) {
+   this.yarnClient = Preconditions.checkNotNull(yarnClient);
+   this.yarnApplicationId = 
Preconditions.checkNotNull(yarnApplicationId);
+
+   applicationStatusUpdateFuture = 
scheduledExecutor.scheduleWithFixedDelay(
+   this::updateApplicationStatus,
+   UPDATE_INTERVAL,
+   UPDATE_INTERVAL,
+   TimeUnit.MILLISECONDS);
+
+   applicationStatus = ApplicationStatus.UNKNOWN;
+   }
+
+   public ApplicationStatus getApplicationStatusNow() {
+   return applicationStatus;
+   }
+
+   @Override
+   public void close() throws Exception {
+   applicationStatusUpdateFuture.cancel(false);
--- End diff --

There is no need to declare `throws Exception` here because `cancel()` does 
not throw any checked exceptions.
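
A self-contained sketch of the suggested signature change (hypothetical 
PollingTask class): Future.cancel(boolean) declares no checked exceptions, 
so close() can legally narrow the AutoCloseable signature, and 
try-with-resources callers then need no catch block.

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    public class PollingTask implements AutoCloseable {
        private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
        private final ScheduledFuture<?> future = executor.scheduleWithFixedDelay(
            () -> System.out.println("poll"), 0L, 1000L, TimeUnit.MILLISECONDS);

        @Override
        public void close() { // narrowed from "throws Exception"
            future.cancel(false);
            executor.shutdownNow();
        }

        public static void main(String[] args) throws InterruptedException {
            try (PollingTask task = new PollingTask()) {
                Thread.sleep(2500L);
            } // no catch block required: close() throws nothing checked
        }
    }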


> Pull Yarn ApplicationStatus polling out of YarnClusterClient
> 
>
> Key: FLINK-8328
> URL: https://issues.apache.org/jira/browse/FLINK-8328
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to make the {{FlinkYarnSessionCli}} work with Flip-6, we have to 
> pull the Yarn {{ApplicationStatus}} polling out of the {{YarnClusterClient}}. 
> I propose to introduce a dedicated {{YarnApplicationStatusMonitor}}. This has 
> also the benefit of separating concerns better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...

2018-01-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5215#discussion_r159224695
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -743,6 +690,142 @@ private void logAndSysout(String message) {
System.out.println(message);
}
 
+   public static void main(final String[] args) throws Exception {
+   final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", 
""); // no prefix for the YARN session
+
+   final String configurationDirectory = 
CliFrontend.getConfigurationDirectoryFromEnv();
+
+   final Configuration flinkConfiguration = 
GlobalConfiguration.loadConfiguration();
+   SecurityUtils.install(new 
SecurityConfiguration(flinkConfiguration));
+   int retCode = 
SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
+   @Override
+   public Integer call() {
+   return cli.run(args, flinkConfiguration, 
configurationDirectory);
+   }
+   });
+   System.exit(retCode);
+   }
+
+   private static void runInteractiveCli(
+   YarnClusterClient clusterClient,
+   YarnApplicationStatusMonitor yarnApplicationStatusMonitor,
+   boolean readConsoleInput) {
+   try (BufferedReader in = new BufferedReader(new 
InputStreamReader(System.in))) {
+   boolean continueRepl = true;
+   int numTaskmanagers = 0;
+   long unknownStatusSince = System.currentTimeMillis();
+
+   while (continueRepl) {
+
+   final ApplicationStatus applicationStatus = 
yarnApplicationStatusMonitor.getApplicationStatusNow();
+
+   switch (applicationStatus) {
+   case FAILED:
+   case CANCELED:
+   System.err.println("The Flink 
Yarn cluster has failed.");
+   continueRepl = false;
+   break;
+   case UNKNOWN:
+   if (unknownStatusSince < 0L) {
+   unknownStatusSince = 
System.currentTimeMillis();
+   }
+
+   if ((System.currentTimeMillis() 
- unknownStatusSince) > CLIENT_POLLING_INTERVAL_MS) {
+   System.err.println("The 
Flink Yarn cluster is in an unknown state. Please check the Yarn cluster.");
+   continueRepl = false;
+   } else {
+   continueRepl = 
repStep(in, readConsoleInput);
+   }
+   break;
+   case SUCCEEDED:
+   if (unknownStatusSince > 0L) {
+   unknownStatusSince = 
-1L;
+   }
+
+   // -- check if 
there are updates by the cluster ---
+   try {
+   final 
GetClusterStatusResponse status = clusterClient.getClusterStatus();
+
+   if (status != null && 
numTaskmanagers != status.numRegisteredTaskManagers()) {
+   
System.err.println("Number of connected TaskManagers changed to " +
+   
status.numRegisteredTaskManagers() + ". " +
+   "Slots 
available: " + status.totalNumberOfSlots());
+   numTaskmanagers 
= status.numRegisteredTaskManagers();
+   }
+   } catch (Exception e) {
+   LOG.warn("Could not 
retrieve the current cluster status. Skipping current retrieval attempt ...", 
e);
+   }
+
+   
printClusterMessages(clusterClient);
+
+   continueRepl = repStep(in, 
readConsoleInput);
   

[jira] [Commented] (FLINK-8328) Pull Yarn ApplicationStatus polling out of YarnClusterClient

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308115#comment-16308115
 ] 

ASF GitHub Bot commented on FLINK-8328:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5215#discussion_r159227421
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -660,7 +570,25 @@ public int run(
"yarn application -kill " + 
applicationId.getOpt());
yarnCluster.disconnect();
} else {
-   runInteractiveCli(yarnCluster, true);
+   ScheduledThreadPoolExecutor 
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+
+   try (YarnApplicationStatusMonitor 
yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
+   yarnDescriptor.getYarnClient(),
+   yarnCluster.getApplicationId(),
+   new 
ScheduledExecutorServiceAdapter(scheduledExecutorService))) {
--- End diff --

Why do we need to use the `ScheduledExecutor` interface from Flink? Why not 
use Java's `ScheduledExecutorService` directly?


> Pull Yarn ApplicationStatus polling out of YarnClusterClient
> 
>
> Key: FLINK-8328
> URL: https://issues.apache.org/jira/browse/FLINK-8328
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to make the {{FlinkYarnSessionCli}} work with Flip-6, we have to 
> pull the Yarn {{ApplicationStatus}} polling out of the {{YarnClusterClient}}. 
> I propose to introduce a dedicated {{YarnApplicationStatusMonitor}}. This has 
> also the benefit of separating concerns better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8328) Pull Yarn ApplicationStatus polling out of YarnClusterClient

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308117#comment-16308117
 ] 

ASF GitHub Bot commented on FLINK-8328:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5215#discussion_r159230885
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -743,6 +690,142 @@ private void logAndSysout(String message) {
System.out.println(message);
}
 
+   public static void main(final String[] args) throws Exception {
+   final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", 
""); // no prefix for the YARN session
+
+   final String configurationDirectory = 
CliFrontend.getConfigurationDirectoryFromEnv();
+
+   final Configuration flinkConfiguration = 
GlobalConfiguration.loadConfiguration();
+   SecurityUtils.install(new 
SecurityConfiguration(flinkConfiguration));
+   int retCode = 
SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
+   @Override
+   public Integer call() {
+   return cli.run(args, flinkConfiguration, 
configurationDirectory);
+   }
+   });
+   System.exit(retCode);
+   }
+
+   private static void runInteractiveCli(
+   YarnClusterClient clusterClient,
+   YarnApplicationStatusMonitor yarnApplicationStatusMonitor,
+   boolean readConsoleInput) {
+   try (BufferedReader in = new BufferedReader(new 
InputStreamReader(System.in))) {
+   boolean continueRepl = true;
+   int numTaskmanagers = 0;
+   long unknownStatusSince = System.currentTimeMillis();
--- End diff --

nit: `System.nanoTime()` should be preferred to measure elapsed time 
because it does not depend on wall clock, i.e., it is not affected by the user 
changing the system's time: https://stackoverflow.com/a/351571
However, if you use `nanoTime()`, the trick in line `729` with negative 
`unknownStatusSince` won't work.
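
A small sketch of the monotonic-clock variant (illustrative class; a 
nullable timestamp replaces the negative sentinel, which would be unsound 
with nanoTime() because valid readings can themselves be negative):

    import java.util.concurrent.TimeUnit;

    public class UnknownStatusTimer {
        // null means the application is not currently in the UNKNOWN state.
        private Long unknownSinceNanos;

        void onUnknown() {
            if (unknownSinceNanos == null) {
                unknownSinceNanos = System.nanoTime();
            }
        }

        void onKnown() {
            unknownSinceNanos = null;
        }

        boolean unknownLongerThan(long thresholdMillis) {
            return unknownSinceNanos != null
                && TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - unknownSinceNanos)
                    > thresholdMillis;
        }
    }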


> Pull Yarn ApplicationStatus polling out of YarnClusterClient
> 
>
> Key: FLINK-8328
> URL: https://issues.apache.org/jira/browse/FLINK-8328
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to make the {{FlinkYarnSessionCli}} work with Flip-6, we have to 
> pull the Yarn {{ApplicationStatus}} polling out of the {{YarnClusterClient}}. 
> I propose to introduce a dedicated {{YarnApplicationStatusMonitor}}. This has 
> also the benefit of separating concerns better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8348) Print help for DefaultCLI

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308113#comment-16308113
 ] 

ASF GitHub Bot commented on FLINK-8348:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5233

[FLINK-8348] [flip6] Print help for DefaultCLI

## What is the purpose of the change

Print help for the `DefaultCLI`.

This PR is based on #5232.

## Verifying this change

- Trivial change

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixCustomCommandLineHelp

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5233.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5233


commit c48324702b0ebff3fd7a16e480ead90fc4c9a30b
Author: Till Rohrmann 
Date:   2017-12-07T12:57:24Z

[FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of 
YarnClusterClient

Introduce YarnApplicationStatusMonitor which does the Yarn 
ApplicationStatus polling in
the FlinkYarnSessionCli. This decouples the YarnClusterClient from the 
actual communication
with Yarn and, thus, gives a better separation of concerns.

commit 783bd1daf36b34a74cbe718a52a1043ba38d5a44
Author: Till Rohrmann 
Date:   2017-12-20T15:43:21Z

[FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor

Moves the YarnClient from the YarnClusterClient to the 
AbstractYarnClusterDescriptor.
This makes the latter responsible for the lifecycle management of the 
client and gives
a better separation of concerns.

commit 192adb786a48d19b71e797355500652d51de6296
Author: Till Rohrmann 
Date:   2017-12-18T17:59:30Z

[FLINK-8332] [flip6] Move savepoint dispose into ClusterClient

Move the savepoint disposal logic from the CliFrontend into the 
ClusterClient. This gives
a better separation of concerns and allows the CliFrontend to be used with 
different
ClusterClient implementations.

commit a73b6e2850d4b2445a835914ba570d1057e59dfb
Author: Till Rohrmann 
Date:   2018-01-02T06:42:18Z

[FLINK-8333] [flip6] Separate deployment options from command options

This commit separates the parsing of command options and deployment options 
into two
steps. This makes it easier to make the CustomCommandLines non-static.

Moreover, this commit moves the CliFrontend into the cli sub package.

commit 57df8f9e1c5f4015cd6543ded43849a71003ef36
Author: Till Rohrmann 
Date:   2018-01-02T06:59:34Z

[FLINK-8338] [flip6] Make CustomCommandLines non static in CliFrontend

This commit changes how CustomCommandLines are registered at the 
CliFrontend.
Henceforth, the CliFrontend is initialized with the set of 
CustomCommandLines
instead of registering them statically. This improves maintainability and
testability.

commit 2ee8059b949b4a9c2e165f3e78716ae531e5c0e7
Author: Till Rohrmann 
Date:   2018-01-02T08:22:12Z

[FLINK-8339] [flip6] Let CustomCommandLine return ClusterDescriptor

Instead of directly retrieving or deploying a Flink cluster, the
CustomCommandLine now only returns a ClusterDescriptor which can be used
for these operations. This disentangles the ClusterDescriptor and the
CustomCommandLine a bit better supporting a proper lifecycle management
of the former.

commit 9ff62c7cfadc977d02f414ee26fae6b76a568eda
Author: Till Rohrmann 
Date:   2018-01-02T08:55:13Z

[FLINK-8340] [flip6] Remove passing of Configuration to CustomCommandLine

Since the Configuration does not change over the lifetime of a 
CustomCommandLine,
we can safely pass it as a constructor argument instead of method argument.

commit 2b6fda525fb737a76b055510f8081003010237ed
Author: Till Rohrmann 
Date:   2017-12-20T15:32:18Z

[FLINK-8341] [flip6] Remove not needed options from CommandLineOptions

commit 17755e5273364ccfc740d6b4a31b148d582d8cfa
Author: Till Rohrmann 
