[jira] [Assigned] (FLINK-8300) Make JobExecutionResult accessible in per-job cluster mode

2017-12-20 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-8300:
---

Assignee: (was: Gary Yao)
 Summary: Make JobExecutionResult accessible in per-job cluster mode  (was: 
Make JobExecutionResult accessible from JobMaster)

> Make JobExecutionResult accessible in per-job cluster mode
> --
>
> Key: FLINK-8300
> URL: https://issues.apache.org/jira/browse/FLINK-8300
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
In order to serve the {{JobExecutionResults}}, we have to cache them in the 
{{JobMaster}} after the job has finished. The cache should have a 
configurable size and should periodically clean up stale entries in order to 
avoid memory leaks.
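
A minimal sketch of such a cache, assuming a plain size bound plus a 
periodically invoked cleanup of entries older than a TTL (class and member 
names are hypothetical, not existing Flink API):

{code}
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: size-bounded cache whose stale entries are dropped periodically. */
class JobResultCache<K, V> {

    private static final class Entry<V> {
        final V value;
        final long insertionTime = System.currentTimeMillis();
        Entry(V value) { this.value = value; }
    }

    private final int maxSize;
    private final long ttlMillis;
    private final LinkedHashMap<K, Entry<V>> cache;

    JobResultCache(int maxSize, long ttlMillis) {
        this.maxSize = maxSize;
        this.ttlMillis = ttlMillis;
        // access-order map: once maxSize is exceeded, the least recently used entry is evicted
        this.cache = new LinkedHashMap<K, Entry<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Entry<V>> eldest) {
                return size() > JobResultCache.this.maxSize;
            }
        };
    }

    synchronized void put(K jobId, V result) {
        cache.put(jobId, new Entry<>(result));
    }

    synchronized V get(K jobId) {
        Entry<V> entry = cache.get(jobId);
        return entry == null ? null : entry.value;
    }

    /** To be called periodically, e.g. from a scheduled executor, to avoid memory leaks. */
    synchronized void cleanUpStaleEntries() {
        long now = System.currentTimeMillis();
        cache.values().removeIf(entry -> now - entry.insertionTime > ttlMillis);
    }
}
{code}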



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8226) Dangling reference generated after NFA clean up timed out SharedBufferEntry

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299644#comment-16299644
 ] 

ASF GitHub Bot commented on FLINK-8226:
---

Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/5141
  
@dawidwys Could you take a look at this PR? It is a bug fix, and the issue 
can be easily reproduced with the test case included in the PR. Thanks a lot.


> Dangling reference generated after NFA clean up timed out SharedBufferEntry
> ---
>
> Key: FLINK-8226
> URL: https://issues.apache.org/jira/browse/FLINK-8226
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5141: [FLINK-8226] [cep] Dangling reference generated after NFA...

2017-12-20 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/5141
  
@dawidwys Could you take a look at this PR? It is a bug fix, and the issue 
can be easily reproduced with the test case included in the PR. Thanks a lot.


---


[GitHub] flink issue #5142: [FLINK-8227] Optimize the performance of SharedBufferSeri...

2017-12-20 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/5142
  
@dawidwys @StephanEwen Sorry for the late response. For questions 1 and 2, I 
have the same thoughts as @dawidwys and have updated the PR accordingly. For 
question 3, I think `int` is enough, as we currently store `SharedBufferEntry` 
in a `HashMap` for each `SharedBufferPage`, and the size of a `HashMap` is an 
`int`. If we want to support `long`, we would also have to change `HashMap` to 
something else. What are your thoughts?


---


[jira] [Commented] (FLINK-8227) Optimize the performance of SharedBufferSerializer

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299640#comment-16299640
 ] 

ASF GitHub Bot commented on FLINK-8227:
---

Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/5142
  
@dawidwys @StephanEwen Sorry for the late response. For questions 1 and 2, I 
have the same thoughts as @dawidwys and have updated the PR accordingly. For 
question 3, I think `int` is enough, as we currently store `SharedBufferEntry` 
in a `HashMap` for each `SharedBufferPage`, and the size of a `HashMap` is an 
`int`. If we want to support `long`, we would also have to change `HashMap` to 
something else. What are your thoughts?


> Optimize the performance of SharedBufferSerializer
> --
>
> Key: FLINK-8227
> URL: https://issues.apache.org/jira/browse/FLINK-8227
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> Currently {{SharedBufferSerializer.serialize()}} creates a HashMap and puts 
> all the {{SharedBufferEntry}} instances into it. Usually this is not a 
> problem, but we observe that in some cases the calculation of hashCode may 
> become the bottleneck. The performance decreases as the number of 
> {{SharedBufferEdge}} instances increases: for the looping pattern {{A*}}, if 
> the number of {{SharedBufferEntry}} instances is {{N}}, the number of 
> {{SharedBufferEdge}} instances is about {{N * N}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...

2017-12-20 Thread casidiablo
Github user casidiablo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r158194237
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -96,6 +99,15 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
                                SequenceNumber lastSequenceNum,
                                KinesisProxyInterface kinesis) {
        this.fetcherRef = checkNotNull(fetcherRef);
+       MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
--- End diff --

I think we can't register the metric in `FlinkKinesisConsumer`, since we 
need it to be associated with a particular shard id. But I could do it from the 
`KinesisDataFetcher` instead, which already has access to the runtime context.


---


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299527#comment-16299527
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user casidiablo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r158194237
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -96,6 +99,15 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
                                SequenceNumber lastSequenceNum,
                                KinesisProxyInterface kinesis) {
        this.fetcherRef = checkNotNull(fetcherRef);
+       MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
--- End diff --

I think we can't register the metric in `FlinkKinesisConsumer`, since we 
need it to be associated with a particular shard id. But I could do it from the 
`KinesisDataFetcher` instead, which already has access to the runtime context.


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.
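
A minimal sketch of what such a registration could look like with Flink's 
metrics API (the group layout and the value holder are assumptions for 
illustration; the actual naming was still under discussion in the PR):

{code}
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

import java.util.concurrent.atomic.AtomicLong;

class ShardMetrics {
    /** Registers a per-shard gauge; the fetch loop updates the returned holder
     *  with the millisBehindLatest value of each GetRecords response. */
    static AtomicLong registerMillisBehindLatest(RuntimeContext ctx, String shardId) {
        final AtomicLong millisBehindLatest = new AtomicLong(0L);
        // hypothetical group layout: tag the metric with the shard id
        MetricGroup shardGroup = ctx.getMetricGroup().addGroup("shardId").addGroup(shardId);
        shardGroup.gauge("millisBehindLatest", (Gauge<Long>) millisBehindLatest::get);
        return millisBehindLatest;
    }
}
{code}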



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299514#comment-16299514
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user casidiablo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r158192923
  
--- Diff: docs/monitoring/metrics.md ---
@@ -1293,6 +1293,29 @@ Thus, in order to infer the metric identifier:
   
 
 
+ Kinesis Connectors
+<table>
+  <thead>
+    <tr>
+      <th>Scope</th>
+      <th>Metrics</th>
+      <th>Description</th>
+      <th>Type</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>Operator</td>
+      <td>millisBehindLatest</td>
+      <td>The number of milliseconds the GetRecords response is from the tip of the stream,
--- End diff --

I'm OK with changing it. That's actually just a copy from the Amazon docs.


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...

2017-12-20 Thread casidiablo
Github user casidiablo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r158192923
  
--- Diff: docs/monitoring/metrics.md ---
@@ -1293,6 +1293,29 @@ Thus, in order to infer the metric identifier:
   
 
 
+ Kinesis Connectors
+<table>
+  <thead>
+    <tr>
+      <th>Scope</th>
+      <th>Metrics</th>
+      <th>Description</th>
+      <th>Type</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>Operator</td>
+      <td>millisBehindLatest</td>
+      <td>The number of milliseconds the GetRecords response is from the tip of the stream,
--- End diff --

I'm OK with changing it. That's actually just a copy from the Amazon docs.


---


[jira] [Updated] (FLINK-8037) Missing cast in integer arithmetic in TransactionalIdsGenerator#generateIdsToAbort

2017-12-20 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-8037:
--
Description: 
{code}
  public Set<String> generateIdsToAbort() {
    Set<String> idsToAbort = new HashSet<>();
    for (int i = 0; i < safeScaleDownFactor; i++) {
      idsToAbort.addAll(generateIdsToUse(i * poolSize * totalNumberOfSubtasks));
{code}
The operands are integers, while generateIdsToUse() expects a long parameter.

  was:
{code}
  public Set<String> generateIdsToAbort() {
    Set<String> idsToAbort = new HashSet<>();
    for (int i = 0; i < safeScaleDownFactor; i++) {
      idsToAbort.addAll(generateIdsToUse(i * poolSize * totalNumberOfSubtasks));
{code}

The operands are integers, while generateIdsToUse() expects a long parameter.


> Missing cast in integer arithmetic in 
> TransactionalIdsGenerator#generateIdsToAbort
> --
>
> Key: FLINK-8037
> URL: https://issues.apache.org/jira/browse/FLINK-8037
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   public Set<String> generateIdsToAbort() {
>     Set<String> idsToAbort = new HashSet<>();
>     for (int i = 0; i < safeScaleDownFactor; i++) {
>       idsToAbort.addAll(generateIdsToUse(i * poolSize * totalNumberOfSubtasks));
> {code}
> The operands are integers, while generateIdsToUse() expects a long parameter.
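
A small, self-contained illustration of the overflow and the usual fix of 
widening one operand (the values are chosen only to make the overflow 
visible):

{code}
public class OverflowDemo {
    public static void main(String[] args) {
        int i = 3, poolSize = 1_000_000, totalNumberOfSubtasks = 2_000;

        // the product is evaluated in 32-bit arithmetic and silently wraps
        // before the implicit widening to long
        long wrong = i * poolSize * totalNumberOfSubtasks;

        // casting the first operand forces 64-bit arithmetic throughout
        long right = (long) i * poolSize * totalNumberOfSubtasks;

        System.out.println(wrong + " vs " + right); // 1705032704 vs 6000000000
    }
}
{code}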



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8271) upgrade from deprecated classes to AmazonKinesis

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299275#comment-16299275
 ] 

ASF GitHub Bot commented on FLINK-8271:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5171#discussion_r158164454
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java ---
@@ -30,37 +30,44 @@
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
 import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.regions.Region;
+import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.regions.Regions;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
 
 import java.util.Properties;
 
 /**
  * Some utilities specific to Amazon Web Service.
  */
 public class AWSUtil {
+    /** Used for formatting Flink-specific user agent string when creating Kinesis client. */
+    private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) Kinesis Connector";
 
     /**
-     * Creates an Amazon Kinesis Client.
+     * Creates an AmazonKinesis client.
      * @param configProps configuration properties containing the access key, secret key, and region
-     * @return a new Amazon Kinesis Client
+     * @return a new AmazonKinesis client
     */
-    public static AmazonKinesisClient createKinesisClient(Properties configProps) {
+    public static AmazonKinesis createKinesisClient(Properties configProps) {
        // set a Flink-specific user agent
-       ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
-       awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
-           " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
+       ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig()
+           .withUserAgentPrefix(String.format(USER_AGENT_FORMAT,
--- End diff --

To my understanding, switching from `setUserAgent` to `withUserAgentPrefix` 
will result in different user agent strings, right?

Though I think internally the `setUserAgent` method directly forwards to 
`withUserAgentPrefix` anyway ...

No objection on this, just curious.


> upgrade from deprecated classes to AmazonKinesis
> 
>
> Key: FLINK-8271
> URL: https://issues.apache.org/jira/browse/FLINK-8271
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.5.0, 1.4.1
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8271) upgrade from deprecated classes to AmazonKinesis

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299274#comment-16299274
 ] 

ASF GitHub Bot commented on FLINK-8271:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5171#discussion_r158166090
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java ---
@@ -30,37 +30,44 @@
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
 import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.regions.Region;
+import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.regions.Regions;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
 
 import java.util.Properties;
 
 /**
  * Some utilities specific to Amazon Web Service.
 */
 public class AWSUtil {
+    /** Used for formatting Flink-specific user agent string when creating Kinesis client. */
+    private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) Kinesis Connector";
 
     /**
-     * Creates an Amazon Kinesis Client.
+     * Creates an AmazonKinesis client.
     * @param configProps configuration properties containing the access key, secret key, and region
-     * @return a new Amazon Kinesis Client
+     * @return a new AmazonKinesis client
     */
-    public static AmazonKinesisClient createKinesisClient(Properties configProps) {
+    public static AmazonKinesis createKinesisClient(Properties configProps) {
        // set a Flink-specific user agent
-       ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
-       awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
-           " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
+       ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig()
+           .withUserAgentPrefix(String.format(USER_AGENT_FORMAT,
+               EnvironmentInformation.getVersion(),
+               EnvironmentInformation.getRevisionInformation().commitId));
 
        // utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
-       AmazonKinesisClient client = new AmazonKinesisClient(
-           AWSUtil.getCredentialsProvider(configProps), awsClientConfig);
+       AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
+           .withCredentials(AWSUtil.getCredentialsProvider(configProps))
+           .withClientConfiguration(awsClientConfig)
+           .withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));
 
-       client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION))));
        if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
-           client.setEndpoint(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
+           builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
+               configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT),
+               configProps.getProperty(AWSConfigConstants.AWS_REGION)));
--- End diff --

Why does the endpoint configuration have a region now?

For example, let's say a user wants to test the connector against a local 
Kinesis mock service at "localhost:". The user also was originally issuing 
requests against the regular AWS Kinesis service, at region "us-west-1". The 
user's properties would look like:
```
configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-west-1");
configProps.setProperty(AWSConfigConstants.AWS_ENDPOINT, "localhost:");
```

In the past, this would correctly redirect requests to "localhost:".

With this change, is this also the case? Or do we actually need to call 
`new AwsClientBuilder.EndpointConfiguration(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT), null)` 

[GitHub] flink pull request #5171: [FLINK-8271][Kinesis connector] upgrade deprecated...

2017-12-20 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5171#discussion_r158166090
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java ---
@@ -30,37 +30,44 @@
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
 import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.regions.Region;
+import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.regions.Regions;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
 
 import java.util.Properties;
 
 /**
  * Some utilities specific to Amazon Web Service.
 */
 public class AWSUtil {
+    /** Used for formatting Flink-specific user agent string when creating Kinesis client. */
+    private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) Kinesis Connector";
 
     /**
-     * Creates an Amazon Kinesis Client.
+     * Creates an AmazonKinesis client.
     * @param configProps configuration properties containing the access key, secret key, and region
-     * @return a new Amazon Kinesis Client
+     * @return a new AmazonKinesis client
     */
-    public static AmazonKinesisClient createKinesisClient(Properties configProps) {
+    public static AmazonKinesis createKinesisClient(Properties configProps) {
        // set a Flink-specific user agent
-       ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
-       awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
-           " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
+       ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig()
+           .withUserAgentPrefix(String.format(USER_AGENT_FORMAT,
+               EnvironmentInformation.getVersion(),
+               EnvironmentInformation.getRevisionInformation().commitId));
 
        // utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
-       AmazonKinesisClient client = new AmazonKinesisClient(
-           AWSUtil.getCredentialsProvider(configProps), awsClientConfig);
+       AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
+           .withCredentials(AWSUtil.getCredentialsProvider(configProps))
+           .withClientConfiguration(awsClientConfig)
+           .withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));
 
-       client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION))));
        if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
-           client.setEndpoint(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
+           builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
+               configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT),
+               configProps.getProperty(AWSConfigConstants.AWS_REGION)));
--- End diff --

Why does the endpoint configuration have a region now?

For example, let's say a user wants to test the connector against a local 
Kinesis mock service at "localhost:". The user also was originally issuing 
requests against the regular AWS Kinesis service, at region "us-west-1". The 
user's properties would look like:
```
configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-west-1");
configProps.setProperty(AWSConfigConstants.AWS_ENDPOINT, "localhost:");
```

In the past, this would correctly redirect requests to "localhost:".

With this change, is this also the case? Or do we actually need to call 
`new AwsClientBuilder.EndpointConfiguration(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT), null)` 
instead (do not provide the region in the endpoint)?
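
For reference, a sketch of the two variants being contrasted (the port in 
the endpoint is a hypothetical placeholder, since the original message 
elides it):

```
AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard();

// variant in the PR: signing region taken from AWS_REGION
builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
    "http://localhost:4567", "us-west-1"));  // hypothetical mock endpoint

// variant raised in the question: no region in the endpoint configuration
builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
    "http://localhost:4567", null));
```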


---


[GitHub] flink pull request #5171: [FLINK-8271][Kinesis connector] upgrade deprecated...

2017-12-20 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5171#discussion_r158164454
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java ---
@@ -30,37 +30,44 @@
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
 import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.regions.Region;
+import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.regions.Regions;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
 
 import java.util.Properties;
 
 /**
  * Some utilities specific to Amazon Web Service.
 */
 public class AWSUtil {
+    /** Used for formatting Flink-specific user agent string when creating Kinesis client. */
+    private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) Kinesis Connector";
 
     /**
-     * Creates an Amazon Kinesis Client.
+     * Creates an AmazonKinesis client.
     * @param configProps configuration properties containing the access key, secret key, and region
-     * @return a new Amazon Kinesis Client
+     * @return a new AmazonKinesis client
     */
-    public static AmazonKinesisClient createKinesisClient(Properties configProps) {
+    public static AmazonKinesis createKinesisClient(Properties configProps) {
        // set a Flink-specific user agent
-       ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
-       awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
-           " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
+       ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig()
+           .withUserAgentPrefix(String.format(USER_AGENT_FORMAT,
--- End diff --

To my understanding, switching from `setUserAgent` to `withUserAgentPrefix` 
will result in different user agent strings, right?

Though I think internally the `setUserAgent` method directly forwards to 
`withUserAgentPrefix` anyway ...

No objection on this, just curious.


---


[jira] [Commented] (FLINK-8271) upgrade from deprecated classes to AmazonKinesis

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299259#comment-16299259
 ] 

ASF GitHub Bot commented on FLINK-8271:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5171#discussion_r158163793
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java ---
@@ -30,37 +30,44 @@
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
 import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.regions.Region;
+import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.regions.Regions;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
 
 import java.util.Properties;
 
 /**
  * Some utilities specific to Amazon Web Service.
 */
 public class AWSUtil {
+    /** Used for formatting Flink-specific user agent string when creating Kinesis client. */
+    private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) Kinesis Connector";
 
     /**
-     * Creates an Amazon Kinesis Client.
+     * Creates an AmazonKinesis client.
     * @param configProps configuration properties containing the access key, secret key, and region
-     * @return a new Amazon Kinesis Client
+     * @return a new AmazonKinesis client
     */
-    public static AmazonKinesisClient createKinesisClient(Properties configProps) {
+    public static AmazonKinesis createKinesisClient(Properties configProps) {
        // set a Flink-specific user agent
-       ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
-       awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
-           " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
+       ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig()
+           .withUserAgentPrefix(String.format(USER_AGENT_FORMAT,
--- End diff --

To my understanding, switching from `setUserAgent` to `withUserAgentPrefix` 
will result in different user agent strings, right?

Though I think internally the `setUserAgent` method directly forwards to 
`withUserAgentPrefix` anyway ...

No objection on this, just curious.
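
For context, a small sketch of the difference in question, assuming standard 
AWS SDK for Java 1.x behavior (the version/commit values are placeholders):

{code}
ClientConfiguration config = new ClientConfigurationFactory().getConfig();

// old: replaces the entire User-Agent header value (deprecated)
config.setUserAgent("Apache Flink 1.5-SNAPSHOT (abc123) Kinesis Connector");

// new: the prefix is prepended to the SDK's default agent, so the resulting
// header presumably looks like
// "Apache Flink ... Kinesis Connector, aws-sdk-java/1.11.x ..."
config = new ClientConfigurationFactory().getConfig()
    .withUserAgentPrefix("Apache Flink 1.5-SNAPSHOT (abc123) Kinesis Connector");
{code}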


> upgrade from deprecated classes to AmazonKinesis
> 
>
> Key: FLINK-8271
> URL: https://issues.apache.org/jira/browse/FLINK-8271
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.5.0, 1.4.1
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5171: [FLINK-8271][Kinesis connector] upgrade deprecated...

2017-12-20 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5171#discussion_r158163793
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java ---
@@ -30,37 +30,44 @@
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
 import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.regions.Region;
+import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.regions.Regions;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
 
 import java.util.Properties;
 
 /**
  * Some utilities specific to Amazon Web Service.
 */
 public class AWSUtil {
+    /** Used for formatting Flink-specific user agent string when creating Kinesis client. */
+    private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) Kinesis Connector";
 
     /**
-     * Creates an Amazon Kinesis Client.
+     * Creates an AmazonKinesis client.
     * @param configProps configuration properties containing the access key, secret key, and region
-     * @return a new Amazon Kinesis Client
+     * @return a new AmazonKinesis client
     */
-    public static AmazonKinesisClient createKinesisClient(Properties configProps) {
+    public static AmazonKinesis createKinesisClient(Properties configProps) {
        // set a Flink-specific user agent
-       ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
-       awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
-           " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
+       ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig()
+           .withUserAgentPrefix(String.format(USER_AGENT_FORMAT,
--- End diff --

To my understanding, switching from `setUserAgent` to `withUserAgentPrefix` 
will result in different user agent strings, right?

Though I think internally the `setUserAgent` method directly forwards to 
`withUserAgentPrefix` anyway ...

No objection on this, just curious.


---


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299238#comment-16299238
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r158161267
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -96,6 +99,15 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
                                SequenceNumber lastSequenceNum,
                                KinesisProxyInterface kinesis) {
        this.fetcherRef = checkNotNull(fetcherRef);
+       MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
+           .getMetricGroup()
+           .addGroup("Kinesis")
--- End diff --

Would be best if @zentol also comments on this.


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...

2017-12-20 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r158160866
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -96,6 +99,15 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
                                SequenceNumber lastSequenceNum,
                                KinesisProxyInterface kinesis) {
        this.fetcherRef = checkNotNull(fetcherRef);
+       MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
+           .getMetricGroup()
+           .addGroup("Kinesis")
--- End diff --

Do we really need this group? The metric is already bound to the current 
subtask, which should provide enough context that it is Kinesis-related, 
since we're the Kinesis consumer, no?


---


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299236#comment-16299236
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r158159640
  
--- Diff: docs/monitoring/metrics.md ---
@@ -1293,6 +1293,29 @@ Thus, in order to infer the metric identifier:
   
 
 
+ Kinesis Connectors
+<table>
+  <thead>
+    <tr>
+      <th>Scope</th>
+      <th>Metrics</th>
+      <th>Description</th>
+      <th>Type</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>Operator</td>
+      <td>millisBehindLatest</td>
+      <td>The number of milliseconds the GetRecords response is from the tip of the stream,
--- End diff --

Just a matter of preference here: I prefer the term "head of the stream" 
instead of "tip".
You can ignore this if you disagree.


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299240#comment-16299240
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r158161564
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -96,6 +99,15 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
                                SequenceNumber lastSequenceNum,
                                KinesisProxyInterface kinesis) {
        this.fetcherRef = checkNotNull(fetcherRef);
+       MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
--- End diff --

I feel that passing the `StreamingRuntimeContext` all the way down here just 
to register metrics is not a good idea.
Could we register the metrics in `FlinkKinesisConsumer` instead? That would 
also make it more visible which metrics the consumer exposes, without having 
to dig all the way down to this internal `ShardConsumer` thread.


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299237#comment-16299237
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r158160866
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -96,6 +99,15 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
                                SequenceNumber lastSequenceNum,
                                KinesisProxyInterface kinesis) {
        this.fetcherRef = checkNotNull(fetcherRef);
+       MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
+           .getMetricGroup()
+           .addGroup("Kinesis")
--- End diff --

Do we really need this group? The metric is already bound to the current 
subtask, which should provide enough context that it is Kinesis-related, 
since we're the Kinesis consumer, no?


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...

2017-12-20 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r158159640
  
--- Diff: docs/monitoring/metrics.md ---
@@ -1293,6 +1293,29 @@ Thus, in order to infer the metric identifier:
   
 
 
+ Kinesis Connectors
+<table>
+  <thead>
+    <tr>
+      <th>Scope</th>
+      <th>Metrics</th>
+      <th>Description</th>
+      <th>Type</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>Operator</td>
+      <td>millisBehindLatest</td>
+      <td>The number of milliseconds the GetRecords response is from the tip of the stream,
--- End diff --

Just a matter of preference here: I prefer the term "head of the stream" 
instead of "tip".
You can ignore this if you disagree.


---


[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...

2017-12-20 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r158161267
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -96,6 +99,15 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
                                SequenceNumber lastSequenceNum,
                                KinesisProxyInterface kinesis) {
        this.fetcherRef = checkNotNull(fetcherRef);
+       MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
+           .getMetricGroup()
+           .addGroup("Kinesis")
--- End diff --

Would be best if @zentol also comments on this.


---


[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...

2017-12-20 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r158161564
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -96,6 +99,15 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
                                SequenceNumber lastSequenceNum,
                                KinesisProxyInterface kinesis) {
        this.fetcherRef = checkNotNull(fetcherRef);
+       MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
--- End diff --

I feel that passing the `StreamingRuntimeContext` all the way down here just 
to register metrics is not a good idea.
Could we register the metrics in `FlinkKinesisConsumer` instead? That would 
also make it more visible which metrics the consumer exposes, without having 
to dig all the way down to this internal `ShardConsumer` thread.


---


[jira] [Commented] (FLINK-7795) Utilize error-prone to discover common coding mistakes

2017-12-20 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299208#comment-16299208
 ] 

Ted Yu commented on FLINK-7795:
---

https://github.com/google/error-prone/releases/tag/v2.1.3 was the latest 
release.

> Utilize error-prone to discover common coding mistakes
> --
>
> Key: FLINK-7795
> URL: https://issues.apache.org/jira/browse/FLINK-7795
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>
> http://errorprone.info/ is a tool which detects common coding mistakes.
> We should incorporate it into the Flink build process.
> Here are the dependencies:
> {code}
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_annotation</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
> </dependency>
> <dependency>
>   <groupId>com.google.auto.service</groupId>
>   <artifactId>auto-service</artifactId>
>   <version>1.0-rc3</version>
>   <optional>true</optional>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_check_api</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
>   <exclusions>
>     <exclusion>
>       <groupId>com.google.code.findbugs</groupId>
>       <artifactId>jsr305</artifactId>
>     </exclusion>
>   </exclusions>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>javac</artifactId>
>   <version>9-dev-r4023-3</version>
>   <scope>provided</scope>
> </dependency>
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7525) Add config option to disable Cancel functionality on UI

2017-12-20 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299205#comment-16299205
 ] 

Ted Yu commented on FLINK-7525:
---

What should be the next step?

> Add config option to disable Cancel functionality on UI
> ---
>
> Key: FLINK-7525
> URL: https://issues.apache.org/jira/browse/FLINK-7525
> Project: Flink
>  Issue Type: Improvement
>  Components: Web Client, Webfrontend
>Reporter: Ted Yu
>
> In this email thread 
> http://search-hadoop.com/m/Flink/VkLeQlf0QOnc7YA?subj=Security+Control+of+running+Flink+Jobs+on+Flink+UI
>  , Raja was asking for a way to control how users cancel Job(s).
> Robert proposed adding a config option which disables the Cancel 
> functionality.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5191: Release 1.4

2017-12-20 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5191
  
This seems like a mistakenly opened PR.
Can you please close this, @czhxmz? Thanks!


---


[GitHub] flink issue #5187: Merge pull request #1 from apache/master

2017-12-20 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5187
  
This seems like a mistake. @laolang113 can you please close this PR? Thanks!


---


[GitHub] flink issue #5195: [hotfix] [build] Always include Kafka 0.11 connector

2017-12-20 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5195
  
+1, LGTM


---


[jira] [Commented] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min

2017-12-20 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299058#comment-16299058
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8283:


I'm suspecting that the Travis infra updates on Dec. 12th are somehow causing 
this: https://blog.travis-ci.com/2017-12-12-new-trusty-images-q4-launch.

It seems the 10-minute no-output failures started with our commits after Dec. 
12th (as far as I can tell from our build history).

> FlinkKafkaConsumerBase failing on Travis with no output in 10min
> 
>
> Key: FLINK-8283
> URL: https://issues.apache.org/jira/browse/FLINK-8283
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: test-stability
>
> For a few days now, Travis builds with the {{connectors}} profile have been 
> failing more often with no new output being received within 10 minutes. It 
> seems to start with the Travis build for 
> https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9
>  but may have been introduced earlier. The printed offsets look strange 
> though.
> {code}
> 16:33:12,508 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Setting 
> restore state in the FlinkKafkaConsumer: 
> {KafkaTopicPartition{topic='test-topic', partition=0}=-915623761773, 
> KafkaTopicPartition{topic='test-topic', partition=1}=-915623761773}
> 16:33:12,520 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
> Consumer subtask 2 will start reading 66 partitions with offsets in restored 
> state: {KafkaTopicPartition{topic='test-topic', partition=851}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=716}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=461}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=206}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=971}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=836}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=581}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=326}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=71}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=956}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=701}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=446}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=191}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=56}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=821}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=566}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=311}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=881}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=626}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=371}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=236}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=746}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=491}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=356}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=101}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=866}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=611}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=476}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=221}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=986}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=731}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=596}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=341}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=86}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=656}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=401}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=146}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=911}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=776}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=521}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=266}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=11}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', 

[jira] [Commented] (FLINK-8297) RocksDBListState stores whole list in single byte[]

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298961#comment-16298961
 ] 

ASF GitHub Bot commented on FLINK-8297:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5185
  
This might be a great chance to start discussing how ListState APIs should 
evolve - specifically, is it time to consider adding `remove()` to ListState?


> RocksDBListState stores whole list in single byte[]
> ---
>
> Key: FLINK-8297
> URL: https://issues.apache.org/jira/browse/FLINK-8297
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Jan Lukavský
>
> RocksDBListState currently keeps the whole list of data in a single RocksDB 
> key-value pair, which implies that the list must actually fit into memory. 
> Larger lists are not supported and end up with an OOME or other errors. 
> RocksDBListState could be modified so that individual items in the list are 
> stored under separate keys in RocksDB and can then be iterated over. A simple 
> implementation could reuse the existing RocksDBMapState, with the key as an 
> index into the list, and a single RocksDBValueState keeping track of how many 
> items have already been added to the list. Because this implementation might 
> be less efficient in some cases, it would be good to make it opt-in via a 
> construct like
> {{new RocksDBStateBackend().enableLargeListsPerKey()}}
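
A bare-bones sketch of that proposal on top of Flink's state interfaces (the 
wrapper class is hypothetical; only MapState/ValueState are real APIs):

{code}
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.ValueState;

/** Hypothetical wrapper: each element lives under its own map key (its index),
 *  so no single RocksDB value ever holds the whole list. */
class LargeListState<T> {
    private final MapState<Long, T> elements; // index -> element
    private final ValueState<Long> count;     // number of elements appended so far

    LargeListState(MapState<Long, T> elements, ValueState<Long> count) {
        this.elements = elements;
        this.count = count;
    }

    void add(T value) throws Exception {
        long n = count.value() == null ? 0L : count.value();
        elements.put(n, value);
        count.update(n + 1);
    }

    Iterable<T> get() throws Exception {
        // iterates entry by entry instead of materializing the full list
        return elements.values();
    }
}
{code}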



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5185: [FLINK-8297] [flink-rocksdb] optionally use RocksDBMapSta...

2017-12-20 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5185
  
This might be a great chance to start discussing how ListState APIs should 
evolve - specifically, is it time to consider adding `remove()` to ListState?


---


[GitHub] flink pull request #5195: [hotfix] [build] Always include Kafka 0.11 connect...

2017-12-20 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/5195

[hotfix] [build] Always include Kafka 0.11 connector

Now that Flink only supports builds for Scala 2.11+ we can unconditionally 
enable the Kafka 0.11 connector.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
20171220a_always_include_kafka_0.11_connector

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5195.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5195


commit c4bdbbcb95344101a61c6864d7f1c4feca1f0cb3
Author: Greg Hogan 
Date:   2017-12-20T17:11:00Z

[hotfix] [build] Always include Kafka 0.11 connector

Now that Flink only supports builds for Scala 2.11+ we can
unconditionally enable the Kafka 0.11 connector.




---


[jira] [Commented] (FLINK-8271) upgrade from deprecated classes to AmazonKinesis

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298835#comment-16298835
 ] 

ASF GitHub Bot commented on FLINK-8271:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5171
  
You are right. Thank you, Gordon. I updated the PR title.


> upgrade from deprecated classes to AmazonKinesis
> 
>
> Key: FLINK-8271
> URL: https://issues.apache.org/jira/browse/FLINK-8271
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.5.0, 1.4.1
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5171: [FLINK-8271][Kinesis connector] upgrade deprecated method...

2017-12-20 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5171
  
You are right. Thank you, Gordon. I updated the PR title.


---


[jira] [Comment Edited] (FLINK-7588) Document RocksDB tuning for spinning disks

2017-12-20 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258309#comment-16258309
 ] 

Ted Yu edited comment on FLINK-7588 at 12/20/17 6:02 PM:
-

bq. Be careful about whether you have enough memory to keep all bloom filters

Other than the above being tricky, the other guidelines are actionable.


was (Author: yuzhih...@gmail.com):
bq. Be careful about whether you have enough memory to keep all bloom filters

Other than the above being tricky, the other guidelines are actionable.

> Document RocksDB tuning for spinning disks
> --
>
> Key: FLINK-7588
> URL: https://issues.apache.org/jira/browse/FLINK-7588
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Ted Yu
>
> In docs/ops/state/large_state_tuning.md , it was mentioned that:
> bq. the default configuration is tailored towards SSDs and performs 
> suboptimal on spinning disks
> We should add recommendation targeting spinning disks:
> https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#difference-of-spinning-disk



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7897) Consider using nio.Files for file deletion in TransientBlobCleanupTask

2017-12-20 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7897:
--
Description: 
nio.Files#delete() provides a better clue as to why the deletion may fail:

https://docs.oracle.com/javase/7/docs/api/java/nio/file/Files.html#delete(java.nio.file.Path)

Depending on the potential exception (FileNotFound), the call to 
localFile.exists() may be skipped.

  was:
nio.Files#delete() provides a better clue as to why the deletion may fail:

https://docs.oracle.com/javase/7/docs/api/java/nio/file/Files.html#delete(java.nio.file.Path)


Depending on the potential exception, the call to localFile.exists() may be 
skipped.


> Consider using nio.Files for file deletion in TransientBlobCleanupTask
> --
>
> Key: FLINK-7897
> URL: https://issues.apache.org/jira/browse/FLINK-7897
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Ted Yu
>Priority: Minor
>
> nio.Files#delete() provides a better clue as to why the deletion may fail:
> https://docs.oracle.com/javase/7/docs/api/java/nio/file/Files.html#delete(java.nio.file.Path)
> Depending on the potential exception (FileNotFound), the call to 
> localFile.exists() may be skipped.
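
A short illustration of the difference, assuming a hypothetical file path:

{code}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;

public class DeleteDemo {
    public static void main(String[] args) {
        Path localFile = Paths.get("/tmp/blob-cache-entry"); // hypothetical path

        try {
            // unlike File#delete(), this throws a typed exception explaining the
            // failure, so a preceding localFile.toFile().exists() check is unnecessary
            Files.delete(localFile);
        } catch (NoSuchFileException e) {
            // the file was already gone - nothing to clean up
        } catch (IOException e) {
            System.err.println("Could not delete " + localFile + ": " + e);
        }
    }
}
{code}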



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-8295) Netty shading does not work properly

2017-12-20 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-8295.
-
   Resolution: Fixed
Fix Version/s: 1.4.1
   1.5.0

Fixed in 1.5: 1a98e327ea504f1422935c12a3342997145b9292
Fixed in 1.4: 7e497f744a67c8011a8e1f353eddc4f1d514

> Netty shading does not work properly
> 
>
> Key: FLINK-8295
> URL: https://issues.apache.org/jira/browse/FLINK-8295
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector, Core
>Affects Versions: 1.4.0
>Reporter: Timo Walther
>Assignee: Nico Kruber
> Fix For: 1.5.0, 1.4.1
>
>
> Multiple users complained that the Cassandra connector is not usable in Flink 
> 1.4.0 due to wrong/insufficient shading of Netty.
> See:
> http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3Cb1f584b918c8aaf98b744c168407b0f5%40dbruhn.de%3E
> http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3CCACk7FTgMPR03bPBoKzmeVKCqS%2BumTR1u1X%2BKdPtHRgbnUZiO3A%40mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8295) Netty shading does not work properly

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298771#comment-16298771
 ] 

ASF GitHub Bot commented on FLINK-8295:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5183


> Netty shading does not work properly
> 
>
> Key: FLINK-8295
> URL: https://issues.apache.org/jira/browse/FLINK-8295
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector, Core
>Affects Versions: 1.4.0
>Reporter: Timo Walther
>Assignee: Nico Kruber
>
> Multiple users complained that the Cassandra connector is not usable in Flink 
> 1.4.0 due to wrong/insufficient shading of Netty.
> See:
> http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3Cb1f584b918c8aaf98b744c168407b0f5%40dbruhn.de%3E
> http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3CCACk7FTgMPR03bPBoKzmeVKCqS%2BumTR1u1X%2BKdPtHRgbnUZiO3A%40mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5183: [FLINK-8295][cassandra][build] properly shade nett...

2017-12-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5183


---


[jira] [Commented] (FLINK-8295) Netty shading does not work properly

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298765#comment-16298765
 ] 

ASF GitHub Bot commented on FLINK-8295:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5183
  
I tested it locally. Will merge this now...


> Netty shading does not work properly
> 
>
> Key: FLINK-8295
> URL: https://issues.apache.org/jira/browse/FLINK-8295
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector, Core
>Affects Versions: 1.4.0
>Reporter: Timo Walther
>Assignee: Nico Kruber
>
> Multiple users complained that the Cassandra connector is not usable in Flink 
> 1.4.0 due to wrong/insufficient shading of Netty.
> See:
> http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3Cb1f584b918c8aaf98b744c168407b0f5%40dbruhn.de%3E
> http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3CCACk7FTgMPR03bPBoKzmeVKCqS%2BumTR1u1X%2BKdPtHRgbnUZiO3A%40mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5183: [FLINK-8295][cassandra][build] properly shade netty for t...

2017-12-20 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5183
  
I tested it locally. Will merge this now...


---


[jira] [Commented] (FLINK-8233) Expose JobExecutionResult via HTTP

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298752#comment-16298752
 ] 

ASF GitHub Bot commented on FLINK-8233:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5194#discussion_r158081176
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobExecutionResultDeserializer.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobExecutionResult;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * JSON deserializer for {@link JobExecutionResult}.
+ *
+ * @see JobExecutionResultSerializer
+ */
+public class JobExecutionResultDeserializer extends StdDeserializer<JobExecutionResult> {
+
+   private static final long serialVersionUID = 1L;
+
+   private final JobIDDeserializer jobIdDeserializer = new 
JobIDDeserializer();
+
+   private final SerializedThrowableDeserializer 
serializedThrowableDeserializer =
+   new SerializedThrowableDeserializer();
+
+   private final SerializedValueDeserializer serializedValueDeserializer;
+
+   public JobExecutionResultDeserializer() {
+   super(JobExecutionResult.class);
+   final JavaType objectSerializedValueType = 
TypeFactory.defaultInstance()
+   .constructType(new TypeReference<SerializedValue<Object>>() {
+   });
+   serializedValueDeserializer = new 
SerializedValueDeserializer(objectSerializedValueType);
+   }
+
+   @Override
+   public JobExecutionResult deserialize(final JsonParser p, final 
DeserializationContext ctxt) throws IOException {
+   JobID jobId = null;
+   long netRuntime = -1;
+   SerializedThrowable serializedThrowable = null;
+   Map<String, SerializedValue<Object>> accumulatorResults = null;
+
+   while (true) {
+   final JsonToken jsonToken = p.nextToken();
+   assertNotEndOfInput(p, jsonToken);
+   if (jsonToken == JsonToken.END_OBJECT) {
+   break;
+   }
+
+   final String fieldName = p.getValueAsString();
+   switch (fieldName) {
+   case 
JobExecutionResultSerializer.FIELD_NAME_JOB_ID:
+   assertNextToken(p, 
JsonToken.VALUE_STRING);
+   jobId = 
jobIdDeserializer.deserialize(p, ctxt);
+   break;
+   case 
JobExecutionResultSerializer.FIELD_NAME_NET_RUNTIME:
+   assertNextToken(p, 
JsonToken.VALUE_NUMBER_INT);
+   netRuntime = p.getLongValue();
+   break;
+   

[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler

2017-12-20 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5194#discussion_r158081176
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobExecutionResultDeserializer.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobExecutionResult;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * JSON deserializer for {@link JobExecutionResult}.
+ *
+ * @see JobExecutionResultSerializer
+ */
+public class JobExecutionResultDeserializer extends StdDeserializer<JobExecutionResult> {
+
+   private static final long serialVersionUID = 1L;
+
+   private final JobIDDeserializer jobIdDeserializer = new 
JobIDDeserializer();
+
+   private final SerializedThrowableDeserializer 
serializedThrowableDeserializer =
+   new SerializedThrowableDeserializer();
+
+   private final SerializedValueDeserializer serializedValueDeserializer;
+
+   public JobExecutionResultDeserializer() {
+   super(JobExecutionResult.class);
+   final JavaType objectSerializedValueType = 
TypeFactory.defaultInstance()
+   .constructType(new TypeReference<SerializedValue<Object>>() {
+   });
+   serializedValueDeserializer = new 
SerializedValueDeserializer(objectSerializedValueType);
+   }
+
+   @Override
+   public JobExecutionResult deserialize(final JsonParser p, final 
DeserializationContext ctxt) throws IOException {
+   JobID jobId = null;
+   long netRuntime = -1;
+   SerializedThrowable serializedThrowable = null;
+   Map<String, SerializedValue<Object>> accumulatorResults = null;
+
+   while (true) {
+   final JsonToken jsonToken = p.nextToken();
+   assertNotEndOfInput(p, jsonToken);
+   if (jsonToken == JsonToken.END_OBJECT) {
+   break;
+   }
+
+   final String fieldName = p.getValueAsString();
+   switch (fieldName) {
+   case 
JobExecutionResultSerializer.FIELD_NAME_JOB_ID:
+   assertNextToken(p, 
JsonToken.VALUE_STRING);
+   jobId = 
jobIdDeserializer.deserialize(p, ctxt);
+   break;
+   case 
JobExecutionResultSerializer.FIELD_NAME_NET_RUNTIME:
+   assertNextToken(p, 
JsonToken.VALUE_NUMBER_INT);
+   netRuntime = p.getLongValue();
+   break;
+   case 
JobExecutionResultSerializer.FIELD_NAME_ACCUMULATOR_RESULTS:
+   assertNextToken(p, 
JsonToken.START_OBJECT);
+   accumulatorResults = 
parseAccumulatorResults(p, 

[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler

2017-12-20 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5194#discussion_r158078273
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobExecutionResultDeserializer.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobExecutionResult;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * JSON deserializer for {@link JobExecutionResult}.
+ *
+ * @see JobExecutionResultSerializer
+ */
+public class JobExecutionResultDeserializer extends StdDeserializer<JobExecutionResult> {
+
+   private static final long serialVersionUID = 1L;
+
+   private final JobIDDeserializer jobIdDeserializer = new 
JobIDDeserializer();
+
+   private final SerializedThrowableDeserializer 
serializedThrowableDeserializer =
+   new SerializedThrowableDeserializer();
+
+   private final SerializedValueDeserializer serializedValueDeserializer;
+
+   public JobExecutionResultDeserializer() {
+   super(JobExecutionResult.class);
+   final JavaType objectSerializedValueType = 
TypeFactory.defaultInstance()
+   .constructType(new TypeReference<SerializedValue<Object>>() {
+   });
+   serializedValueDeserializer = new 
SerializedValueDeserializer(objectSerializedValueType);
+   }
+
+   @Override
+   public JobExecutionResult deserialize(final JsonParser p, final 
DeserializationContext ctxt) throws IOException {
+   JobID jobId = null;
+   long netRuntime = -1;
+   SerializedThrowable serializedThrowable = null;
+   Map<String, SerializedValue<Object>> accumulatorResults = null;
+
+   while (true) {
+   final JsonToken jsonToken = p.nextToken();
+   assertNotEndOfInput(p, jsonToken);
+   if (jsonToken == JsonToken.END_OBJECT) {
+   break;
+   }
+
+   final String fieldName = p.getValueAsString();
+   switch (fieldName) {
+   case 
JobExecutionResultSerializer.FIELD_NAME_JOB_ID:
+   assertNextToken(p, 
JsonToken.VALUE_STRING);
+   jobId = 
jobIdDeserializer.deserialize(p, ctxt);
+   break;
+   case 
JobExecutionResultSerializer.FIELD_NAME_NET_RUNTIME:
+   assertNextToken(p, 
JsonToken.VALUE_NUMBER_INT);
+   netRuntime = p.getLongValue();
+   break;
+   case 
JobExecutionResultSerializer.FIELD_NAME_ACCUMULATOR_RESULTS:
+   assertNextToken(p, 
JsonToken.START_OBJECT);
+   accumulatorResults = 
parseAccumulatorResults(p, 

[jira] [Commented] (FLINK-8233) Expose JobExecutionResult via HTTP

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298732#comment-16298732
 ] 

ASF GitHub Bot commented on FLINK-8233:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5194#discussion_r158078273
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobExecutionResultDeserializer.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobExecutionResult;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * JSON deserializer for {@link JobExecutionResult}.
+ *
+ * @see JobExecutionResultSerializer
+ */
+public class JobExecutionResultDeserializer extends StdDeserializer<JobExecutionResult> {
+
+   private static final long serialVersionUID = 1L;
+
+   private final JobIDDeserializer jobIdDeserializer = new 
JobIDDeserializer();
+
+   private final SerializedThrowableDeserializer 
serializedThrowableDeserializer =
+   new SerializedThrowableDeserializer();
+
+   private final SerializedValueDeserializer serializedValueDeserializer;
+
+   public JobExecutionResultDeserializer() {
+   super(JobExecutionResult.class);
+   final JavaType objectSerializedValueType = 
TypeFactory.defaultInstance()
+   .constructType(new TypeReference<SerializedValue<Object>>() {
+   });
+   serializedValueDeserializer = new 
SerializedValueDeserializer(objectSerializedValueType);
+   }
+
+   @Override
+   public JobExecutionResult deserialize(final JsonParser p, final 
DeserializationContext ctxt) throws IOException {
+   JobID jobId = null;
+   long netRuntime = -1;
+   SerializedThrowable serializedThrowable = null;
+   Map<String, SerializedValue<Object>> accumulatorResults = null;
+
+   while (true) {
+   final JsonToken jsonToken = p.nextToken();
+   assertNotEndOfInput(p, jsonToken);
+   if (jsonToken == JsonToken.END_OBJECT) {
+   break;
+   }
+
+   final String fieldName = p.getValueAsString();
+   switch (fieldName) {
+   case 
JobExecutionResultSerializer.FIELD_NAME_JOB_ID:
+   assertNextToken(p, 
JsonToken.VALUE_STRING);
+   jobId = 
jobIdDeserializer.deserialize(p, ctxt);
+   break;
+   case 
JobExecutionResultSerializer.FIELD_NAME_NET_RUNTIME:
+   assertNextToken(p, 
JsonToken.VALUE_NUMBER_INT);
+   netRuntime = p.getLongValue();
+   break;
+   
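The diff above follows the standard pattern for hand-written Jackson deserializers: advance the parser, assert the expected token, then read the value. A self-contained sketch of the same pattern on a toy type (plain Jackson rather than Flink's shaded copy; all names here are illustrative):
{code}
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.module.SimpleModule;

import java.io.IOException;

public class TokenLoopExample {

    static class Result {
        String id;
        long runtime;
    }

    static class ResultDeserializer extends StdDeserializer<Result> {
        ResultDeserializer() {
            super(Result.class);
        }

        @Override
        public Result deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
            Result result = new Result();
            // Walk the tokens manually until the enclosing object ends.
            while (p.nextToken() != JsonToken.END_OBJECT) {
                String field = p.getValueAsString(); // current token is a FIELD_NAME
                switch (field) {
                    case "id":
                        p.nextToken(); // expect VALUE_STRING
                        result.id = p.getValueAsString();
                        break;
                    case "runtime":
                        p.nextToken(); // expect VALUE_NUMBER_INT
                        result.runtime = p.getLongValue();
                        break;
                    default:
                        p.nextToken();
                        p.skipChildren(); // ignore unknown fields
                }
            }
            return result;
        }
    }

    public static void main(String[] args) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(
            new SimpleModule().addDeserializer(Result.class, new ResultDeserializer()));
        Result r = mapper.readValue("{\"id\":\"abc\",\"runtime\":42}", Result.class);
        System.out.println(r.id + " " + r.runtime);
    }
}
{code}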

[jira] [Commented] (FLINK-8281) org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to org.apache.flink.core.fs.WrappingProxyCloseable

2017-12-20 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298723#comment-16298723
 ] 

Timo Walther commented on FLINK-8281:
-

Sorry, I hadn't seen that this issue was already resolved.

> org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to 
> org.apache.flink.core.fs.WrappingProxyCloseable
> -
>
> Key: FLINK-8281
> URL: https://issues.apache.org/jira/browse/FLINK-8281
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.0
> Environment: Linux 4.4.0-104-generic #127-Ubuntu SMP Mon Dec 11 
> 12:16:42 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: brucewoo
>Priority: Critical
> Fix For: 1.4.0
>
>
> {noformat}
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: 
> java.lang.Exception: Could not materialize checkpoint 1 for operator window: 
> (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: (COUNT(*) AS 
> api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS total_numbers, 
> start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, 
> proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS proc_end_time, 
> api_call_count, total_bytes, total_numbers) -> to: Row (1/1).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:945)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> ~[na:1.8.0_151]
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> ~[na:1.8.0_151]
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  ~[na:1.8.0_151]
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  ~[na:1.8.0_151]
>   at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151]
> Caused by: java.lang.Exception: Could not materialize checkpoint 1 for 
> operator window: (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: 
> (COUNT(*) AS api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS 
> total_numbers, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS 
> w$rowtime, proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS 
> proc_end_time, api_call_count, total_bytes, total_numbers) -> to: Row (1/1).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:946)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   ... 5 common frames omitted
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not open output stream for state backend
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122) 
> ~[na:1.8.0_151]
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192) 
> ~[na:1.8.0_151]
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) 
> ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   ... 5 common frames omitted
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:92)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   ... 5 common frames omitted
>   Caused by: java.util.concurrent.ExecutionException: 
> java.io.IOException: Could not open output stream for state backend
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>   ... 7 common frames omitted
>   Caused by: java.io.IOException: Could not open output stream for state 
> backend
>   at 
> 

[jira] [Updated] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-20 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8234:

Description: In order to serve the {{JobExecutionResults}} we have to cache 
them in the {{Dispatcher}} after the {{JobManagerRunner}} has finished. The 
cache should have a configurable size and should periodically clean up stale 
entries in order to avoid memory exhaustion.  (was: In order to serve the 
{{JobExecutionResults}} we have to cache them in the {{Dispatcher}} after the 
{{JobManagerRunner}} has finished. The cache should have a configurable size 
and should periodically clean up stale entries in order to avoid memory leaks.)

> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory exhaustion.
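One way such a cache could look -- a size-bounded map with time-based eviction (purely a sketch; the class and field names are made up and this is not the Dispatcher's actual implementation):
{code}
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of a bounded result cache with stale-entry cleanup. */
class ResultCacheSketch<K, V> {

    private static final class Entry<V> {
        final V value;
        final long insertionTime;

        Entry(V value, long insertionTime) {
            this.value = value;
            this.insertionTime = insertionTime;
        }
    }

    private final int maxSize;
    private final long ttlMillis;

    // Insertion-ordered LinkedHashMap drops the eldest entry once maxSize is exceeded.
    private final LinkedHashMap<K, Entry<V>> cache;

    ResultCacheSketch(int maxSize, long ttlMillis) {
        this.maxSize = maxSize;
        this.ttlMillis = ttlMillis;
        this.cache = new LinkedHashMap<K, Entry<V>>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Entry<V>> eldest) {
                return size() > ResultCacheSketch.this.maxSize;
            }
        };
    }

    synchronized void put(K key, V value) {
        cache.put(key, new Entry<>(value, System.currentTimeMillis()));
    }

    synchronized V get(K key) {
        Entry<V> entry = cache.get(key);
        return entry == null ? null : entry.value;
    }

    /** Intended to run periodically, e.g. from a scheduled executor. */
    synchronized void evictStaleEntries() {
        long now = System.currentTimeMillis();
        Iterator<Entry<V>> it = cache.values().iterator();
        while (it.hasNext()) {
            if (now - it.next().insertionTime > ttlMillis) {
                it.remove();
            }
        }
    }
}
{code}
Bounding both size and age keeps finished-job results from accumulating indefinitely; the periodic sweep corresponds to the "clean up stale entries" requirement in the issue.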



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8281) org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to org.apache.flink.core.fs.WrappingProxyCloseable

2017-12-20 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298716#comment-16298716
 ] 

Timo Walther commented on FLINK-8281:
-

[~brucewoo] can you tell us a bit more about your environment? Are you using 
Flink with bundled Hadoop? What do your dependencies look like? Can you post your 
project's pom.xml?

> org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to 
> org.apache.flink.core.fs.WrappingProxyCloseable
> -
>
> Key: FLINK-8281
> URL: https://issues.apache.org/jira/browse/FLINK-8281
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.0
> Environment: Linux 4.4.0-104-generic #127-Ubuntu SMP Mon Dec 11 
> 12:16:42 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: brucewoo
>Priority: Critical
> Fix For: 1.4.0
>
>
> {noformat}
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: 
> java.lang.Exception: Could not materialize checkpoint 1 for operator window: 
> (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: (COUNT(*) AS 
> api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS total_numbers, 
> start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, 
> proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS proc_end_time, 
> api_call_count, total_bytes, total_numbers) -> to: Row (1/1).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:945)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> ~[na:1.8.0_151]
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> ~[na:1.8.0_151]
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  ~[na:1.8.0_151]
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  ~[na:1.8.0_151]
>   at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151]
> Caused by: java.lang.Exception: Could not materialize checkpoint 1 for 
> operator window: (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: 
> (COUNT(*) AS api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS 
> total_numbers, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS 
> w$rowtime, proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS 
> proc_end_time, api_call_count, total_bytes, total_numbers) -> to: Row (1/1).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:946)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   ... 5 common frames omitted
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not open output stream for state backend
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122) 
> ~[na:1.8.0_151]
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192) 
> ~[na:1.8.0_151]
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) 
> ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   ... 5 common frames omitted
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:92)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>  ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
>   ... 5 common frames omitted
>   Caused by: java.util.concurrent.ExecutionException: 
> java.io.IOException: Could not open output stream for state backend
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>   ... 7 common frames 

[jira] [Updated] (FLINK-8300) Make JobExecutionResult accessible from JobMaster

2017-12-20 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8300:

Description: In order to serve the {{JobExecutionResults}} we have to cache 
them in the {{JobMaster}} after the Job has finished. The cache should have a 
configurable size and should periodically clean up stale entries in order to 
avoid memory leaks.  (was: In order to serve the {{JobExecutionResults}} we 
have to cache them in the {{Dispatcher}} after the {{JobManagerRunner}} has 
finished. The cache should have a configurable size and should periodically 
clean up stale entries in order to avoid memory leaks.)

> Make JobExecutionResult accessible from JobMaster
> -
>
> Key: FLINK-8300
> URL: https://issues.apache.org/jira/browse/FLINK-8300
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{JobMaster}} after the Job has finished. The cache should have a 
> configurable size and should periodically clean up stale entries in order to 
> avoid memory leaks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8300) Make JobExecutionResult accessible from JobMaster

2017-12-20 Thread Gary Yao (JIRA)
Gary Yao created FLINK-8300:
---

 Summary: Make JobExecutionResult accessible from JobMaster
 Key: FLINK-8300
 URL: https://issues.apache.org/jira/browse/FLINK-8300
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Gary Yao
Assignee: Gary Yao
Priority: Blocker
 Fix For: 1.5.0


In order to serve the {{JobExecutionResults}} we have to cache them in the 
{{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
have a configurable size and should periodically clean up stale entries in 
order to avoid memory leaks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8292) Remove unnecessary force cast in DataStreamSource

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298658#comment-16298658
 ] 

ASF GitHub Bot commented on FLINK-8292:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5180
  
+0


> Remove unnecessary force cast in DataStreamSource
> -
>
> Key: FLINK-8292
> URL: https://issues.apache.org/jira/browse/FLINK-8292
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Matrix42
>Priority: Trivial
> Fix For: 1.5.0, 1.4.1
>
>
> In DataStreamSource there is a cast that can be replaced by returning `this`.
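The suggested change, in miniature (hypothetical classes, not the actual DataStreamSource code): with a covariant return type, the overriding method can return {{this}} instead of down-casting the superclass result.
{code}
class Base {
    Base configure() {
        return this;
    }
}

class Derived extends Base {
    @Override
    Derived configure() {
        super.configure();
        // Covariant return: no need for the forced cast
        // "(Derived) super.configure()".
        return this;
    }
}
{code}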



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5180: [FLINK-8292] Remove unnecessary force cast in DataStreamS...

2017-12-20 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5180
  
+0


---


[jira] [Updated] (FLINK-8233) Expose JobExecutionResult via HTTP

2017-12-20 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8233:

Description: 
Expose {{JobExecutionResult}} from a finished Flink job via HTTP:
* Add a new AbstractRestHandler that returns the information in 
{{JobExecutionResult}}.
* Register new handler in {{WebMonitorEndpoint}}.

  was:Retrieve the {{ExecutionResult}} from a finished Flink job via the 
{{RestClusterClient}}.


> Expose JobExecutionResult via HTTP
> --
>
> Key: FLINK-8233
> URL: https://issues.apache.org/jira/browse/FLINK-8233
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Expose {{JobExecutionResult}} from a finished Flink job via HTTP:
> * Add a new AbstractRestHandler that returns the information in 
> {{JobExecutionResult}}.
> * Register new handler in {{WebMonitorEndpoint}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8233) Expose JobExecutionResult via HTTP

2017-12-20 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8233:

Summary: Expose JobExecutionResult via HTTP  (was: Retrieve ExecutionResult 
by REST polling)

> Expose JobExecutionResult via HTTP
> --
>
> Key: FLINK-8233
> URL: https://issues.apache.org/jira/browse/FLINK-8233
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Retrieve the {{ExecutionResult}} from a finished Flink job via the 
> {{RestClusterClient}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8299) Retrieve ExecutionResult by REST polling

2017-12-20 Thread Gary Yao (JIRA)
Gary Yao created FLINK-8299:
---

 Summary: Retrieve ExecutionResult by REST polling
 Key: FLINK-8299
 URL: https://issues.apache.org/jira/browse/FLINK-8299
 Project: Flink
  Issue Type: Sub-task
  Components: REST
Affects Versions: 1.5.0
Reporter: Gary Yao
Assignee: Gary Yao
Priority: Blocker
 Fix For: 1.5.0


Retrieve the {{ExecutionResult}} from a finished Flink job via the 
{{RestClusterClient}}.
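Conceptually, the client side boils down to a generic polling loop (a sketch under the assumption that each poll returns an empty result until the job has finished; none of these names are Flink API):
{code}
import java.util.Optional;
import java.util.function.Supplier;

class PollingSketch {

    /** Polls the given fetcher until it yields a value, sleeping in between. */
    static <T> T pollUntilPresent(Supplier<Optional<T>> fetchOnce, long intervalMillis)
            throws InterruptedException {
        while (true) {
            Optional<T> result = fetchOnce.get(); // e.g. one HTTP GET against the REST API
            if (result.isPresent()) {
                return result.get();
            }
            Thread.sleep(intervalMillis);
        }
    }
}
{code}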



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8280) Enable checkstyle for org.apache.flink.runtime.blob

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298650#comment-16298650
 ] 

ASF GitHub Bot commented on FLINK-8280:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5175#discussion_r158056059
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
@@ -150,8 +150,7 @@ static File initLocalStorageDirectory(String basePath) 
throws IOException {
File storageDir;
 
// NOTE: although we will be using UUIDs, there may be 
collisions
-   final int MAX_ATTEMPTS = 10;
-   for(int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
+   for (int attempt = 0; attempt < 10; attempt++) {
--- End diff --

Should we keep and simply rename the constant?


> Enable checkstyle for org.apache.flink.runtime.blob
> ---
>
> Key: FLINK-8280
> URL: https://issues.apache.org/jira/browse/FLINK-8280
> Project: Flink
>  Issue Type: Improvement
>  Components: Checkstyle
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5175: [FLINK-8280][checkstyle] fix checkstyle in BlobSer...

2017-12-20 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5175#discussion_r158056059
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
@@ -150,8 +150,7 @@ static File initLocalStorageDirectory(String basePath) 
throws IOException {
File storageDir;
 
// NOTE: although we will be using UUIDs, there may be 
collisions
-   final int MAX_ATTEMPTS = 10;
-   for(int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
+   for (int attempt = 0; attempt < 10; attempt++) {
--- End diff --

Should we keep and simply rename the constant?
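In miniature, the kept-and-renamed version might read (the constant name is purely illustrative):
{code}
class BlobUtilsSketch {

    // Promoted from the local "MAX_ATTEMPTS" to a properly named constant,
    // rather than inlining the magic number 10.
    private static final int MAX_STORAGE_DIR_CREATION_ATTEMPTS = 10;

    static void initLocalStorageDirectory() {
        for (int attempt = 0; attempt < MAX_STORAGE_DIR_CREATION_ATTEMPTS; attempt++) {
            // ... attempt to create the storage directory, as in BlobUtils ...
        }
    }
}
{code}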


---


[jira] [Created] (FLINK-8298) Shutdown MockEnvironment

2017-12-20 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-8298:
-

 Summary: Shutdown MockEnvironment
 Key: FLINK-8298
 URL: https://issues.apache.org/jira/browse/FLINK-8298
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.4.0
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski
 Fix For: 1.5.0


The IOManager inside MockEnvironment is not being shut down properly in tests, 
causing a memory leak.
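A sketch of the kind of cleanup implied (test and field names are illustrative; assumes the test constructs a MockEnvironment in a setup method):
{code}
import org.apache.flink.runtime.operators.testutils.MockEnvironment;

import org.junit.After;

public class SomeOperatorTest {

    private MockEnvironment mockEnvironment; // created in @Before (omitted)

    @After
    public void shutdownEnvironment() {
        // Shut down the IOManager spawned by MockEnvironment so its
        // threads and temporary spill files do not leak across tests.
        mockEnvironment.getIOManager().shutdown();
    }
}
{code}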



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8079) Skip remaining E2E tests if one failed

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298598#comment-16298598
 ] 

ASF GitHub Bot commented on FLINK-8079:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5156#discussion_r158037581
  
--- Diff: tools/travis_mvn_watchdog.sh ---
@@ -543,35 +543,45 @@ case $TEST in
printf "Running end-to-end tests\n"
printf 
"==\n"
 
-   printf 
"\n==\n"
-   printf "Running Wordcount end-to-end test\n"
-   printf 
"==\n"
-   FLINK_DIR=build-target CLUSTER_MODE=cluster 
test-infra/end-to-end-test/test_batch_wordcount.sh
-   EXIT_CODE=$(($EXIT_CODE+$?))
-
-   printf 
"\n==\n"
-   printf "Running Kafka end-to-end test\n"
-   printf 
"==\n"
-   FLINK_DIR=build-target CLUSTER_MODE=cluster 
test-infra/end-to-end-test/test_streaming_kafka010.sh
-   EXIT_CODE=$(($EXIT_CODE+$?))
-
-   printf 
"\n==\n"
-   printf "Running class loading end-to-end test\n"
-   printf 
"==\n"
-   FLINK_DIR=build-target CLUSTER_MODE=cluster 
test-infra/end-to-end-test/test_streaming_classloader.sh
-   EXIT_CODE=$(($EXIT_CODE+$?))
-
-   printf 
"\n==\n"
-   printf "Running Shaded Hadoop S3A end-to-end test\n"
-   printf 
"==\n"
-   FLINK_DIR=build-target CLUSTER_MODE=cluster 
test-infra/end-to-end-test/test_shaded_hadoop_s3a.sh
-   EXIT_CODE=$(($EXIT_CODE+$?))
-
-   printf 
"\n==\n"
-   printf "Running Shaded Presto S3 end-to-end test\n"
-   printf 
"==\n"
-   FLINK_DIR=build-target CLUSTER_MODE=cluster 
test-infra/end-to-end-test/test_shaded_presto_s3.sh
-   EXIT_CODE=$(($EXIT_CODE+$?))
+   if [ $EXIT_CODE == 0 ]; then
+   printf 
"\n==\n"
+   printf "Running Wordcount end-to-end test\n"
+   printf 
"==\n"
+   FLINK_DIR=build-target CLUSTER_MODE=cluster 
test-infra/end-to-end-test/test_batch_wordcount.sh
+   EXIT_CODE=$(($EXIT_CODE+$?))
--- End diff --

If `$EXIT_CODE == 0` then why not simply set `EXIT_CODE=$?`?


> Skip remaining E2E tests if one failed
> --
>
> Key: FLINK-8079
> URL: https://issues.apache.org/jira/browse/FLINK-8079
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.5.0, 1.4.1
>
>
> I propose that if one end-to-end test fails, the remaining tests are skipped.
> [~aljoscha] What do you think?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5156: [FLINK-8079][tests] Stop end-to-end test execution...

2017-12-20 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5156#discussion_r158037581
  
--- Diff: tools/travis_mvn_watchdog.sh ---
@@ -543,35 +543,45 @@ case $TEST in
printf "Running end-to-end tests\n"
printf 
"==\n"
 
-   printf 
"\n==\n"
-   printf "Running Wordcount end-to-end test\n"
-   printf 
"==\n"
-   FLINK_DIR=build-target CLUSTER_MODE=cluster 
test-infra/end-to-end-test/test_batch_wordcount.sh
-   EXIT_CODE=$(($EXIT_CODE+$?))
-
-   printf 
"\n==\n"
-   printf "Running Kafka end-to-end test\n"
-   printf 
"==\n"
-   FLINK_DIR=build-target CLUSTER_MODE=cluster 
test-infra/end-to-end-test/test_streaming_kafka010.sh
-   EXIT_CODE=$(($EXIT_CODE+$?))
-
-   printf 
"\n==\n"
-   printf "Running class loading end-to-end test\n"
-   printf 
"==\n"
-   FLINK_DIR=build-target CLUSTER_MODE=cluster 
test-infra/end-to-end-test/test_streaming_classloader.sh
-   EXIT_CODE=$(($EXIT_CODE+$?))
-
-   printf 
"\n==\n"
-   printf "Running Shaded Hadoop S3A end-to-end test\n"
-   printf 
"==\n"
-   FLINK_DIR=build-target CLUSTER_MODE=cluster 
test-infra/end-to-end-test/test_shaded_hadoop_s3a.sh
-   EXIT_CODE=$(($EXIT_CODE+$?))
-
-   printf 
"\n==\n"
-   printf "Running Shaded Presto S3 end-to-end test\n"
-   printf 
"==\n"
-   FLINK_DIR=build-target CLUSTER_MODE=cluster 
test-infra/end-to-end-test/test_shaded_presto_s3.sh
-   EXIT_CODE=$(($EXIT_CODE+$?))
+   if [ $EXIT_CODE == 0 ]; then
+   printf 
"\n==\n"
+   printf "Running Wordcount end-to-end test\n"
+   printf 
"==\n"
+   FLINK_DIR=build-target CLUSTER_MODE=cluster 
test-infra/end-to-end-test/test_batch_wordcount.sh
+   EXIT_CODE=$(($EXIT_CODE+$?))
--- End diff --

If `$EXIT_CODE == 0` then why not simply set `EXIT_CODE=$?`?


---


[jira] [Commented] (FLINK-8222) Update Scala version

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298583#comment-16298583
 ] 

ASF GitHub Bot commented on FLINK-8222:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5136
  
I'll merge this to 1.5 only.


> Update Scala version
> 
>
> Key: FLINK-8222
> URL: https://issues.apache.org/jira/browse/FLINK-8222
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> Update Scala to version {{2.11.12}}. I don't believe this affects the Flink 
> distribution but rather anyone who is compiling Flink or a 
> Flink-quickstart-derived program on a shared system.
> "A privilege escalation vulnerability (CVE-2017-15288) has been identified in 
> the Scala compilation daemon."
> https://www.scala-lang.org/news/security-update-nov17.html



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5136: [FLINK-8222] [build] Update Scala version

2017-12-20 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5136
  
I'll merge this to 1.5 only.


---


[jira] [Updated] (FLINK-5506) Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException

2017-12-20 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-5506:
--
Fix Version/s: 1.4.1
   1.5.0

> Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException
> -
>
> Key: FLINK-5506
> URL: https://issues.apache.org/jira/browse/FLINK-5506
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.1.4, 1.3.2, 1.4.1
>Reporter: Miguel E. Coimbra
>Assignee: Greg Hogan
>  Labels: easyfix, newbie
> Fix For: 1.5.0, 1.4.1
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Reporting this here as per Vasia's advice.
> I am having the following problem while trying out the 
> org.apache.flink.graph.library.CommunityDetection algorithm of the Gelly API 
> (Java).
> Specs: JDK 1.8.0_102 x64
> Apache Flink: 1.1.4
> Suppose I have a very small (I tried an example with 38 vertices as well) 
> dataset stored in a tab-separated file 3-vertex.tsv:
> {code}
> #id1 id2 score
> 0	1	0
> 0	2	0
> 0	3	0
> {code}
> This is just a central vertex with 3 neighbors (not connected to each other).
> I am loading the dataset and executing the algorithm with the following code:
> {code}
> // Load the data from the .tsv file.
> final DataSet<Tuple3<Long, Long, Double>> edgeTuples = env.readCsvFile(inputPath)
> .fieldDelimiter("\t") // fields are separated by tabs
> .ignoreComments("#")  // comments start with "#"
> .types(Long.class, Long.class, Double.class);
> // Generate a graph and add reverse edges (undirected).
> final Graph<Long, Long, Double> graph = Graph.fromTupleDataSet(edgeTuples, 
> new MapFunction<Long, Long>() {
> private static final long serialVersionUID = 8713516577419451509L;
> public Long map(Long value) {
> return value;
> }
> },
> env).getUndirected();
> // CommunityDetection parameters.
> final double hopAttenuationDelta = 0.5d;
> final int iterationCount = 10;
> // Prepare and trigger the execution.
> DataSet<Vertex<Long, Long>> vs = graph.run(new 
> org.apache.flink.graph.library.CommunityDetection<Long>(iterationCount, 
> hopAttenuationDelta)).getVertices();
> vs.print();
> {code}
> Running this code throws the following exception (check the bold line):
> {code}
> ​org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:805)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.graph.library.CommunityDetection$VertexLabelUpdater.updateVertex(CommunityDetection.java:158)
> at 
> org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
> at 
> org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> at 
> org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
> at 
> org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
> at java.lang.Thread.run(Thread.java:745)​
> {code}
> After a further look, I set a breakpoint (Eclipse IDE debugging) at the line 
> in bold:
> org.apache.flink.graph.library.CommunityDetection.java (source code accessed 
> 

[jira] [Commented] (FLINK-5506) Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298563#comment-16298563
 ] 

ASF GitHub Bot commented on FLINK-5506:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5126
  
@StephanEwen thanks for the tip. I'll remove the added `TypeHint` methods 
and commit to 1.4 and 1.5.


> Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException
> -
>
> Key: FLINK-5506
> URL: https://issues.apache.org/jira/browse/FLINK-5506
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.1.4, 1.3.2, 1.4.1
>Reporter: Miguel E. Coimbra
>Assignee: Greg Hogan
>  Labels: easyfix, newbie
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Reporting this here as per Vasia's advice.
> I am having the following problem while trying out the 
> org.apache.flink.graph.library.CommunityDetection algorithm of the Gelly API 
> (Java).
> Specs: JDK 1.8.0_102 x64
> Apache Flink: 1.1.4
> Suppose I have a very small (I tried an example with 38 vertices as well) 
> dataset stored in a tab-separated file 3-vertex.tsv:
> {code}
> #id1 id2 score
> 0	1	0
> 0	2	0
> 0	3	0
> {code}
> This is just a central vertex with 3 neighbors (not connected to each other).
> I am loading the dataset and executing the algorithm with the following code:
> {code}
> // Load the data from the .tsv file.
> final DataSet<Tuple3<Long, Long, Double>> edgeTuples = env.readCsvFile(inputPath)
> .fieldDelimiter("\t") // fields are separated by tabs
> .ignoreComments("#")  // comments start with "#"
> .types(Long.class, Long.class, Double.class);
> // Generate a graph and add reverse edges (undirected).
> final Graph<Long, Long, Double> graph = Graph.fromTupleDataSet(edgeTuples, 
> new MapFunction<Long, Long>() {
> private static final long serialVersionUID = 8713516577419451509L;
> public Long map(Long value) {
> return value;
> }
> },
> env).getUndirected();
> // CommunityDetection parameters.
> final double hopAttenuationDelta = 0.5d;
> final int iterationCount = 10;
> // Prepare and trigger the execution.
> DataSet<Vertex<Long, Long>> vs = graph.run(new 
> org.apache.flink.graph.library.CommunityDetection<Long>(iterationCount, 
> hopAttenuationDelta)).getVertices();
> vs.print();
> {code}
> Running this code throws the following exception (check the bold line):
> {code}
> ​org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:805)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.graph.library.CommunityDetection$VertexLabelUpdater.updateVertex(CommunityDetection.java:158)
> at 
> org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
> at 
> org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> at 
> org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
> at 
> org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
> at java.lang.Thread.run(Thread.java:745)​
> {code}
> After a further 

[GitHub] flink issue #5126: [FLINK-5506] [gelly] Fix CommunityDetection NullPointerEx...

2017-12-20 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5126
  
@StephanEwen thanks for the tip. I'll remove the added `TypeHint` methods 
and commit to 1.4 and 1.5.


---


[jira] [Commented] (FLINK-8233) Retrieve ExecutionResult by REST polling

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298531#comment-16298531
 ] 

ASF GitHub Bot commented on FLINK-8233:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5194

[FLINK-8233][flip6] Add JobExecutionResultHandler

## What is the purpose of the change

*Allow retrieval of the JobExecutionResult cached in Dispatcher via HTTP. 
This will be needed so that accumulator results can be transmitted to the 
client.*

This PR is based on #5184.

## Brief change log

  - *Add `JobExecutionResultHandler` to enable retrieval of 
`JobExecutionResult`.*
  - *Add serializer and deserializer for `JobExecutionResult`*

## Verifying this change

This change added tests and can be verified as follows:

  - *Added unit tests for all new and changed classes.*
  - *Manually ran the WordCount example job and fetched the 
`JobExecutionResult` with `curl`*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

CC: @tillrohrmann 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8233-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5194.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5194


commit e91f15fcbbe52d6d47cc1ba3d35ae4768fc6309d
Author: gyao 
Date:   2017-12-19T17:58:53Z

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

- Introduce new JobExecutionResult used by JobMaster to forward the 
information in
  the already existing JobExecutionResult.
- Always cache a JobExecutionResult. Even in case of job failures. In case 
of
  job failures, the serialized exception is stored additionally.
- Introduce new methods to RestfulGateway to allow retrieval of cached
  JobExecutionResults

commit 748745ac3521a20040cbda4056dfd9c53bc24a82
Author: gyao 
Date:   2017-12-20T13:44:03Z

[FLINK-8233][flip6] Add JobExecutionResultHandler

- Allow retrieval of the JobExecutionResult cached in Dispatcher.
- Implement serializer and deserializer for JobExecutionResult.

commit adf091a2770f42d6f8a0c19ab88cc7a208943a32
Author: gyao 
Date:   2017-12-20T13:44:26Z

[hotfix] Clean up ExecutionGraph

- Remove unnecessary throws clause.
- Format whitespace.




> Retrieve ExecutionResult by REST polling
> 
>
> Key: FLINK-8233
> URL: https://issues.apache.org/jira/browse/FLINK-8233
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Retrieve the {{ExecutionResult}} from a finished Flink job via the 
> {{RestClusterClient}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler

2017-12-20 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5194

[FLINK-8233][flip6] Add JobExecutionResultHandler

## What is the purpose of the change

*Allow retrieval of the JobExecutionResult cached in Dispatcher via HTTP. 
This will be needed so that accumulator results can be transmitted to the 
client.*

This PR is based on #5184.

## Brief change log

  - *Add `JobExecutionResultHandler` to enable retrieval of 
`JobExecutionResult`.*
  - *Add serializer and deserializer for `JobExecutionResult`*

## Verifying this change

This change added tests and can be verified as follows:

  - *Added unit tests for all new and changed classes.*
  - *Manually ran the WordCount example job and fetched the 
`JobExecutionResult` with `curl`*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

CC: @tillrohrmann 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8233-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5194.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5194


commit e91f15fcbbe52d6d47cc1ba3d35ae4768fc6309d
Author: gyao 
Date:   2017-12-19T17:58:53Z

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

- Introduce new JobExecutionResult used by JobMaster to forward the 
information in
  the already existing JobExecutionResult.
- Always cache a JobExecutionResult, even in case of job failures; in that
  case, the serialized exception is stored additionally.
- Introduce new methods to RestfulGateway to allow retrieval of cached
  JobExecutionResults

commit 748745ac3521a20040cbda4056dfd9c53bc24a82
Author: gyao 
Date:   2017-12-20T13:44:03Z

[FLINK-8233][flip6] Add JobExecutionResultHandler

- Allow retrieval of the JobExecutionResult cached in Dispatcher.
- Implement serializer and deserializer for JobExecutionResult.

commit adf091a2770f42d6f8a0c19ab88cc7a208943a32
Author: gyao 
Date:   2017-12-20T13:44:26Z

[hotfix] Clean up ExecutionGraph

- Remove unnecessary throws clause.
- Format whitespace.




---


[jira] [Assigned] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'

2017-12-20 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-8268:
-

Assignee: Piotr Nowojski

> Test instability for 'TwoPhaseCommitSinkFunctionTest'
> -
>
> Key: FLINK-8268
> URL: https://issues.apache.org/jira/browse/FLINK-8268
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Stephan Ewen
>Assignee: Piotr Nowojski
>Priority: Critical
>  Labels: test-stability
>
> The following exception / failure message occurs.
> {code}
> Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.824 sec <<< 
> FAILURE! - in 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest
> testIgnoreCommitExceptionDuringRecovery(org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest)
>   Time elapsed: 0.068 sec  <<< ERROR!
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask 
> (1/1).
>   at java.io.FileOutputStream.writeBytes(Native Method)
>   at java.io.FileOutputStream.write(FileOutputStream.java:326)
>   at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
>   at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)
>   at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295)
>   at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141)
>   at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229)
>   at java.io.BufferedWriter.flush(BufferedWriter.java:254)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:313)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:288)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:290)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>   at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest.testIgnoreCommitExceptionDuringRecovery(TwoPhaseCommitSinkFunctionTest.java:208)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298389#comment-16298389
 ] 

ASF GitHub Bot commented on FLINK-8268:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/5193

[FLINK-8268][tests] Improve tests stability

This is a workaround for the error reported in the issue: 
https://issues.apache.org/jira/browse/FLINK-8268 . Instead of writing files to 
disk, this PR creates a simple in-memory "file like" abstraction.
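
A minimal sketch of the idea (the class below is illustrative, not the PR's 
actual code): records are appended to per-"file" buffers held in memory, so 
flaky disk I/O can no longer fail the test.

{code}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative in-memory "file system": a map from path to the records
// written so far, replacing FileOutputStream/BufferedWriter in the test.
public class InMemoryFileSystem {

    private final Map<String, List<String>> files = new HashMap<>();

    public synchronized void write(String path, String record) {
        files.computeIfAbsent(path, k -> new ArrayList<>()).add(record);
    }

    public synchronized List<String> read(String path) {
        return new ArrayList<>(files.getOrDefault(path, new ArrayList<>()));
    }

    public synchronized void delete(String path) {
        files.remove(path);
    }
}
{code}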

Second commit is to further improve stability by cleaning up the resources 
from `MockEnvironment`.

## Verifying this change

This is a change in tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f8268

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5193.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5193


commit 129382c7d3a0d87afd8163e57a48f21819e4134c
Author: Piotr Nowojski 
Date:   2017-12-20T10:23:26Z

[FLINK-8268][streaming][tests] Improve TwoPhaseCommitSinkFunctionTest 
stability by using custom in memory storage

commit 55d0f4b197a062801c0763b7aeaf5d0bc64eac77
Author: Piotr Nowojski 
Date:   2017-12-20T11:27:56Z

[hotfix][tests] Properly shutdown MockEnvironment to release resources




> Test instability for 'TwoPhaseCommitSinkFunctionTest'
> -
>
> Key: FLINK-8268
> URL: https://issues.apache.org/jira/browse/FLINK-8268
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Stephan Ewen
>Priority: Critical
>  Labels: test-stability
>
> The following exception / failure message occurs.
> {code}
> Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.824 sec <<< 
> FAILURE! - in 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest
> testIgnoreCommitExceptionDuringRecovery(org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest)
>   Time elapsed: 0.068 sec  <<< ERROR!
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask 
> (1/1).
>   at java.io.FileOutputStream.writeBytes(Native Method)
>   at java.io.FileOutputStream.write(FileOutputStream.java:326)
>   at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
>   at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)
>   at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295)
>   at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141)
>   at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229)
>   at java.io.BufferedWriter.flush(BufferedWriter.java:254)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:313)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:288)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:290)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>   at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest.testIgnoreCommitExceptionDuringRecovery(TwoPhaseCommitSinkFunctionTest.java:208)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability

2017-12-20 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/5193

[FLINK-8268][tests] Improve tests stability

This is a workaround for the error reported in the issue: 
https://issues.apache.org/jira/browse/FLINK-8268 . Instead of writing files to 
disk, this PR creates a simple in-memory "file like" abstraction.

Second commit is to further improve stability by cleaning up the resources 
from `MockEnvironment`.

## Verifying this change

This is a change in tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f8268

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5193.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5193


commit 129382c7d3a0d87afd8163e57a48f21819e4134c
Author: Piotr Nowojski 
Date:   2017-12-20T10:23:26Z

[FLINK-8268][streaming][tests] Improve TwoPhaseCommitSinkFunctionTest 
stability by using custom in memory storage

commit 55d0f4b197a062801c0763b7aeaf5d0bc64eac77
Author: Piotr Nowojski 
Date:   2017-12-20T11:27:56Z

[hotfix][tests] Properly shutdown MockEnvironment to release resources




---


[jira] [Commented] (FLINK-8257) Unify the value checks for setParallelism()

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298303#comment-16298303
 ] 

ASF GitHub Bot commented on FLINK-8257:
---

GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5192

[FLINK-8257] [conf] Unify the value checks for setParallelism()

## What is the purpose of the change

This PR unifies the value checks for `setParallelism()` methods in 
different components.

## Brief change log

#### flink-java:
Updates the error message in 
`org.apache.flink.api.java.operators.Operator.setParallelism()`.

#### flink-streaming-java:
Refines the check in `StreamTransformation.setParallelism()`.
Changes the value check to `parallelism != 1` in 
`DataStreamSource.setParallelism()`.
Removes the value checks in `SingleOutputStreamOperator.setParallelism()` 
and `StreamExecutionEnvironment.setParallelism()`.

#### flink-gelly:
Updates the error messages for `setParallelism()` methods in 
`GraphAnalyticBase`, `IterationConfiguration` and `GraphAlgorithmWrappingBase`.

The methods in the flink-runtime package were kept unchanged.

Since `org.apache.flink.api.common.operators.Operator.setParallelism()` is 
only used internally, no extra check was provided for it.
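
A minimal sketch of what a unified check could look like (the helper name and 
the exact rule below are illustrative assumptions, not the code merged by this 
PR): a parallelism value is valid if it is at least one, or equal to 
`ExecutionConfig.PARALLELISM_DEFAULT` (-1), which defers to the system default.

{code}
import org.apache.flink.api.common.ExecutionConfig;

// Illustrative helper; the class name is an assumption for this sketch.
public final class ParallelismValidation {

    private ParallelismValidation() {}

    public static void checkParallelism(int parallelism) {
        if (parallelism != ExecutionConfig.PARALLELISM_DEFAULT && parallelism < 1) {
            throw new IllegalArgumentException(
                "The parallelism must be at least one, or " +
                "ExecutionConfig.PARALLELISM_DEFAULT (use the system default).");
        }
    }
}
{code}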

## Verifying this change
This change is already covered by existing tests

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8257

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5192.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5192


commit 44681d1a06373c5e9e723bd202ee053aaf95b3e8
Author: Xingcan Cui 
Date:   2017-12-20T09:11:12Z

[FLINK-8257] [conf] Unify the value checks for setParallelism()




> Unify the value checks for setParallelism()
> ---
>
> Key: FLINK-8257
> URL: https://issues.apache.org/jira/browse/FLINK-8257
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>
> The {{setParallelism()}} method exist in many components from different 
> levels. Some of the methods require the input value to be greater than {{1}} 
> (e.g., {{StreamTransformation.setParallelism()}}), while some of them also 
> allow the value to be {{ExecutionConfig.PARALLELISM_DEFAULT}}, which is 
> {{-1}} by default (e.g., {{DataSink.setParallelism()}}). We need to unify the 
> value checks for these methods.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5192: [FLINK-8257] [conf] Unify the value checks for set...

2017-12-20 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5192

[FLINK-8257] [conf] Unify the value checks for setParallelism()

## What is the purpose of the change

This PR unifies the value checks for `setParallelism()` methods in 
different components.

## Brief change log

#### flink-java:
Updates the error message in 
`org.apache.flink.api.java.operators.Operator.setParallelism()`.

#### flink-streaming-java:
Refines the check in `StreamTransformation.setParallelism()`.
Changes the value check to `parallelism != 1` in 
`DataStreamSource.setParallelism()`.
Removes the value checks in `SingleOutputStreamOperator.setParallelism()` 
and `StreamExecutionEnvironment.setParallelism()`.

#### flink-gelly:
Updates the error messages for `setParallelism()` methods in 
`GraphAnalyticBase`, `IterationConfiguration` and `GraphAlgorithmWrappingBase`.

The methods in the flink-runtime package were kept unchanged.

Since `org.apache.flink.api.common.operators.Operator.setParallelism()` is 
only used internally, no extra check was provided for it.

## Verifying this change
This change is already covered by existing tests

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8257

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5192.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5192


commit 44681d1a06373c5e9e723bd202ee053aaf95b3e8
Author: Xingcan Cui 
Date:   2017-12-20T09:11:12Z

[FLINK-8257] [conf] Unify the value checks for setParallelism()




---


[GitHub] flink pull request #5191: Release 1.4

2017-12-20 Thread czhxmz
GitHub user czhxmz opened a pull request:

https://github.com/apache/flink/pull/5191

Release 1.4

*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-XXXX] [component] Title of 
the pull request", where *FLINK-XXXX* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  - The S3 file system connector: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apache/flink release-1.4

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5191.patch

To close this 

[jira] [Commented] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAdddress

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298177#comment-16298177
 ] 

ASF GitHub Bot commented on FLINK-8289:
---

GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/5190

[FLINK-8289] [runtime] set the rest.address to the host of the rest server 
machine


## What is the purpose of the change

This pull request sets the rest.address to the host of the Dispatcher or 
JobMaster, so that the real address of the rest server is returned instead of 
0.0.0.0:9067 or 127.0.0.1:9067.
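
A minimal sketch of the idea (illustrative only, not the PR's actual code): 
when the configured address is a wildcard or loopback address, fall back to 
the machine's resolvable address so that the value registered with YARN or 
Mesos can be reached from other machines.

{code}
import java.net.InetAddress;

// Illustrative helper; the class and method names are assumptions.
public class RestAddressResolver {

    public static String resolveRestAddress(String configuredAddress, int port)
            throws Exception {
        if (configuredAddress == null
                || "0.0.0.0".equals(configuredAddress)
                || "127.0.0.1".equals(configuredAddress)) {
            // Fall back to the real address of the machine running the rest server.
            configuredAddress = InetAddress.getLocalHost().getHostAddress();
        }
        return configuredAddress + ":" + port;
    }
}
{code}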



## Verifying this change

This change is tested manually.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-8289

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5190.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5190


commit 0956b94a910df071a61aae90cac7fef2b795ed0c
Author: shuai.xus 
Date:   2017-12-20T09:37:10Z

[FLINK-8289] [runtime] set the rest.address to the host of the rest server 
machine




> The RestServerEndpoint should return the address with real ip when 
> getRestAdddress
> --
>
> Key: FLINK-8289
> URL: https://issues.apache.org/jira/browse/FLINK-8289
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>  Labels: flip-6
>
> Currently, when RestServerEndpoint.getRestAddress is called, it returns an 
> address equal to the value of the config option rest.address, which defaults 
> to 127.0.0.1:9067, but this address cannot be accessed from another machine. 
> Moreover, the IPs of the Dispatcher and JobMaster are usually assigned 
> dynamically, so users will configure the option to 0.0.0.0, and 
> getRestAddress will then return 0.0.0.0:9067; this address is registered 
> with YARN or Mesos, but it cannot be accessed from another machine either. 
> The endpoint therefore needs to return the real ip:port so that users can 
> access the web monitor from anywhere.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5190: [FLINK-8289] [runtime] set the rest.address to the...

2017-12-20 Thread shuai-xu
GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/5190

[FLINK-8289] [runtime] set the rest.address to the host of the rest server 
machine


## What is the purpose of the change

This pull request sets the rest.address to the host of the Dispatcher or 
JobMaster, so that the real address of the rest server is returned instead of 
0.0.0.0:9067 or 127.0.0.1:9067.



## Verifying this change

This change is tested manually.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-8289

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5190.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5190


commit 0956b94a910df071a61aae90cac7fef2b795ed0c
Author: shuai.xus 
Date:   2017-12-20T09:37:10Z

[FLINK-8289] [runtime] set the rest.address to the host of the rest server 
machine




---


[jira] [Commented] (FLINK-8271) upgrade from deprecated classes to AmazonKinesis

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298162#comment-16298162
 ] 

ASF GitHub Bot commented on FLINK-8271:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5171
  
Ah, I see. The `AmazonKinesisClient` class is not deprecated, but the 
constructors are.
Alright, thanks for the clarification. I'll find some time to revisit this 
PR tomorrow.
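
For reference, a sketch of the non-deprecated construction path through the 
builder (the region and credentials provider below are placeholders, not 
values from this PR):

{code}
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;

public class KinesisClientFactory {

    public static AmazonKinesis createClient() {
        // Replaces the deprecated AmazonKinesisClient constructors.
        return AmazonKinesisClientBuilder.standard()
                .withRegion(Regions.US_EAST_1) // placeholder region
                .withCredentials(new DefaultAWSCredentialsProviderChain())
                .build();
    }
}
{code}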


> upgrade from deprecated classes to AmazonKinesis
> 
>
> Key: FLINK-8271
> URL: https://issues.apache.org/jira/browse/FLINK-8271
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.5.0, 1.4.1
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5171: [FLINK-8271][Kinesis connector] upgrade from deprecated c...

2017-12-20 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5171
  
Ah, I see. The `AmazonKinesisClient` class is not deprecated, but the 
constructors are.
Alright, thanks for the clarification. I'll find some time to revisit this 
PR tomorrow.


---


[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-20 Thread cttestid41 (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298159#comment-16298159
 ] 

cttestid41 commented on FLINK-8234:
---

Test comment

> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory leaks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298157#comment-16298157
 ] 

ASF GitHub Bot commented on FLINK-8283:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5181
  
Also FYI: the stalling tests seem to have been fixed (indirectly?) by 
fixing the mocking issue. No failures have occurred anymore over 10 local 
Travis test runs.


> FlinkKafkaConsumerBase failing on Travis with no output in 10min
> 
>
> Key: FLINK-8283
> URL: https://issues.apache.org/jira/browse/FLINK-8283
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: test-stability
>
> Since a few days, Travis builds with the {{connectors}} profile keep failing 
> more often with no new output being received within 10 minutes. It seems to 
> start with the Travis build for 
> https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9
>  but may have been introduced earlier. The printed offsets look strange 
> though.
> {code}
> 16:33:12,508 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Setting 
> restore state in the FlinkKafkaConsumer: 
> {KafkaTopicPartition{topic='test-topic', partition=0}=-915623761773, 
> KafkaTopicPartition{topic='test-topic', partition=1}=-915623761773}
> 16:33:12,520 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
> Consumer subtask 2 will start reading 66 partitions with offsets in restored 
> state: {KafkaTopicPartition{topic='test-topic', partition=851}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=716}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=461}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=206}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=971}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=836}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=581}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=326}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=71}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=956}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=701}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=446}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=191}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=56}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=821}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=566}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=311}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=881}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=626}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=371}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=236}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=746}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=491}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=356}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=101}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=866}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=611}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=476}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=221}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=986}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=731}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=596}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=341}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=86}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=656}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=401}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=146}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=911}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=776}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=521}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=266}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=11}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', 

[GitHub] flink issue #5181: [FLINK-8283] [kafka] Fix mock verification on final metho...

2017-12-20 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5181
  
Also FYI: the stalling tests seem to have been fixed (indirectly?) by 
fixing the mocking issue. No failures have occurred anymore over 10 local 
Travis test runs.


---


[jira] [Commented] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298155#comment-16298155
 ] 

ASF GitHub Bot commented on FLINK-8283:
---

Github user tzulitai closed the pull request at:

https://github.com/apache/flink/pull/5181


> FlinkKafkaConsumerBase failing on Travis with no output in 10min
> 
>
> Key: FLINK-8283
> URL: https://issues.apache.org/jira/browse/FLINK-8283
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: test-stability
>
> Since a few days, Travis builds with the {{connectors}} profile keep failing 
> more often with no new output being received within 10 minutes. It seems to 
> start with the Travis build for 
> https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9
>  but may have been introduced earlier. The printed offsets look strange 
> though.
> {code}
> 16:33:12,508 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Setting 
> restore state in the FlinkKafkaConsumer: 
> {KafkaTopicPartition{topic='test-topic', partition=0}=-915623761773, 
> KafkaTopicPartition{topic='test-topic', partition=1}=-915623761773}
> 16:33:12,520 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
> Consumer subtask 2 will start reading 66 partitions with offsets in restored 
> state: {KafkaTopicPartition{topic='test-topic', partition=851}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=716}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=461}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=206}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=971}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=836}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=581}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=326}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=71}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=956}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=701}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=446}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=191}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=56}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=821}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=566}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=311}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=881}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=626}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=371}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=236}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=746}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=491}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=356}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=101}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=866}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=611}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=476}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=221}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=986}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=731}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=596}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=341}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=86}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=656}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=401}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=146}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=911}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=776}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=521}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=266}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=11}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=896}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=641}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=386}=-915623761775, 
> 

[jira] [Commented] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298154#comment-16298154
 ] 

ASF GitHub Bot commented on FLINK-8283:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5181
  
Making the `AbstractFetcher::commitInternalOffsetsToKafka` non-final just 
for the sake of testing using mocks really is not ideal.

I've opened a new PR #5189 that properly solves this by introducing a 
proper abstraction for offset committing. Closing this PR in favor of the new 
one.


> FlinkKafkaConsumerBase failing on Travis with no output in 10min
> 
>
> Key: FLINK-8283
> URL: https://issues.apache.org/jira/browse/FLINK-8283
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: test-stability
>
> Since a few days, Travis builds with the {{connectors}} profile keep failing 
> more often with no new output being received within 10 minutes. It seems to 
> start with the Travis build for 
> https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9
>  but may have been introduced earlier. The printed offsets look strange 
> though.
> {code}
> 16:33:12,508 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Setting 
> restore state in the FlinkKafkaConsumer: 
> {KafkaTopicPartition{topic='test-topic', partition=0}=-915623761773, 
> KafkaTopicPartition{topic='test-topic', partition=1}=-915623761773}
> 16:33:12,520 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
> Consumer subtask 2 will start reading 66 partitions with offsets in restored 
> state: {KafkaTopicPartition{topic='test-topic', partition=851}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=716}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=461}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=206}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=971}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=836}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=581}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=326}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=71}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=956}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=701}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=446}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=191}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=56}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=821}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=566}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=311}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=881}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=626}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=371}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=236}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=746}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=491}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=356}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=101}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=866}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=611}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=476}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=221}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=986}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=731}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=596}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=341}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=86}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=656}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=401}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=146}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=911}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=776}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=521}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=266}=-915623761775, 
> 

[GitHub] flink pull request #5181: [FLINK-8283] [kafka] Fix mock verification on fina...

2017-12-20 Thread tzulitai
Github user tzulitai closed the pull request at:

https://github.com/apache/flink/pull/5181


---


[GitHub] flink issue #5181: [FLINK-8283] [kafka] Fix mock verification on final metho...

2017-12-20 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5181
  
Making the `AbstractFetcher::commitInternalOffsetsToKafka` non-final just 
for the sake of testing using mocks really is not ideal.

I've opened a new PR #5189 that properly solves this by introducing a 
proper abstraction for offset committing. Closing this PR in favor of the new 
one.


---


[jira] [Commented] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298151#comment-16298151
 ] 

ASF GitHub Bot commented on FLINK-8283:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5189

 [FLINK-8283] [kafka] Introduce KafkaOffsetCommitter interface

## What is the purpose of the change

This PR is built upon the reworked `FlinkKafkaConsumerBaseTest` in #5188.
The broken `FlinkKafkaConsumerBaseTest` is properly fixed only when both 
#5188 and this PR are merged.

Prior to this PR, offset committing was coupled tightly with the 
`AbstractFetcher`, making unit tests for offset committing behaviours hard to 
compose concisely. For example, we had tests that required mocking the offset 
commit methods on `AbstractFetcher`, while ideally, it would be best that those 
methods are made final (thus, unable to be mocked) to prevent accidental 
overrides.

This PR decouples offset committing as a separate service behind a new 
`KafkaOffsetCommitter` interface. For now, the `AbstractFetcher` is reused as 
an implementation of this service, so that this PR does not introduce any more 
change other than introducing a new layer of abstraction.

Unit tests that verify offset committing behaviour now provide a dummy 
verifiable implementation of the `KafkaOffsetCommitter` (instead of using mocks 
on AbstractFetcher) and test against that.
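
A sketch of the shape such an interface might take (the signature is inferred 
from the description above, not copied from the PR):

{code}
import java.util.Map;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

// Offset committing as a standalone service; for now the AbstractFetcher
// remains the production implementation behind this interface.
public interface KafkaOffsetCommitter {

    /**
     * Commits the given checkpointed partition offsets back to Kafka.
     */
    void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
            throws Exception;
}
{code}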

## Brief change log

- Migrate `AbstractFetcher::commitInternalOffsetsToKafka` method to the 
newly introduced `KafkaOffsetCommitter` interface.
- Let `AbstractFetcher` implement `KafkaOffsetCommitter`
- In the `FlinkKafkaConsumerBase`, let "offset committing" and "record 
fetching" be logically separated and handled by two services, i.e. a 
`KafkaOffsetCommitter` and an `AbstractFetcher`. Physically, the fetcher 
instance sits behind both service abstractions.
- In `FlinkKafkaConsumerBaseTest`, remove all mocks on 
`AbstractFetcher::commitInternalOffsetsToKafka`, and test against a 
`KafkaOffsetCommitter` instead.

## Verifying this change

This PR does not add any new functionality. The reworked tests also do not 
affect test coverage.
`FlinkKafkaConsumerBaseTest` verifies all changes.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? n/a


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8283-KafkaOffsetCommitter

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5189.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5189


commit 620d500b1a14f8f5016e56fdc3d65d853ce8848d
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-20T00:10:44Z

[FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java 
reflection

Reflection was mainly used to inject mocks into private fields of the
FlinkKafkaConsumerBase, without the need to fully execute all operator
life cycle methods. This, however, caused the unit tests to be too
implementation-specific.

This commit reworks the FlinkKafkaConsumerBaseTest to remove test
consumer instantiation methods that rely on reflection for dependency
injection. All tests now instantiate dummy test consumers normally, and
let all tests properly execute all operator life cycle methods
regardless of the tested logic.

commit 0d19e99d3fb3359f43c2db91611257a5edb2e17f
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-19T19:55:16Z

[FLINK-8283] [kafka] Introduce KafkaOffsetCommitter interface

Prior to this commit, offset committing was coupled tightly with the
AbstractFetcher, making unit tests for offset committing behaviours hard
to compose concisely. For example, we had tests that mock the offset
commit methods on AbstractFetcher, while ideally, it would be best that
those methods are made final to prevent accidental overrides.

This commit decouples offset committing as a separate service behind a
new KafkaOffsetCommitter interface. For now, the AbstractFetcher is
reused as an implementation of this service.

Unit tests that verify offset committing 

[GitHub] flink pull request #5189: [FLINK-8283] [kafka] Introduce KafkaOffsetCommitte...

2017-12-20 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5189

 [FLINK-8283] [kafka] Introduce KafkaOffsetCommitter interface

## What is the purpose of the change

This PR is built upon the reworked `FlinkKafkaConsumerBaseTest` in #5188.
The broken `FlinkKafkaConsumerBaseTest` is properly fixed only when both 
#5188 and this PR are merged.

Prior to this PR, offset committing was coupled tightly with the 
`AbstractFetcher`, making unit tests for offset committing behaviours hard to 
compose concisely. For example, we had tests that required mocking the offset 
commit methods on `AbstractFetcher`, while ideally, it would be best that those 
methods are made final (thus, unable to be mocked) to prevent accidental 
overrides.

This PR decouples offset committing as a separate service behind a new 
`KafkaOffsetCommitter` interface. For now, the `AbstractFetcher` is reused as 
an implementation of this service, so that this PR does not introduce any more 
change other than introducing a new layer of abstraction.

Unit tests that verify offset committing behaviour now provide a dummy 
verifiable implementation of the `KafkaOffsetCommitter` (instead of using mocks 
on AbstractFetcher) and test against that.

## Brief change log

- Migrate `AbstractFetcher::commitInternalOffsetsToKafka` method to the 
newly introduced `KafkaOffsetCommitter` interface.
- Let `AbstractFetcher` implement `KafkaOffsetCommitter`
- In the `FlinkKafkaConsumerBase`, let "offset committing" and "record 
fetching" be logically separated and handled by two services, i.e. a 
`KafkaOffsetCommitter` and an `AbstractFetcher`. Physically, the fetcher 
instance sits behind both service abstractions.
- In `FlinkKafkaConsumerBaseTest`, remove all mocks on 
`AbstractFetcher::commitInternalOffsetsToKafka`, and test against a 
`KafkaOffsetCommitter` instead.

## Verifying this change

This PR does not add any new functionality. The reworked tests also do not 
affect test coverage.
`FlinkKafkaConsumerBaseTest` verifies all changes.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? n/a


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8283-KafkaOffsetCommitter

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5189.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5189


commit 620d500b1a14f8f5016e56fdc3d65d853ce8848d
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-20T00:10:44Z

[FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java 
reflection

Reflection was mainly used to inject mocks into private fields of the
FlinkKafkaConsumerBase, without the need to fully execute all operator
life cycle methods. This, however, caused the unit tests to be too
implementation-specific.

This commit reworks the FlinkKafkaConsumerBaseTest to remove test
consumer instantiation methods that rely on reflection for dependency
injection. All tests now instantiate dummy test consumers normally, and
let all tests properly execute all operator life cycle methods
regardless of the tested logic.

commit 0d19e99d3fb3359f43c2db91611257a5edb2e17f
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-19T19:55:16Z

[FLINK-8283] [kafka] Introduce KafkaOffsetCommitter interface

Prior to this commit, offset committing was coupled tightly with the
AbstractFetcher, making unit tests for offset committing behaviours hard
to compose concisely. For example, we had tests that mock the offset
commit methods on AbstractFetcher, while ideally, it would be best that
those methods are made final to prevent accidental overrides.

This commit decouples offset committing as a separate service behind a
new KafkaOffsetCommitter interface. For now, the AbstractFetcher is
reused as an implementation of this service.

Unit tests that verify offset committing behaviour now provide a dummy
verifiable implementation of the KafkaOffsetCommitter, instead of using
mocks on AbstractFetcher.




---


[GitHub] flink issue #4559: [FLINK-7468][network] Implement sender backlog logic for ...

2017-12-20 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4559
  
@NicoK , I have submitted the `hotfix` commit to address the above comments.


---


[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298149#comment-16298149
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4559
  
@NicoK , I have submitted the `hotfix` commit to address the above comments.


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog which only indicates the 
> number of buffers in this subpartition, not including the number of events. 
> The backlog is increased for adding buffer to this subpartition, and 
> decreased for polling buffer from it.
> The backlog is attached in {{BufferResponse}} by sender as an absolute value 
> after the buffer being transferred.
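
A minimal sketch of the backlog bookkeeping described above (names and 
synchronization are simplified for illustration; events are not counted):

{code}
// Illustrative counter kept by a result subpartition.
class SubpartitionBacklog {

    private int buffersInBacklog = 0;

    // Called when a buffer (not an event) is added to the subpartition.
    synchronized void onBufferAdded() {
        buffersInBacklog++;
    }

    // Called when a buffer is polled; the returned absolute value is what
    // the sender attaches to the outgoing BufferResponse.
    synchronized int onBufferPolled() {
        return --buffersInBacklog;
    }
}
{code}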



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8295) Netty shading does not work properly

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298145#comment-16298145
 ] 

ASF GitHub Bot commented on FLINK-8295:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5183
  
Thanks for the fix @NicoK. I will have a look at it later today and merge 
this.


> Netty shading does not work properly
> 
>
> Key: FLINK-8295
> URL: https://issues.apache.org/jira/browse/FLINK-8295
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector, Core
>Affects Versions: 1.4.0
>Reporter: Timo Walther
>Assignee: Nico Kruber
>
> Multiple users complained that the Cassandra connector is not usable in Flink 
> 1.4.0 due to wrong/insufficient shading of Netty.
> See:
> http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3Cb1f584b918c8aaf98b744c168407b0f5%40dbruhn.de%3E
> http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3CCACk7FTgMPR03bPBoKzmeVKCqS%2BumTR1u1X%2BKdPtHRgbnUZiO3A%40mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5183: [FLINK-8295][cassandra][build] properly shade netty for t...

2017-12-20 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5183
  
Thanks for the fix @NicoK. I will have a look at it later today and merge 
this.


---


[jira] [Commented] (FLINK-8296) Rework FlinkKafkaConsumerBaseTest to not use Java reflection for dependency injection

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298137#comment-16298137
 ] 

ASF GitHub Bot commented on FLINK-8296:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5188

[FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java 
reflection

## What is the purpose of the change

Prior to this PR, reflection was mainly used to inject mocks into private 
fields of the `FlinkKafkaConsumerBase`, without the need to fully execute all 
operator life cycle methods.
This was done using the `FlinkKafkaConsumerBaseTest::getConsumer(...)` 
method (which has been removed by this PR). This, however, caused the unit 
tests to be too implementation-specific and hard to extend.

This PR reworks the `FlinkKafkaConsumerBaseTest` to remove the 
reflection-based `FlinkKafkaConsumerBaseTest::getConsumer(...)` method.
All tests now instantiate the `DummyFlinkKafkaConsumer` normally, and let 
all tests properly execute all operator life cycle methods regardless of the 
tested logic.

## Brief change log

- Remove the reflection-based `FlinkKafkaConsumerBaseTest::getConsumer(...)` 
method.
- Generalize the `DummyFlinkKafkaConsumer` class
- Introduce the `FlinkKafkaConsumerBaseTest::setupConsumer(...)` method 
that iterates through all normal operator life cycle methods.
- The test pattern for all unit tests in the `FlinkKafkaConsumerBaseTest` 
is now: 1) instantiate a `DummyFlinkKafkaConsumer`, and 2) call 
`setupConsumer(dummyConsumer)` so that the consumer goes through all life 
cycle methods and its instance fields are properly instantiated (instead of 
relying on reflection as before); a sketch of this pattern is shown below.
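
As a rough sketch of that pattern (a self-contained analogue, not the actual 
Flink test code; all names below are illustrative):

    import java.util.ArrayList;
    import java.util.List;

    // Minimal analogue of the operator life cycle the tests now drive.
    interface OperatorLifeCycle {
        void initializeState();
        void open();
    }

    // Stands in for DummyFlinkKafkaConsumer: its state is created by the
    // life cycle methods themselves, so no reflection is needed to inject it.
    class DummyConsumer implements OperatorLifeCycle {
        List<String> subscribedPartitions; // previously injected via reflection

        @Override
        public void initializeState() {
            subscribedPartitions = new ArrayList<>();
        }

        @Override
        public void open() {
            subscribedPartitions.add("topic-0");
        }
    }

    class ConsumerTestSupport {
        // mirrors setupConsumer(...): run all life cycle methods in order
        static void setupConsumer(OperatorLifeCycle consumer) {
            consumer.initializeState();
            consumer.open();
        }
    }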

## Verifying this change

Test coverage of the reworked `FlinkKafkaConsumerBaseTest` has not been 
reduced; the reworked unit test itself verifies this change.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: no 
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? n/a


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8296

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5188.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5188


commit 620d500b1a14f8f5016e56fdc3d65d853ce8848d
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-20T00:10:44Z

[FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java 
reflection

Reflection was mainly used to inject mocks into private fields of the
FlinkKafkaConsumerBase, without the need to fully execute all operator
life cycle methods. This, however, caused the unit tests to be too
implementation-specific.

This commit reworks the FlinkKafkaConsumerBaseTest to remove test
consumer instantiation methods that rely on reflection for dependency
injection. All tests now instantiate dummy test consumers normally, and
let all tests properly execute all operator life cycle methods
regardless of the tested logic.




> Rework FlinkKafkaConsumerBaseTest to not use Java reflection for dependency 
> injection
> -
>
> Key: FLINK-8296
> URL: https://issues.apache.org/jira/browse/FLINK-8296
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector, Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.5.0, 1.4.1
>
>
> The current {{FlinkKafkaConsumerBaseTest}} is heavily relying on Java 
> reflection for dependency injection. Using reflection to compose unit tests 
> really should be a last resort, and indicates that the tests there are highly 
> implementation-specific, and that we should make the design more testable.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5188: [FLINK-8296] [kafka] Rework FlinkKafkaConsumerBase...

2017-12-20 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5188

[FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java 
reflection

## What is the purpose of the change

Prior to this PR, reflection was mainly used to inject mocks into private 
fields of the `FlinkKafkaConsumerBase`, without the need to fully execute all 
operator life cycle methods.
This was done using the `FlinkKafkaConsumerBaseTest::getConsumer(...)` 
method (which has been removed by this PR). This, however, caused the unit 
tests to be too implementation-specific and hard to extend.

This PR reworks the `FlinkKafkaConsumerBaseTest` to remove the 
reflection-based `FlinkKafkaConsumerBaseTest::getConsumer(...)` method.
All tests now instantiate the `DummyFlinkKafkaConsumer` normally, and let 
all tests properly execute all operator life cycle methods regardless of the 
tested logic.

## Brief change log

- Remove the reflection-based `FlinkKafkaConsumerBaseTest::getConsumer(...)` 
method.
- Generalize the `DummyFlinkKafkaConsumer` class
- Introduce the `FlinkKafkaConsumerBaseTest::setupConsumer(...)` method 
that iterates through all normal operator life cycle methods.
- The test pattern for all unit tests in the `FlinkKafkaConsumerBaseTest` 
is now: 1) instantiate a `DummyFlinkKafkaConsumer`, and 2) call 
`setupConsumer(dummyConsumer)` so that the consumer goes through all life 
cycle methods and its instance fields are properly instantiated (instead of 
relying on reflection as before).

## Verifying this change

Test coverage of the reworked `FlinkKafkaConsumerBaseTest` has not been 
reduced; the reworked unit test itself verifies this change.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: no 
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? n/a


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8296

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5188.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5188


commit 620d500b1a14f8f5016e56fdc3d65d853ce8848d
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-20T00:10:44Z

[FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java 
reflection

Reflection was mainly used to inject mocks into private fields of the
FlinkKafkaConsumerBase, without the need to fully execute all operator
life cycle methods. This, however, caused the unit tests to be too
implementation-specific.

This commit reworks the FlinkKafkaConsumerBaseTest to remove test
consumer instantiation methods that rely on reflection for dependency
injection. All tests now instantiate dummy test consumers normally, and
let all tests properly execute all operator life cycle methods
regardless of the tested logic.




---


[jira] [Commented] (FLINK-8295) Netty shading does not work properly

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298135#comment-16298135
 ] 

ASF GitHub Bot commented on FLINK-8295:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/5183
  
true - I adapted the comment accordingly


> Netty shading does not work properly
> 
>
> Key: FLINK-8295
> URL: https://issues.apache.org/jira/browse/FLINK-8295
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector, Core
>Affects Versions: 1.4.0
>Reporter: Timo Walther
>Assignee: Nico Kruber
>
> Multiple users complained that the Cassandra connector is not usable in Flink 
> 1.4.0 due to wrong/insufficient shading of Netty.
> See:
> http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3Cb1f584b918c8aaf98b744c168407b0f5%40dbruhn.de%3E
> http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3CCACk7FTgMPR03bPBoKzmeVKCqS%2BumTR1u1X%2BKdPtHRgbnUZiO3A%40mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5183: [FLINK-8295][cassandra][build] properly shade netty for t...

2017-12-20 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/5183
  
true - I adapted the comment accordingly


---


[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298092#comment-16298092
 ] 

ASF GitHub Bot commented on FLINK-8234:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157963387
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java ---
@@ -405,22 +407,27 @@ private void decrementCheckAndCleanup() {
 
     private volatile Throwable runnerException;
 
-    private volatile JobExecutionResult result;
+    private volatile org.apache.flink.runtime.jobmaster.JobExecutionResult result;
 
     BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
         this.jobId = jobId;
         this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor);
     }
 
     @Override
-    public void jobFinished(JobExecutionResult jobResult) {
-        this.result = jobResult;
+    public void jobFinished(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
+        this.result = result;
         jobMastersToWaitFor.countDown();
     }
 
     @Override
-    public void jobFailed(Throwable cause) {
-        jobException = cause;
+    public void jobFailed(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
+        checkArgument(result.getSerializedThrowable().isPresent());
+
+        jobException = result
--- End diff --

Actually, there is no need to store the exception separately because the 
JobExecutionResult already contains it.
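
In code terms, the failure cause can be derived on demand from the result (a 
fragment sketch relying on the accessors visible in the diff above; the 
deserializeError call is an assumption):

    // Extract the failure cause from the result itself instead of keeping
    // a second, redundant field for it.
    Throwable cause = result.getSerializedThrowable()
            .map(t -> t.deserializeError(ClassLoader.getSystemClassLoader()))
            .orElseThrow(() -> new IllegalStateException("result carries no failure"));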


> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory leaks.
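
For illustration, a minimal sketch of such a cache: a size-bounded (LRU) map 
whose entries additionally expire after a configurable TTL, with a cleanup 
hook meant to be called periodically, e.g. from a scheduled executor. This is 
an assumption about the eventual design, not the actual Dispatcher code:

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Sketch only: size-bounded cache with TTL-based periodic cleanup.
    class JobResultCache<K, V> {
        private static final class Entry<T> {
            final T value;
            final long createdAt = System.currentTimeMillis();
            Entry(T value) { this.value = value; }
        }

        private final int maxSize;
        private final long ttlMillis;
        private final Map<K, Entry<V>> cache;

        JobResultCache(int maxSize, long ttlMillis) {
            this.maxSize = maxSize;
            this.ttlMillis = ttlMillis;
            // an access-ordered LinkedHashMap evicts the eldest entry once
            // the configured size is exceeded
            this.cache = new LinkedHashMap<K, Entry<V>>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<K, Entry<V>> eldest) {
                    return size() > JobResultCache.this.maxSize;
                }
            };
        }

        synchronized void put(K key, V value) {
            cache.put(key, new Entry<>(value));
        }

        synchronized V get(K key) {
            Entry<V> entry = cache.get(key);
            return entry == null ? null : entry.value;
        }

        // to be scheduled periodically; drops entries older than the TTL so
        // that results of long-forgotten jobs do not leak memory
        synchronized void cleanUp() {
            long now = System.currentTimeMillis();
            cache.entrySet().removeIf(e -> now - e.getValue().createdAt > ttlMillis);
        }
    }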



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-20 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157963387
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java ---
@@ -405,22 +407,27 @@ private void decrementCheckAndCleanup() {
 
     private volatile Throwable runnerException;
 
-    private volatile JobExecutionResult result;
+    private volatile org.apache.flink.runtime.jobmaster.JobExecutionResult result;
 
     BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
         this.jobId = jobId;
         this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor);
     }
 
     @Override
-    public void jobFinished(JobExecutionResult jobResult) {
-        this.result = jobResult;
+    public void jobFinished(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
+        this.result = result;
        jobMastersToWaitFor.countDown();
     }
 
     @Override
-    public void jobFailed(Throwable cause) {
-        jobException = cause;
+    public void jobFailed(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
+        checkArgument(result.getSerializedThrowable().isPresent());
+
+        jobException = result
--- End diff --

Actually, there is no need to store the exception separately because the 
JobExecutionResult already contains it.


---


[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298081#comment-16298081
 ] 

ASF GitHub Bot commented on FLINK-8234:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157962431
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java ---
@@ -458,7 +465,14 @@ public JobExecutionResult getResult() throws JobExecutionException, InterruptedE
             }
         }
         else if (result != null) {
-            return result;
+            try {
+                return new SerializedJobExecutionResult(
+                    jobId,
+                    result.getNetRuntime(),
+                    result.getAccumulatorResults()).toJobExecutionResult(ClassLoader.getSystemClassLoader());
--- End diff --

Because the exception is serialized in 
`OnCompletionActions#jobFailed(JobExecutionResult)`, I have to deserialize it 
here again. I wonder if this is sane?



> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory leaks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-20 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157962431
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java ---
@@ -458,7 +465,14 @@ public JobExecutionResult getResult() throws JobExecutionException, InterruptedE
             }
         }
         else if (result != null) {
-            return result;
+            try {
+                return new SerializedJobExecutionResult(
+                    jobId,
+                    result.getNetRuntime(),
+                    result.getAccumulatorResults()).toJobExecutionResult(ClassLoader.getSystemClassLoader());
--- End diff --
--- End diff --

Because the exception is serialized in 
`OnCompletionActions#jobFailed(JobExecutionResult)`, I have to deserialize it 
here again. I wonder if this is sane?



---


[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298065#comment-16298065
 ] 

ASF GitHub Bot commented on FLINK-8234:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5168
  
New PR #5184 


> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory leaks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

