[jira] [Commented] (FLINK-2991) Extend Window Operators to Allow Efficient Fold Operation

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15140460#comment-15140460
 ] 

ASF GitHub Bot commented on FLINK-2991:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1605#issuecomment-182237986
  
I'll add doc and also fix some other stuff in the doc that wasn't updated.


> Extend Window Operators to Allow Efficient Fold Operation
> -
>
> Key: FLINK-2991
> URL: https://issues.apache.org/jira/browse/FLINK-2991
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Right now, a window fold is implemented as a WindowFunction that gets all the 
> elements as input. No pre-aggregation is performed. The window operator 
> should be extended to allow the fold to be pre-aggregated as well.
> This requires changing the signature of the {{WindowBuffer}} so that it can 
> emit a type other than the input type. 
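
For context, a minimal sketch of the user-facing fold API this touches (Flink 1.0-era signatures; the key selector and input data are illustrative only):

    import org.apache.flink.api.common.functions.FoldFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class WindowFoldSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> words = env.fromElements("a", "b", "a", "c");

            // With a pre-aggregated fold, each element is folded into the
            // accumulator as it arrives instead of being buffered whole.
            DataStream<String> folded = words
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) {
                        return value;
                    }
                })
                .timeWindow(Time.seconds(5))
                .fold("", new FoldFunction<String, String>() {
                    @Override
                    public String fold(String accumulator, String value) {
                        return accumulator + value;
                    }
                });

            folded.print();
            env.execute("window fold sketch");
        }
    }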



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2991] Add Folding State and use in Wind...

2016-02-09 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1605#issuecomment-182237986
  
I'll add doc and also fix some other stuff in the doc that wasn't updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3035) Redis as State Backend

2016-02-09 Thread Subhobrata Dey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15140239#comment-15140239
 ] 

Subhobrata Dey commented on FLINK-3035:
---

Hello [~mjsax], thanks for replying. I adopted it & created a PR out of my 
implementation. Would love to hear your views.

> Redis as State Backend
> --
>
> Key: FLINK-3035
> URL: https://issues.apache.org/jira/browse/FLINK-3035
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Matthias J. Sax
>Assignee: Subhobrata Dey
>Priority: Minor
>
> Add Redis as a state backend for distributed snapshots.
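
For orientation, a hedged sketch of how a state backend is selected in this era of the API. The FsStateBackend shown is an existing backend; a Redis backend from the PR would presumably be plugged in the same way (class name hypothetical):

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class StateBackendSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // An existing backend, shown for shape. A Redis backend would be
            // set the same way, e.g. env.setStateBackend(new RedisStateBackend(...));
            // -- "RedisStateBackend" is a hypothetical name here.
            env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
        }
    }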



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3035] Redis as State Backend

2016-02-09 Thread sbcd90
GitHub user sbcd90 opened a pull request:

https://github.com/apache/flink/pull/1617

[FLINK-3035] Redis as State Backend

@mjsax please review.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sbcd90/flink FLINK-3035

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1617.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1617


commit 5a4b2f09e6990185ca6cdf3d91a4561dcb23098b
Author: Subhobrata Dey 
Date:   2016-02-10T01:33:13Z

[FLINK-3035] Redis as State Backend




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3035) Redis as State Backend

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15140206#comment-15140206
 ] 

ASF GitHub Bot commented on FLINK-3035:
---

GitHub user sbcd90 opened a pull request:

https://github.com/apache/flink/pull/1617

[FLINK-3035] Redis as State Backend

@mjsax please review.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sbcd90/flink FLINK-3035

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1617.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1617


commit 5a4b2f09e6990185ca6cdf3d91a4561dcb23098b
Author: Subhobrata Dey 
Date:   2016-02-10T01:33:13Z

[FLINK-3035] Redis as State Backend




> Redis as State Backend
> --
>
> Key: FLINK-3035
> URL: https://issues.apache.org/jira/browse/FLINK-3035
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Matthias J. Sax
>Assignee: Subhobrata Dey
>Priority: Minor
>
> Add Redis as a state backend for distributed snapshots.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2991) Extend Window Operators to Allow Efficient Fold Operation

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139965#comment-15139965
 ] 

ASF GitHub Bot commented on FLINK-2991:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1605#issuecomment-182125713
  
You are adding new `.apply()` variants to the API which are not documented 
with the PR.


> Extend Window Operators to Allow Efficient Fold Operation
> -
>
> Key: FLINK-2991
> URL: https://issues.apache.org/jira/browse/FLINK-2991
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Right now, a window fold is implemented as a WindowFunction that gets all the 
> elements as input. No pre-aggregation is performed. The window operator 
> should be extended to allow the fold to be pre-aggregated as well.
> This requires changing the signature of the {{WindowBuffer}} so that it can 
> emit a type other than the input type. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2991] Add Folding State and use in Wind...

2016-02-09 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1605#issuecomment-182125713
  
You are adding new `.apply()` variants to the API which are not documented 
with the PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2523) Make task canceling interrupt interval configurable

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139953#comment-15139953
 ] 

ASF GitHub Bot commented on FLINK-2523:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52388927
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -102,60 +101,150 @@
// 

 
/**
-* Constructs a new job graph with no name and a random job ID.
+* Constructs a new job graph with no name, a random job ID, and the 
default
+* {@link ExecutionConfig} parameters.
 */
public JobGraph() {
this((String) null);
}
 
/**
-* Constructs a new job graph with the given name, a random job ID.
+* Constructs a new job graph with the given name, a random job ID and 
the default
+* {@link ExecutionConfig} parameters.
 *
 * @param jobName The name of the job
 */
public JobGraph(String jobName) {
-   this(null, jobName);
+   this(null, jobName, new ExecutionConfig());
+   }
+
+   /**
+* Constructs a new job graph with the given job ID, the given name, 
and the default
+* {@link ExecutionConfig} parameters.
+*
+* @param jobID The id of the job. A random ID is generated, if {@code 
null} is passed.
+* @param jobName The name of the job.
+*/
+   public JobGraph(JobID jobID, String jobName) {
+   this(jobID, jobName, new ExecutionConfig());
}
 
/**
-* Constructs a new job graph with the given name and a random job ID 
if null supplied as an id.
+* Constructs a new job graph with the given name, the given {@link 
ExecutionConfig},
+* and a random job ID.
+*
+* @param jobName The name of the job.
+* @param config The execution configuration of the job.
+*/
+   public JobGraph(String jobName, ExecutionConfig config) {
+   this(null, jobName, config);
+   }
+
+   /**
+* Constructs a new job graph with the given job ID (or a random ID, if 
{@code null} is passed),
+* the given name and the given execution configuration (see {@link 
ExecutionConfig}).
 *
 * @param jobId The id of the job. A random ID is generated, if {@code 
null} is passed.
 * @param jobName The name of the job.
+* @param config The execution configuration of the job.
 */
-   public JobGraph(JobID jobId, String jobName) {
+   public JobGraph(JobID jobId, String jobName, ExecutionConfig config) {
this.jobID = jobId == null ? new JobID() : jobId;
this.jobName = jobName == null ? "(unnamed job)" : jobName;
+   this.executionConfig = config;
--- End diff --

It cannot, I just added a check.
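
A minimal sketch of what such a null check could look like (the exact form in the PR is not quoted in this thread; requireNonNull is one idiomatic option):

    import static java.util.Objects.requireNonNull;

    public JobGraph(JobID jobId, String jobName, ExecutionConfig config) {
        this.jobID = jobId == null ? new JobID() : jobId;
        this.jobName = jobName == null ? "(unnamed job)" : jobName;
        // Fail fast instead of letting a null executionConfig propagate.
        this.executionConfig = requireNonNull(config);
    }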


> Make task canceling interrupt interval configurable
> ---
>
> Key: FLINK-2523
> URL: https://issues.apache.org/jira/browse/FLINK-2523
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Klou
> Fix For: 1.0.0
>
>
> When a task is canceled, the cancellation periodically calls "interrupt()" on 
> the task thread if the task thread does not cancel within a certain time.
> Currently, this value is hard-coded to 10 seconds. We should make that time 
> configurable.
> Until then, I would like to increase the value to 30 seconds, as many tasks 
> (here I am observing it for Kafka consumers) can take longer than 10 seconds 
> for proper cleanup.
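
Once configurable, usage would presumably look like the following (`setTaskCancellationInterval` is the setter named in the docs diff later in this thread; the value is illustrative):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CancellationIntervalConfigSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Wait 30 seconds between consecutive interrupt() attempts on a canceling task.
            env.getConfig().setTaskCancellationInterval(30000L);
        }
    }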



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...

2016-02-09 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52388927
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -102,60 +101,150 @@
// 

 
/**
-* Constructs a new job graph with no name and a random job ID.
+* Constructs a new job graph with no name, a random job ID, and the 
default
+* {@link ExecutionConfig} parameters.
 */
public JobGraph() {
this((String) null);
}
 
/**
-* Constructs a new job graph with the given name, a random job ID.
+* Constructs a new job graph with the given name, a random job ID and 
the default
+* {@link ExecutionConfig} parameters.
 *
 * @param jobName The name of the job
 */
public JobGraph(String jobName) {
-   this(null, jobName);
+   this(null, jobName, new ExecutionConfig());
+   }
+
+   /**
+* Constructs a new job graph with the given job ID, the given name, 
and the default
+* {@link ExecutionConfig} parameters.
+*
+* @param jobID The id of the job. A random ID is generated, if {@code 
null} is passed.
+* @param jobName The name of the job.
+*/
+   public JobGraph(JobID jobID, String jobName) {
+   this(jobID, jobName, new ExecutionConfig());
}
 
/**
-* Constructs a new job graph with the given name and a random job ID 
if null supplied as an id.
+* Constructs a new job graph with the given name, the given {@link 
ExecutionConfig},
+* and a random job ID.
+*
+* @param jobName The name of the job.
+* @param config The execution configuration of the job.
+*/
+   public JobGraph(String jobName, ExecutionConfig config) {
+   this(null, jobName, config);
+   }
+
+   /**
+* Constructs a new job graph with the given job ID (or a random ID, if 
{@code null} is passed),
+* the given name and the given execution configuration (see {@link 
ExecutionConfig}).
 *
 * @param jobId The id of the job. A random ID is generated, if {@code 
null} is passed.
 * @param jobName The name of the job.
+* @param config The execution configuration of the job.
 */
-   public JobGraph(JobID jobId, String jobName) {
+   public JobGraph(JobID jobId, String jobName, ExecutionConfig config) {
this.jobID = jobId == null ? new JobID() : jobId;
this.jobName = jobName == null ? "(unnamed job)" : jobName;
+   this.executionConfig = config;
--- End diff --

It cannot, I just added a check.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...

2016-02-09 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52388853
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -102,60 +101,150 @@
// 

 
/**
-* Constructs a new job graph with no name and a random job ID.
+* Constructs a new job graph with no name, a random job ID, and the 
default
+* {@link ExecutionConfig} parameters.
 */
public JobGraph() {
this((String) null);
}
 
/**
-* Constructs a new job graph with the given name, a random job ID.
+* Constructs a new job graph with the given name, a random job ID and 
the default
+* {@link ExecutionConfig} parameters.
 *
 * @param jobName The name of the job
 */
public JobGraph(String jobName) {
-   this(null, jobName);
+   this(null, jobName, new ExecutionConfig());
+   }
+
+   /**
+* Constructs a new job graph with the given job ID, the given name, 
and the default
+* {@link ExecutionConfig} parameters.
+*
+* @param jobID The id of the job. A random ID is generated, if {@code 
null} is passed.
+* @param jobName The name of the job.
+*/
+   public JobGraph(JobID jobID, String jobName) {
+   this(jobID, jobName, new ExecutionConfig());
--- End diff --

Same as before.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2523) Make task canceling interrupt interval configurable

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139951#comment-15139951
 ] 

ASF GitHub Bot commented on FLINK-2523:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52388853
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -102,60 +101,150 @@
// 

 
/**
-* Constructs a new job graph with no name and a random job ID.
+* Constructs a new job graph with no name, a random job ID, and the 
default
+* {@link ExecutionConfig} parameters.
 */
public JobGraph() {
this((String) null);
}
 
/**
-* Constructs a new job graph with the given name, a random job ID.
+* Constructs a new job graph with the given name, a random job ID and 
the default
+* {@link ExecutionConfig} parameters.
 *
 * @param jobName The name of the job
 */
public JobGraph(String jobName) {
-   this(null, jobName);
+   this(null, jobName, new ExecutionConfig());
+   }
+
+   /**
+* Constructs a new job graph with the given job ID, the given name, 
and the default
+* {@link ExecutionConfig} parameters.
+*
+* @param jobID The id of the job. A random ID is generated, if {@code 
null} is passed.
+* @param jobName The name of the job.
+*/
+   public JobGraph(JobID jobID, String jobName) {
+   this(jobID, jobName, new ExecutionConfig());
--- End diff --

Same as before.


> Make task canceling interrupt interval configurable
> ---
>
> Key: FLINK-2523
> URL: https://issues.apache.org/jira/browse/FLINK-2523
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Klou
> Fix For: 1.0.0
>
>
> When a task is canceled, the cancellation periodically calls "interrupt()" on 
> the task thread if the task thread does not cancel within a certain time.
> Currently, this value is hard-coded to 10 seconds. We should make that time 
> configurable.
> Until then, I would like to increase the value to 30 seconds, as many tasks 
> (here I am observing it for Kafka consumers) can take longer than 10 seconds 
> for proper cleanup.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2523) Make task canceling interrupt interval configurable

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139949#comment-15139949
 ] 

ASF GitHub Bot commented on FLINK-2523:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52388830
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -102,60 +101,150 @@
// 

 
/**
-* Constructs a new job graph with no name and a random job ID.
+* Constructs a new job graph with no name, a random job ID, and the 
default
+* {@link ExecutionConfig} parameters.
 */
public JobGraph() {
this((String) null);
}
 
/**
-* Constructs a new job graph with the given name, a random job ID.
+* Constructs a new job graph with the given name, a random job ID and 
the default
+* {@link ExecutionConfig} parameters.
 *
 * @param jobName The name of the job
 */
public JobGraph(String jobName) {
-   this(null, jobName);
+   this(null, jobName, new ExecutionConfig());
--- End diff --

Again, the constructors that do not specify an executionConfig exist only 
for tests. The empty ExecutionConfig here is just to avoid breaking the 
already existing tests that cover other parts of the functionality.
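
A short sketch of the two call sites this implies (constructor signatures as quoted in the diff above; the config value is illustrative):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.runtime.jobgraph.JobGraph;

    public class JobGraphCtorSketch {
        public static void main(String[] args) {
            // Test code keeps using the short constructor and gets a default config.
            JobGraph forTests = new JobGraph("test-job");

            // Production code passes the job's real ExecutionConfig explicitly.
            ExecutionConfig config = new ExecutionConfig();
            config.setTaskCancellationInterval(30000L);
            JobGraph forJobs = new JobGraph("real-job", config);
        }
    }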


> Make task canceling interrupt interval configurable
> ---
>
> Key: FLINK-2523
> URL: https://issues.apache.org/jira/browse/FLINK-2523
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Klou
> Fix For: 1.0.0
>
>
> When a task is canceled, the cancellation periodically calls "interrupt()" on 
> the task thread if the task thread does not cancel within a certain time.
> Currently, this value is hard-coded to 10 seconds. We should make that time 
> configurable.
> Until then, I would like to increase the value to 30 seconds, as many tasks 
> (here I am observing it for Kafka consumers) can take longer than 10 seconds 
> for proper cleanup.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...

2016-02-09 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52388830
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -102,60 +101,150 @@
// 

 
/**
-* Constructs a new job graph with no name and a random job ID.
+* Constructs a new job graph with no name, a random job ID, and the 
default
+* {@link ExecutionConfig} parameters.
 */
public JobGraph() {
this((String) null);
}
 
/**
-* Constructs a new job graph with the given name, a random job ID.
+* Constructs a new job graph with the given name, a random job ID and 
the default
+* {@link ExecutionConfig} parameters.
 *
 * @param jobName The name of the job
 */
public JobGraph(String jobName) {
-   this(null, jobName);
+   this(null, jobName, new ExecutionConfig());
--- End diff --

Again, the constructors that do not specify an executionConfig exist only 
for tests. The empty ExecutionConfig here is just to avoid breaking the 
already existing tests that cover other parts of the functionality.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2523) Make task canceling interrupt interval configurable

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139942#comment-15139942
 ] 

ASF GitHub Bot commented on FLINK-2523:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52388350
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 ---
@@ -141,9 +146,16 @@ public TaskDeploymentDescriptor(
		List<BlobKey> requiredJarFiles, List<URL> requiredClasspaths,
		int targetSlotNumber) {
 
-		this(appId, jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks, attemptNumber,
-			jobConfiguration, taskConfiguration, invokableClassName, producedPartitions,
-			inputGates, requiredJarFiles, requiredClasspaths, targetSlotNumber, null, -1);
+		this(appId, jobID, vertexID, executionId, new ExecutionConfig(), taskName, indexInSubtaskGroup,
--- End diff --

This constructor exists only for tests. The executionConfig in the TDD is 
the change introduced by this pull request, so the new ExecutionConfig here is 
just to avoid breaking the already existing tests that cover other parts of 
the TDD's functionality. New tests are added to cover the new functionality.


> Make task canceling interrupt interval configurable
> ---
>
> Key: FLINK-2523
> URL: https://issues.apache.org/jira/browse/FLINK-2523
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Klou
> Fix For: 1.0.0
>
>
> When a task is canceled, the cancellation periodically calls "interrupt()" on 
> the task thread if the task thread does not cancel within a certain time.
> Currently, this value is hard-coded to 10 seconds. We should make that time 
> configurable.
> Until then, I would like to increase the value to 30 seconds, as many tasks 
> (here I am observing it for Kafka consumers) can take longer than 10 seconds 
> for proper cleanup.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...

2016-02-09 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52388350
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 ---
@@ -141,9 +146,16 @@ public TaskDeploymentDescriptor(
		List<BlobKey> requiredJarFiles, List<URL> requiredClasspaths,
		int targetSlotNumber) {
 
-		this(appId, jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks, attemptNumber,
-			jobConfiguration, taskConfiguration, invokableClassName, producedPartitions,
-			inputGates, requiredJarFiles, requiredClasspaths, targetSlotNumber, null, -1);
+		this(appId, jobID, vertexID, executionId, new ExecutionConfig(), taskName, indexInSubtaskGroup,
--- End diff --

This constructor exists only for tests. The executionConfig in the TDD is 
the change introduced by this pull request, so the new ExecutionConfig here is 
just to avoid breaking the already existing tests that cover other parts of 
the TDD's functionality. New tests are added to cover the new functionality.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2991] Add Folding State and use in Wind...

2016-02-09 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1605#discussion_r52388115
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.rocksdb.RocksDBException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link FoldingState} implementation that stores state in RocksDB.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <T> The type of the values that can be folded into the state.
+ * @param <ACC> The type of the value in the folding state.
+ * @param <Backend> The type of the backend that snapshots this key/value state.
+ */
+public class RocksDBFoldingState<K, N, T, ACC, Backend extends AbstractStateBackend>
+    extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend>
+    implements FoldingState<T, ACC> {
+
+    /** Serializer for the values */
+    private final TypeSerializer<ACC> valueSerializer;
+
+    /** This holds the name of the state and can create an initial default value for the state. */
+    protected final FoldingStateDescriptor<T, ACC> stateDesc;
+
+    /** User-specified fold function */
+    private final FoldFunction<T, ACC> foldFunction;
+
+    /**
+     * Creates a new {@code RocksDBFoldingState}.
+     *
+     * @param keySerializer The serializer for the keys.
+     * @param namespaceSerializer The serializer for the namespace.
+     * @param stateDesc The state identifier for the state. This contains name
+     *                  and can create a default state value.
+     * @param dbPath The path on the local system where RocksDB data should be stored.
+     */
+    protected RocksDBFoldingState(TypeSerializer<K> keySerializer,
+        TypeSerializer<N> namespaceSerializer,
+        FoldingStateDescriptor<T, ACC> stateDesc,
+        File dbPath,
+        String backupPath) {
+        super(keySerializer, namespaceSerializer, dbPath, backupPath);
+        this.stateDesc = requireNonNull(stateDesc);
+        this.valueSerializer = stateDesc.getSerializer();
+        this.foldFunction = stateDesc.getFoldFunction();
+    }
+
+    protected RocksDBFoldingState(TypeSerializer<K> keySerializer,
+        TypeSerializer<N> namespaceSerializer,
+        FoldingStateDescriptor<T, ACC> stateDesc,
+        File dbPath,
+        String backupPath,
+        String restorePath) {
+        super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath);
+        this.stateDesc = stateDesc;
+        this.valueSerializer = stateDesc.getSerializer();
+        this.foldFunction = stateDesc.getFoldFunction();
+    }
+
+    @Override
+    public ACC get() {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+        try {
+            writeKeyAndNamespace(out);
+            byte[] key = baos.toByteArray();

[jira] [Commented] (FLINK-2991) Extend Window Operators to Allow Efficient Fold Operation

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139939#comment-15139939
 ] 

ASF GitHub Bot commented on FLINK-2991:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1605#discussion_r52388115
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.rocksdb.RocksDBException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link FoldingState} implementation that stores state in RocksDB.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <T> The type of the values that can be folded into the state.
+ * @param <ACC> The type of the value in the folding state.
+ * @param <Backend> The type of the backend that snapshots this key/value state.
+ */
+public class RocksDBFoldingState<K, N, T, ACC, Backend extends AbstractStateBackend>
+    extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend>
+    implements FoldingState<T, ACC> {
+
+    /** Serializer for the values */
+    private final TypeSerializer<ACC> valueSerializer;
+
+    /** This holds the name of the state and can create an initial default value for the state. */
+    protected final FoldingStateDescriptor<T, ACC> stateDesc;
+
+    /** User-specified fold function */
+    private final FoldFunction<T, ACC> foldFunction;
+
+    /**
+     * Creates a new {@code RocksDBFoldingState}.
+     *
+     * @param keySerializer The serializer for the keys.
+     * @param namespaceSerializer The serializer for the namespace.
+     * @param stateDesc The state identifier for the state. This contains name
+     *                  and can create a default state value.
+     * @param dbPath The path on the local system where RocksDB data should be stored.
+     */
+    protected RocksDBFoldingState(TypeSerializer<K> keySerializer,
+        TypeSerializer<N> namespaceSerializer,
+        FoldingStateDescriptor<T, ACC> stateDesc,
+        File dbPath,
+        String backupPath) {
+        super(keySerializer, namespaceSerializer, dbPath, backupPath);
+        this.stateDesc = requireNonNull(stateDesc);
+        this.valueSerializer = stateDesc.getSerializer();
+        this.foldFunction = stateDesc.getFoldFunction();
+    }
+
+    protected RocksDBFoldingState(TypeSerializer<K> keySerializer,
+        TypeSerializer<N> namespaceSerializer,
+        FoldingStateDescriptor<T, ACC> stateDesc,
+        File dbPath,
+        String backupPath,
+        String restorePath) {
+        super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath);
+        this.stateDesc = stateDesc;
+        this.valueSerializer = stateDesc.getSerializer();
+        this.foldFunction = stateDesc.getFoldFunction();
+    }
+
+    @Override
+    public ACC get() {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();

[jira] [Commented] (FLINK-3366) Rename @Experimental annotation to @PublicEvolving

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139929#comment-15139929
 ] 

ASF GitHub Bot commented on FLINK-3366:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1599#issuecomment-182115171
  
+1 to merge


> Rename @Experimental annotation to @PublicEvolving
> --
>
> Key: FLINK-3366
> URL: https://issues.apache.org/jira/browse/FLINK-3366
> Project: Flink
>  Issue Type: Task
>Affects Versions: 1.0.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.0.0
>
>
> As per discussion on the dev ML, rename the @Experimental annotation to 
> @PublicEvolving.
> Experimental might suggest unstable / unreliable functionality, which is not 
> the intended meaning of this annotation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3366] Rename @Experimental annotation t...

2016-02-09 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1599#issuecomment-182115171
  
+1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3372) Setting custom YARN application name is ignored

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139922#comment-15139922
 ] 

ASF GitHub Bot commented on FLINK-3372:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1607#issuecomment-182114169
  
+1 to merge


> Setting custom YARN application name is ignored
> ---
>
> Key: FLINK-3372
> URL: https://issues.apache.org/jira/browse/FLINK-3372
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 0.10.1
>Reporter: Nick Dimiduk
>
> The {{-ynm}} optional argument is ignored. From my debugging, 
> FlinkYarnClientBase does the right thing to parse and set the value. 
> CliFrontend ignores this parsed value, always calling
> {noformat}
> flinkYarnClient.setName("Flink Application: " + programName);
> {noformat}
> down in {{getClient(CommandLineOptions, String, int)}}. Thus every job 
> submission to YARN is identifiable only by its classname.
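
A plausible shape of the fix (hypothetical sketch; a getName() accessor on the YARN client is assumed here and not confirmed by this thread):

    // Only fall back to the generated name when the user did not pass -ynm.
    if (flinkYarnClient.getName() == null) {
        flinkYarnClient.setName("Flink Application: " + programName);
    }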



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3372] Setting custom YARN application n...

2016-02-09 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1607#issuecomment-182114169
  
+1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...

2016-02-09 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52385698
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 ---
@@ -141,9 +146,16 @@ public TaskDeploymentDescriptor(
		List<BlobKey> requiredJarFiles, List<URL> requiredClasspaths,
		int targetSlotNumber) {
 
-		this(appId, jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks, attemptNumber,
-			jobConfiguration, taskConfiguration, invokableClassName, producedPartitions,
-			inputGates, requiredJarFiles, requiredClasspaths, targetSlotNumber, null, -1);
+		this(appId, jobID, vertexID, executionId, new ExecutionConfig(), taskName, indexInSubtaskGroup,
--- End diff --

why is there a constructor variant of the TDD that creates an empty 
execution config?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...

2016-02-09 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52385188
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 ---
@@ -75,16 +76,22 @@ public void testExecutionConfigSerialization() throws 
IOException, ClassNotFound
config.disableSysoutLogging();
}
config.setParallelism(dop);
-   
+
JobGraph jobGraph = compiler.createJobGraph("test");
-   
+
+   final String exec_config_key = "runtime.execconfig";
--- End diff --

variable name is not java style
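
For illustration, the conventional Java spellings would be:

    // Local constant in camelCase:
    final String execConfigKey = "runtime.execconfig";
    // Or, as a class-level constant, in UPPER_SNAKE_CASE:
    private static final String EXEC_CONFIG_KEY = "runtime.execconfig";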


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2523) Make task canceling interrupt interval configurable

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139913#comment-15139913
 ] 

ASF GitHub Bot commented on FLINK-2523:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52386582
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -102,60 +101,150 @@
// 

 
/**
-* Constructs a new job graph with no name and a random job ID.
+* Constructs a new job graph with no name, a random job ID, and the 
default
+* {@link ExecutionConfig} parameters.
 */
public JobGraph() {
this((String) null);
}
 
/**
-* Constructs a new job graph with the given name, a random job ID.
+* Constructs a new job graph with the given name, a random job ID and 
the default
+* {@link ExecutionConfig} parameters.
 *
 * @param jobName The name of the job
 */
public JobGraph(String jobName) {
-   this(null, jobName);
+   this(null, jobName, new ExecutionConfig());
+   }
+
+   /**
+* Constructs a new job graph with the given job ID, the given name, 
and the default
+* {@link ExecutionConfig} parameters.
+*
+* @param jobID The id of the job. A random ID is generated, if {@code 
null} is passed.
+* @param jobName The name of the job.
+*/
+   public JobGraph(JobID jobID, String jobName) {
+   this(jobID, jobName, new ExecutionConfig());
--- End diff --

same Q here


> Make task canceling interrupt interval configurable
> ---
>
> Key: FLINK-2523
> URL: https://issues.apache.org/jira/browse/FLINK-2523
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Klou
> Fix For: 1.0.0
>
>
> When a task is canceled, the cancellation periodically calls "interrupt()" on 
> the task thread if the task thread does not cancel within a certain time.
> Currently, this value is hard-coded to 10 seconds. We should make that time 
> configurable.
> Until then, I would like to increase the value to 30 seconds, as many tasks 
> (here I am observing it for Kafka consumers) can take longer than 10 seconds 
> for proper cleanup.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...

2016-02-09 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52386295
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -102,60 +101,150 @@
// 

 
/**
-* Constructs a new job graph with no name and a random job ID.
+* Constructs a new job graph with no name, a random job ID, and the 
default
+* {@link ExecutionConfig} parameters.
 */
public JobGraph() {
this((String) null);
}
 
/**
-* Constructs a new job graph with the given name, a random job ID.
+* Constructs a new job graph with the given name, a random job ID and 
the default
+* {@link ExecutionConfig} parameters.
 *
 * @param jobName The name of the job
 */
public JobGraph(String jobName) {
-   this(null, jobName);
+   this(null, jobName, new ExecutionConfig());
--- End diff --

Why are you creating an empty execution config here?

Is this the execution config which will be passed to the user in the end?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...

2016-02-09 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52385934
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -102,60 +101,150 @@
// 

 
/**
-* Constructs a new job graph with no name and a random job ID.
+* Constructs a new job graph with no name, a random job ID, and the 
default
+* {@link ExecutionConfig} parameters.
 */
public JobGraph() {
this((String) null);
}
 
/**
-* Constructs a new job graph with the given name, a random job ID.
+* Constructs a new job graph with the given name, a random job ID and 
the default
+* {@link ExecutionConfig} parameters.
 *
 * @param jobName The name of the job
 */
public JobGraph(String jobName) {
-   this(null, jobName);
+   this(null, jobName, new ExecutionConfig());
+   }
+
+   /**
+* Constructs a new job graph with the given job ID, the given name, 
and the default
+* {@link ExecutionConfig} parameters.
+*
+* @param jobID The id of the job. A random ID is generated, if {@code 
null} is passed.
+* @param jobName The name of the job.
+*/
+   public JobGraph(JobID jobID, String jobName) {
+   this(jobID, jobName, new ExecutionConfig());
}
 
/**
-* Constructs a new job graph with the given name and a random job ID 
if null supplied as an id.
+* Constructs a new job graph with the given name, the given {@link 
ExecutionConfig},
+* and a random job ID.
+*
+* @param jobName The name of the job.
+* @param config The execution configuration of the job.
+*/
+   public JobGraph(String jobName, ExecutionConfig config) {
+   this(null, jobName, config);
+   }
+
+   /**
+* Constructs a new job graph with the given job ID (or a random ID, if 
{@code null} is passed),
+* the given name and the given execution configuration (see {@link 
ExecutionConfig}).
 *
 * @param jobId The id of the job. A random ID is generated, if {@code 
null} is passed.
 * @param jobName The name of the job.
+* @param config The execution configuration of the job.
 */
-   public JobGraph(JobID jobId, String jobName) {
+   public JobGraph(JobID jobId, String jobName, ExecutionConfig config) {
this.jobID = jobId == null ? new JobID() : jobId;
this.jobName = jobName == null ? "(unnamed job)" : jobName;
+   this.executionConfig = config;
+   }
+
+   /**
+* Constructs a new job graph containing the provided single 
{@code JobVertex} (see {@link JobVertex}),
+* with no name, a random ID, and the default execution configuration 
(see {@link ExecutionConfig}).
+*
+* @param vertex The single vertex of the graph.
+* */
+   public JobGraph(JobVertex vertex) {
+   this(null, Collections.singletonList(vertex));
+   }
+
+   /**
+* Constructs a new job graph containing the provided single 
{@code JobVertex} (see {@link JobVertex}),
+* with the given name, a random ID, and the default execution 
configuration (see {@link ExecutionConfig}).
+*
+* @param jobName The name of the job.
+* @param vertex The single vertex of the graph.
+* */
+   public JobGraph(String jobName, JobVertex vertex) {
+   this(jobName, Collections.singletonList(vertex));
+   }
+
+   /**
+* Constructs a new job graph containing the provided two {@code 
JobVertices} (see {@link JobVertex}),
+* with the given name, a random ID, and the default execution 
configuration (see {@link ExecutionConfig}).
+*
+* @param jobName The name of the job.
+* @param vertex1 The first vertex of the graph.
+* @param vertex2 The second vertex of the graph.
+* */
+   public JobGraph(String jobName, JobVertex vertex1, JobVertex vertex2) {
+   this(jobName, Arrays.asList(vertex1, vertex2));
}
 
/**
-* Constructs a new job graph with no name and a random job ID if null 
supplied as an id.
+* Constructs a new job graph containing the provided {@code 
JobVertices} (see {@link JobVertex}),
+* with no name, a random job ID, and the default execution 
configuration (see {@link ExecutionConfig}).
 *
 * @param vertices The vertices to add to the graph.
 */
-   public JobGraph(JobVertex... vertices) {
--- End diff --

Why did you remove the varargs constructor?


---
If your project is set up for it, you can 

[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...

2016-02-09 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52386582
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -102,60 +101,150 @@
// 

 
/**
-* Constructs a new job graph with no name and a random job ID.
+* Constructs a new job graph with no name, a random job ID, and the 
default
+* {@link ExecutionConfig} parameters.
 */
public JobGraph() {
this((String) null);
}
 
/**
-* Constructs a new job graph with the given name, a random job ID.
+* Constructs a new job graph with the given name, a random job ID and 
the default
+* {@link ExecutionConfig} parameters.
 *
 * @param jobName The name of the job
 */
public JobGraph(String jobName) {
-   this(null, jobName);
+   this(null, jobName, new ExecutionConfig());
+   }
+
+   /**
+* Constructs a new job graph with the given job ID, the given name, 
and the default
+* {@link ExecutionConfig} parameters.
+*
+* @param jobID The id of the job. A random ID is generated, if {@code 
null} is passed.
+* @param jobName The name of the job.
+*/
+   public JobGraph(JobID jobID, String jobName) {
+   this(jobID, jobName, new ExecutionConfig());
--- End diff --

same Q here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...

2016-02-09 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52385415
  
--- Diff: docs/apis/batch/index.md ---
@@ -2123,7 +2123,7 @@ Note that types registered with `registerKryoType()` 
are not available to Flink'
 
 - `disableAutoTypeRegistration()` Automatic type registration is enabled 
by default. The automatic type registration is registering all types (including 
sub-types) used by usercode with Kryo and the POJO serializer.
 
-
+- `setTaskCancellationInterval(long interval)` Sets the interval (in 
milliseconds) to wait between consecutive attempts to cancel a running task. By 
default this is set to **30000** milliseconds, or **30 seconds**.
--- End diff --

I would explain in a bit more detail what's going on. The `cancel()` method 
is called only once. The interval determines the interrupt frequency! 

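A self-contained sketch of those semantics (not Flink's actual code; a plain
Java thread stands in for the task):

    public class InterruptIntervalSketch {
        public static void main(String[] args) throws InterruptedException {
            final long cancellationIntervalMillis = 1000L; // would come from the config

            Thread task = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(60000L); // simulates a task that is slow to cancel
                    } catch (InterruptedException e) {
                        System.out.println("interrupted, cleaning up");
                    }
                }
            });
            task.start();

            // cancel() is invoked exactly once; only the interrupt repeats,
            // once per interval, until the task thread exits.
            while (task.isAlive()) {
                task.interrupt();
                task.join(cancellationIntervalMillis);
            }
        }
    }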

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...

2016-02-09 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52385223
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 ---
@@ -27,13 +24,17 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.util.InstantiationUtil;
 
+import org.apache.flink.util.InstantiationUtil;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Random;
+
 import static org.junit.Assert.*;
 
-public class StreamingJobGraphGeneratorTest {
+public class
+StreamingJobGraphGeneratorTest {
--- End diff --

why is there a line break after the `class`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2523) Make task canceling interrupt interval configurable

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139908#comment-15139908
 ] 

ASF GitHub Bot commented on FLINK-2523:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52386295
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -102,60 +101,150 @@
// 

 
/**
-* Constructs a new job graph with no name and a random job ID.
+* Constructs a new job graph with no name, a random job ID, and the 
default
+* {@link ExecutionConfig} parameters.
 */
public JobGraph() {
this((String) null);
}
 
/**
-* Constructs a new job graph with the given name, a random job ID.
+* Constructs a new job graph with the given name, a random job ID and 
the default
+* {@link ExecutionConfig} parameters.
 *
 * @param jobName The name of the job
 */
public JobGraph(String jobName) {
-   this(null, jobName);
+   this(null, jobName, new ExecutionConfig());
--- End diff --

Why are you creating an empty execution config here?

Is this the execution config which will be passed to the user in the end?


> Make task canceling interrupt interval configurable
> ---
>
> Key: FLINK-2523
> URL: https://issues.apache.org/jira/browse/FLINK-2523
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Klou
> Fix For: 1.0.0
>
>
> When a task is canceled, the cancellation periodically calls "interrupt()" on 
> the task thread if the task thread does not cancel within a certain time.
> Currently, this value is hard-coded to 10 seconds. We should make that time 
> configurable.
> Until then, I would like to increase the value to 30 seconds, as many tasks 
> (here I am observing it for Kafka consumers) can take longer than 10 seconds 
> for proper cleanup.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2523) Make task canceling interrupt interval configurable

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139897#comment-15139897
 ] 

ASF GitHub Bot commented on FLINK-2523:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52385223
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 ---
@@ -27,13 +24,17 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.util.InstantiationUtil;
 
+import org.apache.flink.util.InstantiationUtil;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Random;
+
 import static org.junit.Assert.*;
 
-public class StreamingJobGraphGeneratorTest {
+public class
+StreamingJobGraphGeneratorTest {
--- End diff --

why is there a line break after the `class`?


> Make task canceling interrupt interval configurable
> ---
>
> Key: FLINK-2523
> URL: https://issues.apache.org/jira/browse/FLINK-2523
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Klou
> Fix For: 1.0.0
>
>
> When a task is canceled, the cancellation periodically calls "interrupt()" on 
> the task thread if the task thread does not cancel within a certain time.
> Currently, this value is hard-coded to 10 seconds. We should make that time 
> configurable.
> Until then, I would like to increase the value to 30 seconds, as many tasks 
> (here I am observing it for Kafka consumers) can take longer than 10 seconds 
> for proper cleanup.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2523) Make task canceling interrupt interval configurable

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139903#comment-15139903
 ] 

ASF GitHub Bot commented on FLINK-2523:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52385698
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 ---
@@ -141,9 +146,16 @@ public TaskDeploymentDescriptor(
		List<BlobKey> requiredJarFiles, List<URL> requiredClasspaths,
		int targetSlotNumber) {
 
-		this(appId, jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks, attemptNumber,
-			jobConfiguration, taskConfiguration, invokableClassName, producedPartitions,
-			inputGates, requiredJarFiles, requiredClasspaths, targetSlotNumber, null, -1);
+		this(appId, jobID, vertexID, executionId, new ExecutionConfig(), taskName, indexInSubtaskGroup,
--- End diff --

why is there a constructor variant of the TDD that creates an empty 
execution config?


> Make task canceling interrupt interval configurable
> ---
>
> Key: FLINK-2523
> URL: https://issues.apache.org/jira/browse/FLINK-2523
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Klou
> Fix For: 1.0.0
>
>
> When a task is canceled, the cancellation periodically calls "interrupt()" on 
> the task thread, if the task thread does not cancel within a certain time.
> Currently, this value is hard coded to 10 seconds. We should make that time 
> configurable.
> Until then, I would like to increase the value to 30 seconds, as many tasks 
> (here I am observing it for Kafka consumers) can take longer than 10 seconds 
> for proper cleanup.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2523) Make task canceling interrupt interval configurable

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139899#comment-15139899
 ] 

ASF GitHub Bot commented on FLINK-2523:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52385415
  
--- Diff: docs/apis/batch/index.md ---
@@ -2123,7 +2123,7 @@ Note that types registered with `registerKryoType()` 
are not available to Flink'
 
 - `disableAutoTypeRegistration()` Automatic type registration is enabled 
by default. The automatic type registration is registering all types (including 
sub-types) used by usercode with Kryo and the POJO serializer.
 
-
- `setTaskCancellationInterval(long interval)` Sets the interval (in 
milliseconds) to wait between consecutive attempts to cancel a running task. By 
default this is set to **30000** milliseconds, or **30 seconds**.
--- End diff --

I would explain in a bit more detail what's going on. The `cancel()` method 
is called only once. The interval determines the interrupt frequency!
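
A minimal usage sketch of the setting discussed here (hedged: assuming the setter is exposed on `ExecutionConfig`, as the doc line above suggests):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CancellationIntervalExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // cancel() is invoked once; afterwards the task thread is interrupted
        // every 30000 ms until it actually terminates
        env.getConfig().setTaskCancellationInterval(30000);
    }
}
```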


> Make task canceling interrupt interval configurable
> ---
>
> Key: FLINK-2523
> URL: https://issues.apache.org/jira/browse/FLINK-2523
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Klou
> Fix For: 1.0.0
>
>
> When a task is canceled, the cancellation periodically calls "interrupt()" on 
> the task thread, if the task thread does not cancel within a certain time.
> Currently, this value is hard coded to 10 seconds. We should make that time 
> configurable.
> Until then, I would like to increase the value to 30 seconds, as many tasks 
> (here I am observing it for Kafka consumers) can take longer than 10 seconds 
> for proper cleanup.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2523) Make task canceling interrupt interval configurable

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139904#comment-15139904
 ] 

ASF GitHub Bot commented on FLINK-2523:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52385934
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -102,60 +101,150 @@
// 

 
/**
-* Constructs a new job graph with no name and a random job ID.
+* Constructs a new job graph with no name, a random job ID, and the 
default
+* {@link ExecutionConfig} parameters.
 */
public JobGraph() {
this((String) null);
}
 
/**
-* Constructs a new job graph with the given name, a random job ID.
+* Constructs a new job graph with the given name, a random job ID and 
the default
+* {@link ExecutionConfig} parameters.
 *
 * @param jobName The name of the job
 */
public JobGraph(String jobName) {
-   this(null, jobName);
+   this(null, jobName, new ExecutionConfig());
+   }
+
+   /**
+* Constructs a new job graph with the given job ID, the given name, 
and the default
+* {@link ExecutionConfig} parameters.
+*
+* @param jobID The id of the job. A random ID is generated, if {@code 
null} is passed.
+* @param jobName The name of the job.
+*/
+   public JobGraph(JobID jobID, String jobName) {
+   this(jobID, jobName, new ExecutionConfig());
}
 
/**
-* Constructs a new job graph with the given name and a random job ID 
if null supplied as an id.
+* Constructs a new job graph with the given name, the given {@link 
ExecutionConfig},
+* and a random job ID.
+*
+* @param jobName The name of the job.
+* @param config The execution configuration of the job.
+*/
+   public JobGraph(String jobName, ExecutionConfig config) {
+   this(null, jobName, config);
+   }
+
+   /**
+* Constructs a new job graph with the given job ID (or a random ID, if 
{@code null} is passed),
+* the given name and the given execution configuration (see {@link 
ExecutionConfig}).
 *
 * @param jobId The id of the job. A random ID is generated, if {@code 
null} is passed.
 * @param jobName The name of the job.
+* @param config The execution configuration of the job.
 */
-   public JobGraph(JobID jobId, String jobName) {
+   public JobGraph(JobID jobId, String jobName, ExecutionConfig config) {
this.jobID = jobId == null ? new JobID() : jobId;
this.jobName = jobName == null ? "(unnamed job)" : jobName;
+   this.executionConfig = config;
+   }
+
+   /**
+* Constructs a new job graph containing the provided single 
{@code JobVertex} (see {@link JobVertex}),
+* with no name, a random ID, and the default execution configuration 
(see {@link ExecutionConfig}).
+*
+* @param vertex The single vertex of the graph.
+* */
+   public JobGraph(JobVertex vertex) {
+   this(null, Collections.singletonList(vertex));
+   }
+
+   /**
+* Constructs a new job graph containing the provided single 
{@code JobVertex} (see {@link JobVertex}),
+* with the given name, a random ID, and the default execution 
configuration (see {@link ExecutionConfig}).
+*
+* @param jobName The name of the job.
+* @param vertex The single vertex of the graph.
+* */
+   public JobGraph(String jobName, JobVertex vertex) {
+   this(jobName, Collections.singletonList(vertex));
+   }
+
+   /**
+* Constructs a new job graph containing the provided two {@code 
JobVertices} (see {@link JobVertex}),
+* with the given name, a random ID, and the default execution 
configuration (see {@link ExecutionConfig}).
+*
+* @param jobName The name of the job.
+* @param vertex1 The first vertex of the graph.
+* @param vertex2 The second vertex of the graph.
+* */
+   public JobGraph(String jobName, JobVertex vertex1, JobVertex vertex2) {
+   this(jobName, Arrays.asList(vertex1, vertex2));
}
 
/**
-* Constructs a new job graph with no name and a random job ID if null 
supplied as an id.
+* Constructs a new job graph containing the provided {@code 
JobVertices} (see {@link JobVertex}),
+* with no name, a random job ID, and the default execution 
configuration (see {@link ExecutionConfig})
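
A hedged usage sketch of the constructor variants introduced above, per the signatures quoted in the diff (job name and interval value are illustrative):

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class JobGraphConfigExample {
    public static void main(String[] args) {
        ExecutionConfig config = new ExecutionConfig();
        config.setTaskCancellationInterval(30000);
        // random JobID, explicit name and execution config (per the quoted diff)
        JobGraph graph = new JobGraph("example-job", config);
        System.out.println(graph.getJobID());
    }
}
```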

[jira] [Commented] (FLINK-2523) Make task canceling interrupt interval configurable

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139895#comment-15139895
 ] 

ASF GitHub Bot commented on FLINK-2523:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52385188
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 ---
@@ -75,16 +76,22 @@ public void testExecutionConfigSerialization() throws 
IOException, ClassNotFound
config.disableSysoutLogging();
}
config.setParallelism(dop);
-   
+
JobGraph jobGraph = compiler.createJobGraph("test");
-   
+
+   final String exec_config_key = "runtime.execconfig";
--- End diff --

variable name is not java style


> Make task canceling interrupt interval configurable
> ---
>
> Key: FLINK-2523
> URL: https://issues.apache.org/jira/browse/FLINK-2523
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Klou
> Fix For: 1.0.0
>
>
> When a task is canceled, the cancellation periodically calls "interrupt()" on 
> the task thread, if the task thread does not cancel within a certain time.
> Currently, this value is hard coded to 10 seconds. We should make that time 
> configurable.
> Until then, I would like to increase the value to 30 seconds, as many tasks 
> (here I am observing it for Kafka consumers) can take longer than 10 seconds 
> for proper cleanup.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

2016-02-09 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1524#issuecomment-182054657
  
I'm testing the change on a cluster (with YARN) to see if everything is 
working as expected.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2380) Allow to configure default FS for file inputs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139750#comment-15139750
 ] 

ASF GitHub Bot commented on FLINK-2380:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1524#issuecomment-182066864
  
Setting the value to `fs.default-scheme: thisIsWrong:///`

is good:

```
robert@cdh544-master:~/flink/build-target$ ./bin/flink run 
./examples/batch/WordCount.jar /user/robert/tpch-100lineitems.csv 
/user/robert/elasdoijwef


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Cannot initialize task 'DataSink (CsvOutputFormat (path: 
/user/robert/elasdoijwef, delimiter:  ))': No file system found with scheme 
thisIsWrong, referenced in file URI 'thisIsWrong:/user/robert/elasdoijwef'.
at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
at 
org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:78)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at 
org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:804)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:331)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1127)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1175)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot 
initialize task 'DataSink (CsvOutputFormat (path: /user/robert/elasdoijwef, 
delimiter:  ))': No file system found with scheme thisIsWrong, referenced in 
file URI 'thisIsWrong:/user/robert/elasdoijwef'.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:981)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:965)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:965)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:378)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:105)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailb

[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

2016-02-09 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1524#issuecomment-182066864
  
Setting the value to `fs.default-scheme: thisIsWrong:///`

is good:

```
robert@cdh544-master:~/flink/build-target$ ./bin/flink run 
./examples/batch/WordCount.jar /user/robert/tpch-100lineitems.csv 
/user/robert/elasdoijwef


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Cannot initialize task 'DataSink (CsvOutputFormat (path: 
/user/robert/elasdoijwef, delimiter:  ))': No file system found with scheme 
thisIsWrong, referenced in file URI 'thisIsWrong:/user/robert/elasdoijwef'.
at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
at 
org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:78)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at 
org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:804)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:331)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1127)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1175)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot 
initialize task 'DataSink (CsvOutputFormat (path: /user/robert/elasdoijwef, 
delimiter:  ))': No file system found with scheme thisIsWrong, referenced in 
file URI 'thisIsWrong:/user/robert/elasdoijwef'.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:981)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:965)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:965)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:378)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:105)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPoo

[jira] [Commented] (FLINK-3335) Fix DataSourceTask object reuse when disabled

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139749#comment-15139749
 ] 

ASF GitHub Bot commented on FLINK-3335:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/1616

[FLINK-3335] [runtime] Fix DataSourceTask object reuse when disabled

When object reuse is disabled, `DataSourceTask` now copies objects received 
from the `InputFormat` to prevent the collection of reused objects.

An example where this is necessary is a `DataSet` created from a user 
implementation of `Iterator` which reuses a local object returned from 
`Iterator.next`.

Also, when object reuse is enabled, the cycling among three objects has 
been removed. I had added this a few months ago when starting to resolve an 
issue with reduce drivers.
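
A hedged sketch of the scenario described above (illustrative types): an input iterator that mutates and returns the same instance on every `next()` call, so collecting it without copying would alias a single object downstream:

```java
import java.util.Iterator;
import org.apache.flink.api.java.tuple.Tuple1;

public class ReusingIterator implements Iterator<Tuple1<Long>> {
    private final Tuple1<Long> reuse = new Tuple1<>(0L); // one long-lived object
    private final long count;
    private long next;

    public ReusingIterator(long count) {
        this.count = count;
    }

    @Override
    public boolean hasNext() {
        return next < count;
    }

    @Override
    public Tuple1<Long> next() {
        reuse.f0 = next++; // mutate and hand out the same object every time
        return reuse;
    }
}
```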

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
3335_fix_datasourcetask_object_reuse_when_disabled

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1616.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1616


commit 2678b9315a28ce27d888c7be53e5cce13b1afb35
Author: Greg Hogan 
Date:   2016-02-09T13:18:28Z

[FLINK-3335] [runtime] Fix DataSourceTask object reuse when disabled

When object reuse is disabled, DataSourceTask now copies objects received 
from
the InputFormat to prevent the collection of reused objects.




> Fix DataSourceTask object reuse when disabled
> -
>
> Key: FLINK-3335
> URL: https://issues.apache.org/jira/browse/FLINK-3335
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> From {{DataSourceTask.invoke()}}:
> {code}
> if ((returned = format.nextRecord(serializer.createInstance())) != null) {
> output.collect(returned);
> }
> {code}
> The returned value ({{returned}}) must be copied rather than creating and 
> passing in a new instance. The {{InputFormat}} interface only permits the 
> given object to be used and does not require a new object to be returned 
> otherwise.
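
A hedged sketch of the fix described above (illustrative names, not the exact DataSourceTask code): with object reuse disabled, each record returned by the InputFormat is copied before being collected:

{code}
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.util.Collector;

public class CopyingReadLoop {
    public static <OT> void readAndEmit(
            InputFormat<OT, ?> format,
            TypeSerializer<OT> serializer,
            Collector<OT> output) throws Exception {
        OT reuse = serializer.createInstance();
        while (!format.reachedEnd()) {
            OT returned = format.nextRecord(reuse);
            if (returned != null) {
                output.collect(serializer.copy(returned)); // emit a private copy
            }
        }
    }
}
{code}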



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3335] [runtime] Fix DataSourceTask obje...

2016-02-09 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/1616

[FLINK-3335] [runtime] Fix DataSourceTask object reuse when disabled

When object reuse is disabled, `DataSourceTask` now copies objects received 
from the `InputFormat` to prevent the collection of reused objects.

An example where this is necessary is a `DataSet` created from a user 
implementation of `Iterator` which reuses a local object returned from 
`Iterator.next`.

Also, when object reuse is enabled, the cycling among three objects has 
been removed. I had added this a few months ago when starting to resolve an 
issue with reduce drivers.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
3335_fix_datasourcetask_object_reuse_when_disabled

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1616.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1616


commit 2678b9315a28ce27d888c7be53e5cce13b1afb35
Author: Greg Hogan 
Date:   2016-02-09T13:18:28Z

[FLINK-3335] [runtime] Fix DataSourceTask object reuse when disabled

When object reuse is disabled, DataSourceTask now copies objects received 
from
the InputFormat to prevent the collection of reused objects.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2380) Allow to configure default FS for file inputs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139744#comment-15139744
 ] 

ASF GitHub Bot commented on FLINK-2380:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1524#issuecomment-182065768
  
I identified the following issues:

- Setting the configuration using the yarn session "dynamic properties": 
`./bin/yarn-session.sh -n 2 -Dfs.default-scheme=hdfs:///` does not really work 
(the configuration parameter shows up in the web interface, but the job fails)
- Setting a wrong scheme leads to a null pointer exception on job 
submission. In the flink-conf.yaml, I have `fs.default-scheme: thisIsWrong`. 
Look at this:

```
org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Cannot initialize task 'DataSink (CsvOutputFormat (path: 
/user/robert/elasdoijwef, delimiter:  ))': null
at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
at 
org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:78)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at 
org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:804)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:331)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1127)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1175)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot 
initialize task 'DataSink (CsvOutputFormat (path: /user/robert/elasdoijwef, 
delimiter:  ))': null
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:981)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:965)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:965)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:378)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:105)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
  

[GitHub] flink pull request: FLINK-2380: allow to specify the default files...

2016-02-09 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1524#issuecomment-182065768
  
I identified the following issues:

- Setting the configuration using the yarn session "dynamic properties": 
`./bin/yarn-session.sh -n 2 -Dfs.default-scheme=hdfs:///` does not really work 
(the configuration parameter shows up in the web interface, but the job fails)
- Setting a wrong scheme leads to a null pointer exception on job 
submission. In the flink-conf.yaml, I have `fs.default-scheme: thisIsWrong`. 
Look at this:

```
org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Cannot initialize task 'DataSink (CsvOutputFormat (path: 
/user/robert/elasdoijwef, delimiter:  ))': null
at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
at 
org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:78)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at 
org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:804)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:331)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1127)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1175)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot 
initialize task 'DataSink (CsvOutputFormat (path: /user/robert/elasdoijwef, 
delimiter:  ))': null
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:981)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:965)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:965)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:378)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:105)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

[jira] [Commented] (FLINK-2380) Allow to configure default FS for file inputs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139717#comment-15139717
 ] 

ASF GitHub Bot commented on FLINK-2380:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1524#issuecomment-182054657
  
I'm testing the change on a cluster (with YARN) to see if everything is 
working as expected.


> Allow to configure default FS for file inputs
> -
>
> Key: FLINK-2380
> URL: https://issues.apache.org/jira/browse/FLINK-2380
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Affects Versions: 0.9, 0.10.0
>Reporter: Ufuk Celebi
>Assignee: Klou
>Priority: Minor
>  Labels: starter
> Fix For: 1.0.0
>
>
> File inputs use "file://" as default prefix. A user asked to make this 
> configurable, e.g. "hdfs://" as default.
> (I'm not sure whether this is already possible or not. I will check and 
> either close or implement this for the user.)
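
A hedged sketch of setting the key discussed in this thread programmatically (key name taken from the comments above; in practice it would go into flink-conf.yaml):

{code}
import org.apache.flink.configuration.Configuration;

public class DefaultSchemeExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // unqualified paths such as "/user/robert/input.csv" would then be
        // resolved against hdfs:/// instead of file:///
        conf.setString("fs.default-scheme", "hdfs:///");
    }
}
{code}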



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3260) ExecutionGraph gets stuck in state FAILING

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139701#comment-15139701
 ] 

ASF GitHub Bot commented on FLINK-3260:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1613#issuecomment-182046481
  
Looks good, with one inline comment.

Otherwise, +1 to merge


> ExecutionGraph gets stuck in state FAILING
> --
>
> Key: FLINK-3260
> URL: https://issues.apache.org/jira/browse/FLINK-3260
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 0.10.1
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>Priority: Blocker
> Fix For: 1.0.0
>
>
> It is a bit of a rare case, but the following can currently happen:
>   1. Job runs for a while, some tasks are already finished.
>   2. Job fails, goes to state failing and restarting. Non-finished tasks fail 
> or are canceled.
>   3. For the finished tasks, ask-futures from certain messages (for example 
> for releasing intermediate result partitions) can fail (timeout) and cause 
> the execution to go from FINISHED to FAILED
>   4. This triggers the execution graph to go to FAILING without ever going 
> further into RESTARTING again
>   5. The job is stuck
> It initially looks like this is mainly an issue for batch jobs (jobs where 
> tasks do finish, rather than run infinitely).
> The log that shows how this manifests:
> {code}
> 
> 17:19:19,782 INFO  akka.event.slf4j.Slf4jLogger   
>- Slf4jLogger started
> 17:19:19,844 INFO  Remoting   
>- Starting remoting
> 17:19:20,065 INFO  Remoting   
>- Remoting started; listening on addresses 
> :[akka.tcp://flink@127.0.0.1:56722]
> 17:19:20,090 INFO  org.apache.flink.runtime.blob.BlobServer   
>- Created BLOB server storage directory 
> /tmp/blobStore-6766f51a-1c51-4a03-acfb-08c2c29c11f0
> 17:19:20,096 INFO  org.apache.flink.runtime.blob.BlobServer   
>- Started BLOB server at 0.0.0.0:43327 - max concurrent requests: 50 - max 
> backlog: 1000
> 17:19:20,113 INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist
>- Started memory archivist akka://flink/user/archive
> 17:19:20,115 INFO  org.apache.flink.runtime.checkpoint.SavepointStoreFactory  
>- No savepoint state backend configured. Using job manager savepoint state 
> backend.
> 17:19:20,118 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>- Starting JobManager at akka.tcp://flink@127.0.0.1:56722/user/jobmanager.
> 17:19:20,123 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>- JobManager akka.tcp://flink@127.0.0.1:56722/user/jobmanager was granted 
> leadership with leader session ID None.
> 17:19:25,605 INFO  org.apache.flink.runtime.instance.InstanceManager  
>- Registered TaskManager at 
> testing-worker-linux-docker-e6d6931f-3200-linux-4 
> (akka.tcp://flink@172.17.0.253:43702/user/taskmanager) as 
> f213232054587f296a12140d56f63ed1. Current number of registered hosts is 1. 
> Current number of alive task slots is 2.
> 17:19:26,758 INFO  org.apache.flink.runtime.instance.InstanceManager  
>- Registered TaskManager at 
> testing-worker-linux-docker-e6d6931f-3200-linux-4 
> (akka.tcp://flink@172.17.0.253:43956/user/taskmanager) as 
> f9e78baa14fb38c69517fb1bcf4f419c. Current number of registered hosts is 2. 
> Current number of alive task slots is 4.
> 17:19:27,064 INFO  org.apache.flink.api.java.ExecutionEnvironment 
>- The job has 0 registered types and 0 default Kryo serializers
> 17:19:27,071 INFO  org.apache.flink.client.program.Client 
>- Starting client actor system
> 17:19:27,072 INFO  org.apache.flink.runtime.client.JobClient  
>- Starting JobClient actor system
> 17:19:27,110 INFO  akka.event.slf4j.Slf4jLogger   
>- Slf4jLogger started
> 17:19:27,121 INFO  Remoting   
>- Starting remoting
> 17:19:27,143 INFO  org.apache.flink.runtime.client.JobClient  
>- Started JobClient actor system at 127.0.0.1:51198
> 17:19:27,145 INFO  Remoting   
>- Remoting started; listening on addresses 
> :[akka.tcp://flink@127.0.0.1:51198]
> 17:19:27,325 INFO  org.apache.flink.runtime.client.JobClientActor 
>- Disconnect from JobManager null.
> 17:19:27,362 INFO  org.apache.flink.runtime.client.JobClientActor 
>- Received job Flink Java

[jira] [Commented] (FLINK-3260) ExecutionGraph gets stuck in state FAILING

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139699#comment-15139699
 ] 

ASF GitHub Bot commented on FLINK-3260:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1613#discussion_r52368138
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -107,7 +108,7 @@
private static final AtomicReferenceFieldUpdater STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(Execution.class, 
ExecutionState.class, "state");

-   private static final Logger LOG = ExecutionGraph.LOG;
+   private static final Logger LOG = 
LoggerFactory.getLogger(Execution.class);
--- End diff --

Did this cause issues in this case? I originally set the logger to the 
ExecutionGraph logger to get all messages related to the execution and its 
changes in one log namespace. I always thought that makes searching the log 
easier.


> ExecutionGraph gets stuck in state FAILING
> --
>
> Key: FLINK-3260
> URL: https://issues.apache.org/jira/browse/FLINK-3260
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 0.10.1
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>Priority: Blocker
> Fix For: 1.0.0
>
>
> It is a bit of a rare case, but the following can currently happen:
>   1. Job runs for a while, some tasks are already finished.
>   2. Job fails, goes to state failing and restarting. Non-finished tasks fail 
> or are canceled.
>   3. For the finished tasks, ask-futures from certain messages (for example 
> for releasing intermediate result partitions) can fail (timeout) and cause 
> the execution to go from FINISHED to FAILED
>   4. This triggers the execution graph to go to FAILING without ever going 
> further into RESTARTING again
>   5. The job is stuck
> It initially looks like this is mainly an issue for batch jobs (jobs where 
> tasks do finish, rather than run infinitely).
> The log that shows how this manifests:
> {code}
> 
> 17:19:19,782 INFO  akka.event.slf4j.Slf4jLogger   
>- Slf4jLogger started
> 17:19:19,844 INFO  Remoting   
>- Starting remoting
> 17:19:20,065 INFO  Remoting   
>- Remoting started; listening on addresses 
> :[akka.tcp://flink@127.0.0.1:56722]
> 17:19:20,090 INFO  org.apache.flink.runtime.blob.BlobServer   
>- Created BLOB server storage directory 
> /tmp/blobStore-6766f51a-1c51-4a03-acfb-08c2c29c11f0
> 17:19:20,096 INFO  org.apache.flink.runtime.blob.BlobServer   
>- Started BLOB server at 0.0.0.0:43327 - max concurrent requests: 50 - max 
> backlog: 1000
> 17:19:20,113 INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist
>- Started memory archivist akka://flink/user/archive
> 17:19:20,115 INFO  org.apache.flink.runtime.checkpoint.SavepointStoreFactory  
>- No savepoint state backend configured. Using job manager savepoint state 
> backend.
> 17:19:20,118 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>- Starting JobManager at akka.tcp://flink@127.0.0.1:56722/user/jobmanager.
> 17:19:20,123 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>- JobManager akka.tcp://flink@127.0.0.1:56722/user/jobmanager was granted 
> leadership with leader session ID None.
> 17:19:25,605 INFO  org.apache.flink.runtime.instance.InstanceManager  
>- Registered TaskManager at 
> testing-worker-linux-docker-e6d6931f-3200-linux-4 
> (akka.tcp://flink@172.17.0.253:43702/user/taskmanager) as 
> f213232054587f296a12140d56f63ed1. Current number of registered hosts is 1. 
> Current number of alive task slots is 2.
> 17:19:26,758 INFO  org.apache.flink.runtime.instance.InstanceManager  
>- Registered TaskManager at 
> testing-worker-linux-docker-e6d6931f-3200-linux-4 
> (akka.tcp://flink@172.17.0.253:43956/user/taskmanager) as 
> f9e78baa14fb38c69517fb1bcf4f419c. Current number of registered hosts is 2. 
> Current number of alive task slots is 4.
> 17:19:27,064 INFO  org.apache.flink.api.java.ExecutionEnvironment 
>- The job has 0 registered types and 0 default Kryo serializers
> 17:19:27,071 INFO  org.apache.flink.client.program.Client 
>- Starting client actor system
> 17:19:27,072 INFO  org.apache.flink.runtime.client.JobClient  
>- Starting JobClient actor system
> 17:19:27,110 INFO  akka.event.slf4j.Slf4jLogger   
>-

[GitHub] flink pull request: [FLINK-3260] [runtime] Enforce terminal state ...

2016-02-09 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1613#issuecomment-182046481
  
Looks good, with one inline comment.

Otherwise, +1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3260] [runtime] Enforce terminal state ...

2016-02-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1613#discussion_r52368138
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -107,7 +108,7 @@
private static final AtomicReferenceFieldUpdater STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(Execution.class, 
ExecutionState.class, "state");

-   private static final Logger LOG = ExecutionGraph.LOG;
+   private static final Logger LOG = 
LoggerFactory.getLogger(Execution.class);
--- End diff --

Did this cause issues in this case? I originally set the logger to the 
ExecutionGraph logger to get all messages related to the execution and its 
changes in one log namespace. I always thought that makes searching the log 
easier.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3373) Using a newer library of Apache HttpClient than 4.2.6 will get class loading problems

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139694#comment-15139694
 ] 

ASF GitHub Bot commented on FLINK-3373:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/1615

[FLINK-3373] [build] Shade away Hadoop's HTTP Components dependency

This makes the HTTP Components dependency disappear from the core 
classpath, allowing users to use their own version of the dependency.

We need shading because we cannot simply bump the HTTP Components version 
to the newest version. The YARN tests for Hadoop version >= 2.6.0 fail in that 
case.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink http_shade

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1615.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1615


commit 1be39d12071c7251cd566e692c3a9c7b5440e46d
Author: Stephan Ewen 
Date:   2016-02-09T20:18:43Z

[FLINK-3373] [build] Shade away Hadoop's HTTP Components dependency




> Using a newer library of Apache HttpClient than 4.2.6 will get class loading 
> problems
> -
>
> Key: FLINK-3373
> URL: https://issues.apache.org/jira/browse/FLINK-3373
> Project: Flink
>  Issue Type: Bug
> Environment: Latest Flink snapshot 1.0
>Reporter: Jakob Sultan Ericsson
>
> When I try to use Apache HTTP client 4.5.1 in my Flink job, it crashes 
> with NoClassDefFound.
> This has to do that it load some classes from provided httpclient 4.2.5/6 in 
> core flink.
> {noformat}
> 17:05:56,193 INFO  org.apache.flink.runtime.taskmanager.Task  
>- DuplicateFilter -> InstallKeyLookup (11/16) switched to FAILED with 
> exception.
> java.lang.NoSuchFieldError: INSTANCE
> at 
> org.apache.http.conn.ssl.SSLConnectionSocketFactory.(SSLConnectionSocketFactory.java:144)
> at 
> org.apache.http.impl.conn.PoolingHttpClientConnectionManager.getDefaultRegistry(PoolingHttpClientConnectionManager.java:109)
> at 
> org.apache.http.impl.conn.PoolingHttpClientConnectionManager.(PoolingHttpClientConnectionManager.java:116)
> ...
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:89)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:305)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:227)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> SSLConnectionSocketFactory finds an earlier version of the 
> AllowAllHostnameVerifier that does not have the INSTANCE variable (the 
> variable was probably added in 4.3).
> {noformat}
> jar tvf lib/flink-dist-1.0-SNAPSHOT.jar |grep AllowAllHostnameVerifier  
>791 Thu Dec 17 09:55:46 CET 2015 
> org/apache/http/conn/ssl/AllowAllHostnameVerifier.class
> {noformat}
> Solutions would be:
> - Fix the classloader so that my custom job does not conflict with internal 
> flink-core classes... pretty hard
> - Remove the dependency somehow.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3373] [build] Shade away Hadoop's HTTP ...

2016-02-09 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/1615

[FLINK-3373] [build] Shade away Hadoop's HTTP Components dependency

This makes the HTTP Components dependency disappear from the core 
classpath, allowing users to use their own version of the dependency.

We need shading because we cannot simply bump the HTTP Components version 
to the newest version. The YARN tests for Hadoop version >= 2.6.0 fail in that 
case.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink http_shade

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1615.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1615


commit 1be39d12071c7251cd566e692c3a9c7b5440e46d
Author: Stephan Ewen 
Date:   2016-02-09T20:18:43Z

[FLINK-3373] [build] Shade away Hadoop's HTTP Components dependency




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3333) Documentation about object reuse should be improved

2016-02-09 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139668#comment-15139668
 ] 

Greg Hogan commented on FLINK-:
---

I can think of four return values from a user defined {{reduce()}}: left, 
right, new object, or a long-lived local object. The long-lived user object can 
be later modified by the user, but storing and later modifying the left or 
right input objects is unsafe. That is what I consider an edge case, an action 
that no user is expected to perform, and that cannot be prevented without 
impacting performance (a.k.a. disabling object reuse).
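
A hedged illustration of the distinction drawn above (illustrative tuple type): returning a long-lived local object is safe, while storing the left or right input across calls is not:

{code}
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple1;

public class SumReducer implements ReduceFunction<Tuple1<Long>> {
    // long-lived local object, safe to reuse as the return value
    private final Tuple1<Long> result = new Tuple1<>(0L);

    @Override
    public Tuple1<Long> reduce(Tuple1<Long> left, Tuple1<Long> right) {
        // safe: read the inputs, but never store references to them
        result.f0 = left.f0 + right.f0;
        return result;
    }
}
{code}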

> Documentation about object reuse should be improved
> ---
>
> Key: FLINK-
> URL: https://issues.apache.org/jira/browse/FLINK-
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Blocker
> Fix For: 1.0.0
>
>
> The documentation about object reuse \[1\] has several problems, see \[2\].
> \[1\] 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior
> \[2\] 
> https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-3333) Documentation about object reuse should be improved

2016-02-09 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139151#comment-15139151
 ] 

Greg Hogan edited comment on FLINK- at 2/9/16 8:07 PM:
---

Apache Flink programs can be written and configured to reduce the number of 
object allocations for better performance. User defined functions (like map() 
or groupReduce()) process many millions or billions of input and output values. 
Enabling object reuse and processing mutable objects improves performance by 
lowering demand on the CPU cache and Java garbage collector.

Object reuse is disabled by default, with user defined functions generally 
getting new objects on each call (or through an iterator). In this case it is 
safe to store references to the objects inside the function (for example, in a 
List).

<'storing values in a list' example>
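
(A sketch of the kind of example the placeholder refers to, with illustrative names; storing the inputs is safe only because each call receives a new object when object reuse is disabled:)

{code}
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;

public class CollectingMapper implements MapFunction<Tuple1<Long>, Tuple1<Long>> {
    private final List<Tuple1<Long>> seen = new ArrayList<>();

    @Override
    public Tuple1<Long> map(Tuple1<Long> value) {
        seen.add(value); // safe: each call receives a distinct object
        return value;
    }
}
{code}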

Apache Flink will chain functions to improve performance when sorting is 
preserved and the parallelism unchanged. The chainable operators are Map, 
FlatMap, Reduce on full DataSet, GroupCombine on a Grouped DataSet, or a 
GroupReduce where the user supplied a RichGroupReduceFunction with a combine 
method. Objects are passed without copying _even when object reuse is disabled_.

In the chaining case, the functions in the chain are receiving the same object 
instances. So the second map() function is receiving the objects the first 
map() is returning. This behavior can lead to errors when the first map() 
function keeps a list of all objects and the second mapper is modifying 
objects. In that case, the user has to manually create copies of the objects 
before putting them into the list.
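
(A hedged sketch of the defensive copy described above, with illustrative names:)

{code}
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;

public class CachingMapper implements MapFunction<Tuple1<Long>, Tuple1<Long>> {
    private final List<Tuple1<Long>> cache = new ArrayList<>();

    @Override
    public Tuple1<Long> map(Tuple1<Long> value) {
        cache.add(value.copy()); // copy first: a chained function may mutate `value`
        return value;
    }
}
{code}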







There is a switch at the ExecutionConfig which allows users to enable the 
object reuse mode (enableObjectReuse()). For mutable types, Flink will reuse 
object instances. In practice that means that a user function will always 
receive the same object instance (with its fields set to new values). The 
object reuse mode will lead to better performance because fewer objects are 
created, but the user has to manually take care of what they are doing with the 
object references.
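
(A minimal sketch of flipping that switch, using the DataSet API:)

{code}
import org.apache.flink.api.java.ExecutionEnvironment;

public class ObjectReuseExample {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse(); // user code must not hold object references
    }
}
{code}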




was (Author: greghogan):
Apache Flink programs can be written and configured to reduce the number of 
object allocations for better performance. User defined functions (like map() 
or groupReduce()) process many millions or billions of input and output values. 
Enabling object reuse and processing mutable objects improves performance by 
lowering demand on the CPU cache and Java garbage collector.

Object reuse is disabled by default, with user defined functions generally 
getting new objects on each call (or through an iterator). In this case it is 
safe to store references to the objects inside the function (for example, in a 
List).

<'storing values in a list' example>

Apache Flink will chain functions to improve performance when sorting is 
preserved and the parallelism unchanged. The chainable operators are Map, 
FlatMap, Reduce on full DataSet, GroupCombine on a Grouped DataSet, or a 
GroupReduce where the user supplied a RichGroupReduceFunction with a combine 
method). Objects are passed without copying _even when object reuse is 
disabled_.

In the chaining case, the functions in the chain are receiving the same object 
instances. So the second map() function is receiving the objects the first 
map() is returning. This behavior can lead to errors when the first map() 
function keeps a list of all objects and the second mapper is modifying 
objects. In that case, the user has to manually create copies of the objects 
before putting them into the list.







There is a switch at the ExecutionConfig which allows users to enable the 
object reuse mode (enableObjectReuse()). For mutable types, Flink will reuse 
object instances. In practice that means that a user function will always 
receive the same object instance (with its fields set to new values). The 
object reuse mode will lead to better performance because fewer objects are 
created, but the user has to manually take care of what they are doing with the 
object references.



> Documentation about object reuse should be improved
> ---
>
> Key: FLINK-
> URL: https://issues.apache.org/jira/browse/FLINK-
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Blocker
> Fix For: 1.0.0
>
>
> The documentation about object reuse \[1\] has several problems, see \[2\].
> \[1\] 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior
> \[2\] 
> https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

[jira] [Closed] (FLINK-3382) Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper

2016-02-09 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-3382.
-
Resolution: Not A Problem

Per the comment on the pull request, this change would interfere with proper 
use of the iterator.

> Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper
> -
>
> Key: FLINK-3382
> URL: https://issues.apache.org/jira/browse/FLINK-3382
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> Object reuse in {{ReusingMutableToRegularIteratorWrapper.hasNext()}} can be 
> clarified by creating a single object and storing the iterator's next value 
> into the second reference.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3382) Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139653#comment-15139653
 ] 

ASF GitHub Bot commented on FLINK-3382:
---

Github user greghogan closed the pull request at:

https://github.com/apache/flink/pull/1614


> Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper
> -
>
> Key: FLINK-3382
> URL: https://issues.apache.org/jira/browse/FLINK-3382
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> Object reuse in {{ReusingMutableToRegularIteratorWrapper.hasNext()}} can be 
> clarified by creating a single object and storing the iterator's next value 
> into the second reference.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3382] Improve clarity of object reuse i...

2016-02-09 Thread greghogan
Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/1614#issuecomment-182035391
  
Ah, yes, now I see. I'll just burn this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3382) Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139652#comment-15139652
 ] 

ASF GitHub Bot commented on FLINK-3382:
---

Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/1614#issuecomment-182035391
  
Ah, yes, now I see. I'll just burn this.


> Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper
> -
>
> Key: FLINK-3382
> URL: https://issues.apache.org/jira/browse/FLINK-3382
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> Object reuse in {{ReusingMutableToRegularIteratorWrapper.hasNext()}} can be 
> clarified by creating a single object and storing the iterator's next value 
> into the second reference.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3382] Improve clarity of object reuse i...

2016-02-09 Thread greghogan
Github user greghogan closed the pull request at:

https://github.com/apache/flink/pull/1614


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3382] Improve clarity of object reuse i...

2016-02-09 Thread ggevay
Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1614#issuecomment-182025617
  
Shouldn't the current record remain valid if `hasNext()` returned true? I 
mean the user might be holding on to the object returned in `next`, and expect 
it to not be changed by a `hasNext` call:
```
T cur = it.next();
if(it.hasNext()) {
  // here, I would expect cur to not have changed since the next() call
}
```
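
For context, this is roughly why the change was withdrawn. A sketch (with
invented names, not the actual Flink class) of how a reusing wrapper can keep
the last returned object valid: it alternates between two instances and reads
ahead only into the spare one.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

interface MutableSource<T> {
    T next(T reuse); // fill `reuse` (or return a fresh object); null when exhausted
}

class ReusingWrapper<T> implements Iterator<T> {
    private final MutableSource<T> source;
    private T current; // the object last handed out by next()
    private T spare;   // the object hasNext() reads ahead into
    private boolean fetched;

    ReusingWrapper(MutableSource<T> source, T a, T b) {
        this.source = source;
        this.current = a;
        this.spare = b;
    }

    @Override
    public boolean hasNext() {
        if (!fetched) {
            // Read ahead into the spare, never into `current`, so the object
            // the caller obtained from next() is not clobbered here.
            T read = source.next(spare);
            if (read == null) {
                return false;
            }
            spare = read;
            fetched = true;
        }
        return true;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T tmp = current;
        current = spare;
        spare = tmp; // the old `current` becomes the next read-ahead target
        fetched = false;
        return current;
    }
}
```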


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3382) Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139530#comment-15139530
 ] 

ASF GitHub Bot commented on FLINK-3382:
---

Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/1614#issuecomment-182025617
  
Shouldn't the current record remain valid if `hasNext()` returned true? I 
mean the user might be holding on to the object returned in `next`, and expect 
it to not be changed by a `hasNext` call:
```
T cur = it.next();
if(it.hasNext()) {
  // here, I would expect cur to not have changed since the next() call
}
```


> Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper
> -
>
> Key: FLINK-3382
> URL: https://issues.apache.org/jira/browse/FLINK-3382
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> Object reuse in {{ReusingMutableToRegularIteratorWrapper.hasNext()}} can be 
> clarified by creating a single object and storing the iterator's next value 
> into the second reference.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3382) Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139441#comment-15139441
 ] 

ASF GitHub Bot commented on FLINK-3382:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/1614

[FLINK-3382] Improve clarity of object reuse in 
ReusingMutableToRegularIteratorWrapper



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
3382_improve_clarity_of_object_reuse_in_ReusingMutableToRegularIteratorWrapper

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1614.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1614






> Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper
> -
>
> Key: FLINK-3382
> URL: https://issues.apache.org/jira/browse/FLINK-3382
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> Object reuse in {{ReusingMutableToRegularIteratorWrapper.hasNext()}} can be 
> clarified by creating a single object and storing the iterator's next value 
> into the second reference.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3382] Improve clarity of object reuse i...

2016-02-09 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/1614

[FLINK-3382] Improve clarity of object reuse in 
ReusingMutableToRegularIteratorWrapper



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
3382_improve_clarity_of_object_reuse_in_ReusingMutableToRegularIteratorWrapper

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1614.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1614






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3333) Documentation about object reuse should be improved

2016-02-09 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139435#comment-15139435
 ] 

Gabor Gevay commented on FLINK-3333:


One more thing: if we decide to go with including chaining, then please also 
explain that a "chainable operator" means that it can be chained with the 
_previous_ operator.

> Documentation about object reuse should be improved
> ---
>
> Key: FLINK-3333
> URL: https://issues.apache.org/jira/browse/FLINK-3333
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Blocker
> Fix For: 1.0.0
>
>
> The documentation about object reuse \[1\] has several problems, see \[2\].
> \[1\] 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior
> \[2\] 
> https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2523) Make task canceling interrupt interval configurable

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139415#comment-15139415
 ] 

ASF GitHub Bot commented on FLINK-2523:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52356401
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -102,60 +101,150 @@
// 

 
/**
-* Constructs a new job graph with no name and a random job ID.
+* Constructs a new job graph with no name, a random job ID, and the 
default
+* {@link ExecutionConfig} parameters.
 */
public JobGraph() {
this((String) null);
}
 
/**
-* Constructs a new job graph with the given name, a random job ID.
+* Constructs a new job graph with the given name, a random job ID and 
the default
+* {@link ExecutionConfig} parameters.
 *
 * @param jobName The name of the job
 */
public JobGraph(String jobName) {
-   this(null, jobName);
+   this(null, jobName, new ExecutionConfig());
+   }
+
+   /**
+* Constructs a new job graph with the given job ID, the given name, 
and the default
+* {@link ExecutionConfig} parameters.
+*
+* @param jobID The id of the job. A random ID is generated, if {@code 
null} is passed.
+* @param jobName The name of the job.
+*/
+   public JobGraph(JobID jobID, String jobName) {
+   this(jobID, jobName, new ExecutionConfig());
}
 
/**
-* Constructs a new job graph with the given name and a random job ID 
if null supplied as an id.
+* Constructs a new job graph with the given name, the given {@link 
ExecutionConfig},
+* and a random job ID.
+*
+* @param jobName The name of the job.
+* @param config The execution configuration of the job.
+*/
+   public JobGraph(String jobName, ExecutionConfig config) {
+   this(null, jobName, config);
+   }
+
+   /**
+* Constructs a new job graph with the given job ID (or a random ID, if 
{@code null} is passed),
+* the given name and the given execution configuration (see {@link 
ExecutionConfig}).
 *
 * @param jobId The id of the job. A random ID is generated, if {@code 
null} is passed.
 * @param jobName The name of the job.
+* @param config The execution configuration of the job.
 */
-   public JobGraph(JobID jobId, String jobName) {
+   public JobGraph(JobID jobId, String jobName, ExecutionConfig config) {
this.jobID = jobId == null ? new JobID() : jobId;
this.jobName = jobName == null ? "(unnamed job)" : jobName;
+   this.executionConfig = config;
--- End diff --

Can `executionConfig` be `null`? If not, then we should insert a check here.
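
A sketch of one possible form of that check, using java.util.Objects (the PR
may of course prefer a different utility):

```java
this.executionConfig = java.util.Objects.requireNonNull(
        config, "The ExecutionConfig must not be null.");
```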


> Make task canceling interrupt interval configurable
> ---
>
> Key: FLINK-2523
> URL: https://issues.apache.org/jira/browse/FLINK-2523
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Klou
> Fix For: 1.0.0
>
>
> When a task is canceled, the cancellation periodically calls "interrupt()" on 
> the task thread if the task thread does not cancel within a certain time.
> Currently, this value is hard-coded to 10 seconds. We should make that time 
> configurable.
> Until then, I would like to increase the value to 30 seconds, as many tasks 
> (here I am observing it for Kafka consumers) can take longer than 10 seconds 
> for proper cleanup.
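
For illustration, if the PR exposes the interval on the ExecutionConfig, usage
might look like the sketch below (the setter name here is a guess, not the
confirmed API):

{code}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Hypothetical setter: interrupt the cancelling task thread every 30 seconds.
env.getConfig().setTaskCancellationInterval(30000L);
{code}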



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...

2016-02-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1612#discussion_r52356401
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -102,60 +101,150 @@
// 

 
/**
-* Constructs a new job graph with no name and a random job ID.
+* Constructs a new job graph with no name, a random job ID, and the 
default
+* {@link ExecutionConfig} parameters.
 */
public JobGraph() {
this((String) null);
}
 
/**
-* Constructs a new job graph with the given name, a random job ID.
+* Constructs a new job graph with the given name, a random job ID and 
the default
+* {@link ExecutionConfig} parameters.
 *
 * @param jobName The name of the job
 */
public JobGraph(String jobName) {
-   this(null, jobName);
+   this(null, jobName, new ExecutionConfig());
+   }
+
+   /**
+* Constructs a new job graph with the given job ID, the given name, 
and the default
+* {@link ExecutionConfig} parameters.
+*
+* @param jobID The id of the job. A random ID is generated, if {@code 
null} is passed.
+* @param jobName The name of the job.
+*/
+   public JobGraph(JobID jobID, String jobName) {
+   this(jobID, jobName, new ExecutionConfig());
}
 
/**
-* Constructs a new job graph with the given name and a random job ID 
if null supplied as an id.
+* Constructs a new job graph with the given name, the given {@link 
ExecutionConfig},
+* and a random job ID.
+*
+* @param jobName The name of the job.
+* @param config The execution configuration of the job.
+*/
+   public JobGraph(String jobName, ExecutionConfig config) {
+   this(null, jobName, config);
+   }
+
+   /**
+* Constructs a new job graph with the given job ID (or a random ID, if 
{@code null} is passed),
+* the given name and the given execution configuration (see {@link 
ExecutionConfig}).
 *
 * @param jobId The id of the job. A random ID is generated, if {@code 
null} is passed.
 * @param jobName The name of the job.
+* @param config The execution configuration of the job.
 */
-   public JobGraph(JobID jobId, String jobName) {
+   public JobGraph(JobID jobId, String jobName, ExecutionConfig config) {
this.jobID = jobId == null ? new JobID() : jobId;
this.jobName = jobName == null ? "(unnamed job)" : jobName;
+   this.executionConfig = config;
--- End diff --

Can `executionConfig` be `null`? If not, then we should insert a check here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-3382) Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper

2016-02-09 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-3382:
--
Description: Object reuse in 
{{ReusingMutableToRegularIteratorWrapper.hasNext()}} can be clarified by 
creating a single object and storing the iterator's next value into the second 
reference.  (was: Object reuse in 
{{ReusingMutableToRegularIteratorWrapper.hasNext()} can be clarified by 
creating a single object and storing the iterator's next value into the second 
reference.)

> Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper
> -
>
> Key: FLINK-3382
> URL: https://issues.apache.org/jira/browse/FLINK-3382
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> Object reuse in {{ReusingMutableToRegularIteratorWrapper.hasNext()}} can be 
> clarified by creating a single object and storing the iterator's next value 
> into the second reference.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3335) Fix DataSourceTask object reuse when disabled

2016-02-09 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-3335:
--
Summary: Fix DataSourceTask object reuse when disabled  (was: 
DataSourceTask object reuse when disabled)

> Fix DataSourceTask object reuse when disabled
> -
>
> Key: FLINK-3335
> URL: https://issues.apache.org/jira/browse/FLINK-3335
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> From {{DataSourceTask.invoke()}}:
> {code}
> if ((returned = format.nextRecord(serializer.createInstance())) != null) {
> output.collect(returned);
> }
> {code}
> The returned value ({{returned}}) must be copied rather than creating and 
> passing in a new instance. The {{InputFormat}} interface only permits the 
> given object to be used and does not require a new object to be returned 
> otherwise.
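
A sketch of the direction the description suggests, assuming the
{{TypeSerializer}} that {{DataSourceTask}} already holds (not necessarily the
final patch):

{code}
OT reuse = serializer.createInstance();
OT returned;
while ((returned = format.nextRecord(reuse)) != null) {
    // Emit a copy so downstream operators never see the format's live object.
    output.collect(serializer.copy(returned));
    reuse = returned;
}
{code}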



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139332#comment-15139332
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52349128
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
 ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+abstract class AvgAggregate[T] extends Aggregate[T] {
+
+}
+
+// TinyInt average aggregate returns Int as the aggregated value.
+class TinyIntAvgAggregate extends AvgAggregate[Byte] {
+  private var sum: Long = 0
+  private var count: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sum = 0
+    count = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    count += 1
+    sum += value.asInstanceOf[Byte]
--- End diff --

What about simply adding `0.5`?
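
For illustration, "adding `0.5`" presumably means rounding the truncated
integer average to the nearest value; a minimal sketch:

```java
public class AvgRounding {
    // Rounds the average of `sum` over `count` to the nearest int
    // (for non-negative sums).
    static int roundedAvg(long sum, int count) {
        return (int) ((double) sum / count + 0.5);
    }

    public static void main(String[] args) {
        System.out.println(roundedAvg(7, 2)); // 4, where integer division yields 3
    }
}
```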


> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> -
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream 
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one 
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all 
> relevant operator information (keys, user-code expression, strategy hints, 
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink 
> RelNodes. We start with a straight-forward mapping and later add rules that 
> merge several relational operators into a single Flink operator, e.g., merge 
> a join followed by a filter. Timo implemented some rules for the first SQL 
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

2016-02-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1600#discussion_r52349128
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
 ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+abstract class AvgAggregate[T] extends Aggregate[T] {
+
+}
+
+// TinyInt average aggregate returns Int as the aggregated value.
+class TinyIntAvgAggregate extends AvgAggregate[Byte] {
+  private var sum: Long = 0
+  private var count: Int = 0
+
+  override def initiateAggregate: Unit = {
+    sum = 0
+    count = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    count += 1
+    sum += value.asInstanceOf[Byte]
--- End diff --

What about simply adding `0.5`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3187) Decouple restart strategy from ExecutionGraph

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139297#comment-15139297
 ] 

ASF GitHub Bot commented on FLINK-3187:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1470#issuecomment-181981959
  
If nobody objects, then I would like to merge this PR, since it will give 
us more flexibility in the future with respect to restarting strategies.


> Decouple restart strategy from ExecutionGraph
> -
>
> Key: FLINK-3187
> URL: https://issues.apache.org/jira/browse/FLINK-3187
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> Currently, the {{ExecutionGraph}} supports the following restart logic: 
> Whenever a failure occurs and the number of restart attempts aren't depleted, 
> wait for a fixed amount of time and then try to restart. This behaviour can 
> be controlled by the configuration parameters {{execution-retries.default}} 
> and {{execution-retries.delay}}.
> I propose to decouple the restart logic from the {{ExecutionGraph}} a bit by 
> introducing a strategy pattern. That way it would not only allow us to define 
> a job specific restart behaviour but also to implement different restart 
> strategies. Conceivable strategies could be: Fixed timeout restart, 
> exponential backoff restart, partial topology restarts, etc.
> This change is a preliminary step towards having a restart strategy which 
> will scale the parallelism of a job down in case that not enough slots are 
> available.
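
As a hedged illustration of the proposed pattern (the names below are invented
for this sketch, not Flink's actual interfaces):

{code}
import java.util.Timer;
import java.util.TimerTask;

// The ExecutionGraph would only ask canRestart() and delegate the "how"
// (delay, backoff, attempt counting) to the strategy instance.
interface RestartStrategy {
    boolean canRestart();
    void restart(Runnable restartAction);
}

class FixedDelayRestartStrategy implements RestartStrategy {
    private final int maxAttempts;
    private final long delayMillis;
    private int attempts;

    FixedDelayRestartStrategy(int maxAttempts, long delayMillis) {
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }

    @Override
    public boolean canRestart() {
        return attempts < maxAttempts;
    }

    @Override
    public void restart(final Runnable restartAction) {
        attempts++;
        new Timer(true).schedule(new TimerTask() {
            @Override
            public void run() {
                restartAction.run();
            }
        }, delayMillis);
    }
}
{code}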



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3187] Introduce RestartStrategy to deco...

2016-02-09 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1470#issuecomment-181981959
  
If nobody objects, then I would like to merge this PR, since it will give 
us more flexibility in the future with respect to restarting strategies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3260] [runtime] Enforce terminal state ...

2016-02-09 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/1613

[FLINK-3260] [runtime] Enforce terminal state of Executions

This commit fixes the problem that Executions could leave their terminal 
state
FINISHED to transition to FAILED. Such a transition will be propagated to 
the
ExecutionGraph where it entails JobStatus changes. Since the Execution 
already
reached a terminal state, it should not again affect the ExecutionGraph. 
This
can lead to an inconsistent state in case of a restart where the old 
Executions
get disassociated from the ExecutionGraph.
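
A compact sketch of what enforcing a terminal state can mean (illustrative,
not the actual Execution code):

```java
enum State {
    CREATED, RUNNING, FINISHED, CANCELED, FAILED;

    boolean isTerminal() {
        return this == FINISHED || this == CANCELED || this == FAILED;
    }
}

class GuardedExecution {
    private State state = State.CREATED;

    synchronized boolean transitionTo(State target) {
        if (state.isTerminal()) {
            // Once FINISHED/CANCELED/FAILED, no further transition may fire,
            // so a late callback (e.g. a timed-out future) cannot flip
            // FINISHED to FAILED and confuse the ExecutionGraph.
            return false;
        }
        state = target;
        return true;
    }
}
```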

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixCallbacks

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1613.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1613


commit cb2a2fb6d3dde5e248e6153e849c8f07d241a10d
Author: Till Rohrmann 
Date:   2016-02-09T09:30:12Z

[FLINK-3260] [runtime] Enforce terminal state of Executions

This commit fixes the problem that Executions could leave their terminal 
state
FINISHED to transition to FAILED. Such a transition will be propagated to 
the
ExecutionGraph where it entails JobStatus changes. Since the Execution 
already
reached a terminal state, it should not again affect the ExecutionGraph. 
This
can lead to an inconsistent state in case of a restart where the old 
Executions
get disassociated from the ExecutionGraph.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3260) ExecutionGraph gets stuck in state FAILING

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139293#comment-15139293
 ] 

ASF GitHub Bot commented on FLINK-3260:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/1613

[FLINK-3260] [runtime] Enforce terminal state of Executions

This commit fixes the problem that Executions could leave their terminal 
state
FINISHED to transition to FAILED. Such a transition will be propagated to 
the
ExecutionGraph where it entails JobStatus changes. Since the Execution 
already
reached a terminal state, it should not again affect the ExecutionGraph. 
This
can lead to an inconsistent state in case of a restart where the old 
Executions
get disassociated from the ExecutionGraph.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixCallbacks

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1613.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1613


commit cb2a2fb6d3dde5e248e6153e849c8f07d241a10d
Author: Till Rohrmann 
Date:   2016-02-09T09:30:12Z

[FLINK-3260] [runtime] Enforce terminal state of Executions

This commit fixes the problem that Executions could leave their terminal 
state
FINISHED to transition to FAILED. Such a transition will be propagated to 
the
ExecutionGraph where it entails JobStatus changes. Since the Execution 
already
reached a terminal state, it should not again affect the ExecutionGraph. 
This
can lead to an inconsistent state in case of a restart where the old 
Executions
get disassociated from the ExecutionGraph.




> ExecutionGraph gets stuck in state FAILING
> --
>
> Key: FLINK-3260
> URL: https://issues.apache.org/jira/browse/FLINK-3260
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 0.10.1
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>Priority: Blocker
> Fix For: 1.0.0
>
>
> It is a bit of a rare case, but the following can currently happen:
>   1. Jobs runs for a while, some tasks are already finished.
>   2. Job fails, goes to state failing and restarting. Non-finished tasks fail 
> or are canceled.
>   3. For the finished tasks, ask-futures from certain messages (for example 
> for releasing intermediate result partitions) can fail (timeout) and cause 
> the execution to go from FINISHED to FAILED
>   4. This triggers the execution graph to go to FAILING without ever going 
> further into RESTARTING again
>   5. The job is stuck
> It initially looks like this is mainly an issue for batch jobs (jobs where 
> tasks do finish, rather than run infinitely).
> The log that shows how this manifests:
> {code}
> 
> 17:19:19,782 INFO  akka.event.slf4j.Slf4jLogger   
>- Slf4jLogger started
> 17:19:19,844 INFO  Remoting   
>- Starting remoting
> 17:19:20,065 INFO  Remoting   
>- Remoting started; listening on addresses 
> :[akka.tcp://flink@127.0.0.1:56722]
> 17:19:20,090 INFO  org.apache.flink.runtime.blob.BlobServer   
>- Created BLOB server storage directory 
> /tmp/blobStore-6766f51a-1c51-4a03-acfb-08c2c29c11f0
> 17:19:20,096 INFO  org.apache.flink.runtime.blob.BlobServer   
>- Started BLOB server at 0.0.0.0:43327 - max concurrent requests: 50 - max 
> backlog: 1000
> 17:19:20,113 INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist
>- Started memory archivist akka://flink/user/archive
> 17:19:20,115 INFO  org.apache.flink.runtime.checkpoint.SavepointStoreFactory  
>- No savepoint state backend configured. Using job manager savepoint state 
> backend.
> 17:19:20,118 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>- Starting JobManager at akka.tcp://flink@127.0.0.1:56722/user/jobmanager.
> 17:19:20,123 INFO  org.apache.flink.runtime.jobmanager.JobManager 
>- JobManager akka.tcp://flink@127.0.0.1:56722/user/jobmanager was granted 
> leadership with leader session ID None.
> 17:19:25,605 INFO  org.apache.flink.runtime.instance.InstanceManager  
>- Registered TaskManager at 
> testing-worker-linux-docker-e6d6931f-3200-linux-4 
> (akka.tcp://flink@172.17.0.253:43702/user/taskmanager) as 
> f213232054587f296a12140d56f63ed1. Current number of registered hosts is 1. 
> Current number of alive task slots is 2.
> 17:1

[jira] [Commented] (FLINK-2523) Make task canceling interrupt interval configurable

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139279#comment-15139279
 ] 

ASF GitHub Bot commented on FLINK-2523:
---

GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/1612

FLINK-2523: Makes the task cancellation interval configurable.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink task_cancellation_interval

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1612.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1612


commit cd36d7ec883e69828bcb476d69aba465dca79b8d
Author: Aljoscha Krettek 
Date:   2016-02-03T10:07:38Z

[hotfix] Fix typos in Trigger.java

commit fdf74f036a96708fae7b8c8a5a2ce041bb7ed20f
Author: Kostas Kloudas 
Date:   2016-02-03T12:58:12Z

FLINK-3327: Attaches the ExecutionConfig to the JobGraph and propagates it 
to the Task itself.

commit 5ead1c05215f4fd70f55370f7864674d670b1282
Author: Kostas Kloudas 
Date:   2016-02-09T14:48:00Z

FLINK-2523: Makes the task cancellation interval configurable through the 
ExecutionConfig.




> Make task canceling interrupt interval configurable
> ---
>
> Key: FLINK-2523
> URL: https://issues.apache.org/jira/browse/FLINK-2523
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Klou
> Fix For: 1.0.0
>
>
> When a task is canceled, the cancellation periodically calls "interrupt()" on 
> the task thread if the task thread does not cancel within a certain time.
> Currently, this value is hard-coded to 10 seconds. We should make that time 
> configurable.
> Until then, I would like to increase the value to 30 seconds, as many tasks 
> (here I am observing it for Kafka consumers) can take longer than 10 seconds 
> for proper cleanup.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...

2016-02-09 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/1612

FLINK-2523: Makes the task cancellation interval configurable.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink task_cancellation_interval

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1612.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1612


commit cd36d7ec883e69828bcb476d69aba465dca79b8d
Author: Aljoscha Krettek 
Date:   2016-02-03T10:07:38Z

[hotfix] Fix typos in Trigger.java

commit fdf74f036a96708fae7b8c8a5a2ce041bb7ed20f
Author: Kostas Kloudas 
Date:   2016-02-03T12:58:12Z

FLINK-3327: Attaches the ExecutionConfig to the JobGraph and propagates it 
to the Task itself.

commit 5ead1c05215f4fd70f55370f7864674d670b1282
Author: Kostas Kloudas 
Date:   2016-02-09T14:48:00Z

FLINK-2523: Makes the task cancellation interval configurable through the 
ExecutionConfig.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3383) Separate Maven deployment from CI testing

2016-02-09 Thread Maximilian Michels (JIRA)
Maximilian Michels created FLINK-3383:
-

 Summary: Separate Maven deployment from CI testing
 Key: FLINK-3383
 URL: https://issues.apache.org/jira/browse/FLINK-3383
 Project: Flink
  Issue Type: Improvement
  Components: Build System, Tests
Affects Versions: 1.0.0
Reporter: Maximilian Michels
Assignee: Maximilian Michels
Priority: Critical


We currently handle our tests and the deployment of the Maven artifacts via 
Travis CI. Travis has a maximum allowed build time of two hours which we reach 
nearly every time. By that time, the tests have already been run but the 
deployment is still undergoing.

I propose to remove the Maven deployment from Travis. Instead, we could use 
Apache's Jenkins service or Apache's Buildbot service to trigger a deployment 
once a day.  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3333) Documentation about object reuse should be improved

2016-02-09 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139219#comment-15139219
 ] 

Gabor Gevay commented on FLINK-3333:


OK, this would also be a clear improvement over the current documentation.
However, this doesn't discuss the rules about whether I can modify output 
objects after returning them. This question is far from being trivial (as 
evidenced by the comment thread in the Google Doc). Do you think that this 
"edge case" is too insignificant to mention?

> I think it is most important that the user documentation be clear and 
> concise, otherwise it won't be read or will discourage new users.
This is why I think that the separation between the non-chained/chained cases 
should be left out of this. If we are aiming for simplicity here, then I really 
can't imagine a user meticulously checking whether his operator will be 
chained, and then writing different code based on this.

Other minor issues:

> In practice that means that a user function will always receive the same 
> object instance
"it can happen that" should be inserted. See point C) in the "Concrete problems 
with the current documentation" section in the Google Doc.

> User defined functions (like map() or groupReduce()) 
Should be "like MapFunction or GroupReduceFunction", to avoid confusing new 
users about what is a user defined function.


> Documentation about object reuse should be improved
> ---
>
> Key: FLINK-3333
> URL: https://issues.apache.org/jira/browse/FLINK-3333
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Blocker
> Fix For: 1.0.0
>
>
> The documentation about object reuse \[1\] has several problems, see \[2\].
> \[1\] 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior
> \[2\] 
> https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3226] Implement a CodeGenerator for an ...

2016-02-09 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1595#issuecomment-181955939
  
Thanks for the quick update @twalthr! Some tests are failing because the 
wrong type of exception is expected. I'll fix those and then merge this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139202#comment-15139202
 ] 

ASF GitHub Bot commented on FLINK-3226:
---

Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1595#issuecomment-181955939
  
Thanks for the quick update @twalthr! Some tests are failing because the 
wrong type of exception is expected. I'll fix those and then merge this.


> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> -
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream 
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one 
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all 
> relevant operator information (keys, user-code expression, strategy hints, 
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink 
> RelNodes. We start with a straight-forward mapping and later add rules that 
> merge several relational operators into a single Flink operator, e.g., merge 
> a join followed by a filter. Timo implemented some rules for the first SQL 
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3380) Unstable Test: JobSubmissionFailsITCase

2016-02-09 Thread Klou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Klou updated FLINK-3380:

Description: 
https://s3.amazonaws.com/archive.travis-ci.org/jobs/108034635/log.txt  (was: 
{quote}
Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 16.881 sec <<< 
FAILURE! - in org.apache.flink.test.failingPrograms.JobSubmissionFailsITCase
org.apache.flink.test.failingPrograms.JobSubmissionFailsITCase  Time elapsed: 
13.04 sec  <<< FAILURE!
java.lang.AssertionError: Futures timed out after [1 milliseconds]
at org.junit.Assert.fail(Assert.java:88)
at 
org.apache.flink.test.failingPrograms.JobSubmissionFailsITCase.teardown(JobSubmissionFailsITCase.java:82)
{quote})

> Unstable Test: JobSubmissionFailsITCase
> ---
>
> Key: FLINK-3380
> URL: https://issues.apache.org/jira/browse/FLINK-3380
> Project: Flink
>  Issue Type: Bug
>Reporter: Klou
>Priority: Critical
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/108034635/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-3271: Include jetty-util in the dist jar

2016-02-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1543#discussion_r52336491
  
--- Diff: flink-shaded-hadoop/pom.xml ---
@@ -111,6 +111,7 @@ under the License.

io.netty:netty:*

org.apache.curator:*

org.apache.hadoop:*
+   
org.mortbay.jetty:*
--- End diff --

The `jetty-util` will actually be included in the Flink fat jar without 
this line. This line simply adds the `jetty-util` jar in addition into the 
`flink-shaded-hadoop1.jar` which is an intermediate artifact, but also deployed 
to maven. I would like to keep the dependency out of the intermediate artifact.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3271) Using webhdfs in a flink topology throws classnotfound exception

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139192#comment-15139192
 ] 

ASF GitHub Bot commented on FLINK-3271:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1543#discussion_r52336491
  
--- Diff: flink-shaded-hadoop/pom.xml ---
@@ -111,6 +111,7 @@ under the License.

io.netty:netty:*

org.apache.curator:*

org.apache.hadoop:*
+   
org.mortbay.jetty:*
--- End diff --

The `jetty-util` will actually be included in the Flink fat jar without 
this line. This line simply adds the `jetty-util` jar in addition into the 
`flink-shaded-hadoop1.jar` which is an intermediate artifact, but also deployed 
to maven. I would like to keep the dependency out of the intermediate artifact.


> Using webhdfs in a flink topology throws classnotfound exception
> 
>
> Key: FLINK-3271
> URL: https://issues.apache.org/jira/browse/FLINK-3271
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 0.10.1
>Reporter: Abhishek Agarwal
>Assignee: Abhishek Agarwal
>
> I was just trying to run a storm topology on flink using flink-storm. I got 
> this exception - 
> {noformat}
> Caused by: java.lang.NoClassDefFoundError: org/mortbay/util/ajax/JSON
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem.jsonParse(WebHdfsFileSystem.java:325)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$FsPathResponseRunner.getResponse(WebHdfsFileSystem.java:727)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.runWithRetry(WebHdfsFileSystem.java:610)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.access$100(WebHdfsFileSystem.java:458)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner$1.run(WebHdfsFileSystem.java:487)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.run(WebHdfsFileSystem.java:483)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem.listStatus(WebHdfsFileSystem.java:1277)
> {noformat}
> My topology lists some files on hdfs using the webhdfs API. 
> org.mortbay.util.ajax.JSON was included in the application uber jar. I 
> noticed that flink loads the application jar in a child classloader. This is 
> what most likely happened - 
> 1. WebHdfsFileSystem class was loaded through parent class loader since it is 
> included in flink-dist.jar.
> 2. WebHdfsFileSystem has reference to the org.mortbay.util.ajax.JSON but 
> since it is loaded through parent class loader, WebHdfsFileSystem can't read 
> a class in child class loader. 
> Ideally all the referenced classes should be available in the distribution 
> jar so that this sort of issue may not occur. 
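
A runnable illustration of the delegation problem (the jar path is a
placeholder standing in for the user's uber jar):

{code}
import java.net.URL;
import java.net.URLClassLoader;

public class DelegationDemo {
    public static void main(String[] args) throws Exception {
        URL appJar = new URL("file:///path/to/app-uber.jar"); // placeholder path
        ClassLoader parent = DelegationDemo.class.getClassLoader(); // "flink-dist" loader
        URLClassLoader child = new URLClassLoader(new URL[]{appJar}, parent);

        // Resolvable through the child loader, which searches the uber jar:
        Class.forName("org.mortbay.util.ajax.JSON", true, child);

        // Not resolvable through the parent loader alone -- this is what happens
        // when a parent-loaded class like WebHdfsFileSystem references JSON:
        Class.forName("org.mortbay.util.ajax.JSON", true, parent); // ClassNotFoundException
    }
}
{code}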



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3382) Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper

2016-02-09 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-3382:
-

 Summary: Improve clarity of object reuse in 
ReusingMutableToRegularIteratorWrapper
 Key: FLINK-3382
 URL: https://issues.apache.org/jira/browse/FLINK-3382
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Runtime
Affects Versions: 1.0.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Minor


Object reuse in {{ReusingMutableToRegularIteratorWrapper.hasNext()} can be 
clarified by creating a single object and storing the iterator's next value 
into the second reference.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3381) Unstable Test: JobManagerSubmittedJobGraphsRecoveryITCase

2016-02-09 Thread Klou (JIRA)
Klou created FLINK-3381:
---

 Summary: Unstable Test: JobManagerSubmittedJobGraphsRecoveryITCase
 Key: FLINK-3381
 URL: https://issues.apache.org/jira/browse/FLINK-3381
 Project: Flink
  Issue Type: Bug
Reporter: Klou
Priority: Critical


https://s3.amazonaws.com/archive.travis-ci.org/jobs/108034634/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3380) Unstable Test: JobSubmissionFailsITCase

2016-02-09 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139190#comment-15139190
 ] 

Ufuk Celebi commented on FLINK-3380:


These should be fixed with https://github.com/apache/flink/pull/1611.

> Unstable Test: JobSubmissionFailsITCase
> ---
>
> Key: FLINK-3380
> URL: https://issues.apache.org/jira/browse/FLINK-3380
> Project: Flink
>  Issue Type: Bug
>Reporter: Klou
>Priority: Critical
>
> {quote}
> Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 16.881 sec 
> <<< FAILURE! - in 
> org.apache.flink.test.failingPrograms.JobSubmissionFailsITCase
> org.apache.flink.test.failingPrograms.JobSubmissionFailsITCase  Time elapsed: 
> 13.04 sec  <<< FAILURE!
> java.lang.AssertionError: Futures timed out after [1 milliseconds]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.test.failingPrograms.JobSubmissionFailsITCase.teardown(JobSubmissionFailsITCase.java:82)
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3380) Unstable Test: JobSubmissionFailsITCase

2016-02-09 Thread Klou (JIRA)
Klou created FLINK-3380:
---

 Summary: Unstable Test: JobSubmissionFailsITCase
 Key: FLINK-3380
 URL: https://issues.apache.org/jira/browse/FLINK-3380
 Project: Flink
  Issue Type: Bug
Reporter: Klou
Priority: Critical


{quote}
Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 16.881 sec <<< 
FAILURE! - in org.apache.flink.test.failingPrograms.JobSubmissionFailsITCase
org.apache.flink.test.failingPrograms.JobSubmissionFailsITCase  Time elapsed: 
13.04 sec  <<< FAILURE!
java.lang.AssertionError: Futures timed out after [1 milliseconds]
at org.junit.Assert.fail(Assert.java:88)
at 
org.apache.flink.test.failingPrograms.JobSubmissionFailsITCase.teardown(JobSubmissionFailsITCase.java:82)
{quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3375) Allow Watermark Generation in the Kafka Source

2016-02-09 Thread Zach Cox (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139159#comment-15139159
 ] 

Zach Cox commented on FLINK-3375:
-

Like [~shikhar] I will also have event producers on different machines (with 
independent clocks) sending messages to the same Kafka topic partitions. So 
events from the same producer are ordered, but in general events in each 
partition are somewhat out-of-order. I have full control over these producers, 
and have considered having them emit periodic watermarks. Would be nice to also 
have the option for the FlinkKafkaConsumer to use watermarks embedded directly 
in the Kafka topic, instead of just trying to extract them from events.

> Allow Watermark Generation in the Kafka Source
> --
>
> Key: FLINK-3375
> URL: https://issues.apache.org/jira/browse/FLINK-3375
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
> Fix For: 1.0.0
>
>
> It is a common case that event timestamps are ascending inside one Kafka 
> partition. Ascending timestamps are easy for users, because they are handled 
> by ascending timestamp extraction.
> If the Kafka source has multiple partitions per source task, then the records 
> become out of order before timestamps can be extracted and watermarks can be 
> generated.
> If we make the FlinkKafkaConsumer an event time source function, it can 
> generate watermarks itself. It would internally implement the same logic as 
> the regular operators that merge streams, keeping track of event time 
> progress per partition and generating watermarks based on the current 
> guaranteed event time progress.
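
A sketch of the per-partition bookkeeping the description implies (invented
names; it assumes every assigned partition has reported at least once before
the emitted watermark is meaningful):

{code}
import java.util.HashMap;
import java.util.Map;

class PartitionWatermarkTracker {
    private final Map<Integer, Long> progress = new HashMap<>();
    private long lastEmitted = Long.MIN_VALUE;

    /** Record event-time progress for one partition; return a watermark to emit, or null. */
    Long update(int partition, long timestamp) {
        Long previous = progress.get(partition);
        if (previous == null || timestamp > previous) {
            progress.put(partition, timestamp);
        }
        // The safe watermark is the minimum progress across all partitions.
        long min = Long.MAX_VALUE;
        for (long p : progress.values()) {
            min = Math.min(min, p);
        }
        if (min > lastEmitted) {
            lastEmitted = min;
            return min;
        }
        return null;
    }
}
{code}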



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3333) Documentation about object reuse should be improved

2016-02-09 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139151#comment-15139151
 ] 

Greg Hogan commented on FLINK-3333:
---

Apache Flink programs can be written and configured to reduce the number of 
object allocations for better performance. User defined functions (like map() 
or groupReduce()) process many millions or billions of input and output values. 
Enabling object reuse and processing mutable objects improves performance by 
lowering demand on the CPU cache and Java garbage collector.

Object reuse is disabled by default, with user defined functions generally 
getting new objects on each call (or through an iterator). In this case it is 
safe to store references to the objects inside the function (for example, in a 
List).

<'storing values in a list' example>

Apache Flink will chain functions to improve performance when sorting is 
preserved and the parallelism is unchanged. The chainable operators are Map, 
FlatMap, Reduce on a full DataSet, GroupCombine on a grouped DataSet, or a 
GroupReduce where the user supplied a RichGroupReduceFunction with a combine 
method. Objects are passed without copying _even when object reuse is 
disabled_.

In the chaining case, the functions in the chain receive the same object 
instances, so the second map() function receives the objects the first 
map() returns. This behavior can lead to errors when the first map() 
function keeps a list of all objects and the second mapper modifies them. 
In that case, the user has to manually create copies of the objects 
before putting them into the list.







There is a switch on the ExecutionConfig which allows users to enable the 
object reuse mode (enableObjectReuse()). For mutable types, Flink will then 
reuse object instances. In practice that means that a user function will always 
receive the same object instance (with its fields set to new values). The 
object reuse mode leads to better performance because fewer objects are 
created, but the user has to take care of what they do with the object 
references.



> Documentation about object reuse should be improved
> ---
>
> Key: FLINK-3333
> URL: https://issues.apache.org/jira/browse/FLINK-3333
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Blocker
> Fix For: 1.0.0
>
>
> The documentation about object reuse \[1\] has several problems, see \[2\].
> \[1\] 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior
> \[2\] 
> https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3333) Documentation about object reuse should be improved

2016-02-09 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139150#comment-15139150
 ] 

Greg Hogan commented on FLINK-3333:
---

I think it is most important that the user documentation be clear and concise, 
otherwise it won't be read or will discourage new users. Whatever edge cases 
may lurk, object reuse appears to have worked well with code as users 
naturally write it.

It would be helpful to include a list of chainable operators (from 
DriverStrategy.java: Map, FlatMap, Reduce on full DataSet, GroupCombine on a 
Grouped DataSet, or a GroupReduce where the user supplied a 
RichGroupReduceFunction with a combine method). Those could be linked to the 
DataSet Transformations page.

Some discussion of CopyableValue types such as IntValue, LongValue, and 
StringValue would be nice, along with code samples.

I've typed up some edits and will paste them into a new comment. I do think 
that the discussion of user prohibitions (no modifying inputs to a filter 
function, no modifying keyed fields, ...) would be better placed in a separate 
section, since those prohibitions are universally applicable.

> Documentation about object reuse should be improved
> ---
>
> Key: FLINK-3333
> URL: https://issues.apache.org/jira/browse/FLINK-3333
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Blocker
> Fix For: 1.0.0
>
>
> The documentation about object reuse \[1\] has several problems, see \[2\].
> \[1\] 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior
> \[2\] 
> https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-3271: Include jetty-util in the dist jar

2016-02-09 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1543#discussion_r52330385
  
--- Diff: flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml ---
@@ -184,10 +184,10 @@ under the License.
org.mortbay.jetty
jsp-2.1

-   
+   

[jira] [Commented] (FLINK-3271) Using webhdfs in a flink topology throws classnotfound exception

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139132#comment-15139132
 ] 

ASF GitHub Bot commented on FLINK-3271:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1543#discussion_r52330385
  
--- Diff: flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml ---
@@ -184,10 +184,10 @@ under the License.
org.mortbay.jetty
jsp-2.1

-   
+   
> Using webhdfs in a flink topology throws classnotfound exception
> 
>
> Key: FLINK-3271
> URL: https://issues.apache.org/jira/browse/FLINK-3271
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 0.10.1
>Reporter: Abhishek Agarwal
>Assignee: Abhishek Agarwal
>
> I was just trying to run a storm topology on flink using flink-storm. I got 
> this exception - 
> {noformat}
> Caused by: java.lang.NoClassDefFoundError: org/mortbay/util/ajax/JSON
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem.jsonParse(WebHdfsFileSystem.java:325)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$FsPathResponseRunner.getResponse(WebHdfsFileSystem.java:727)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.runWithRetry(WebHdfsFileSystem.java:610)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.access$100(WebHdfsFileSystem.java:458)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner$1.run(WebHdfsFileSystem.java:487)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.run(WebHdfsFileSystem.java:483)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem.listStatus(WebHdfsFileSystem.java:1277)
> {noformat}
> My topology lists some files on HDFS using the webhdfs API. 
> org.mortbay.util.ajax.JSON was included in the application uber jar. I 
> noticed that Flink loads the application jar in a child classloader. This is 
> what most likely happened: 
> 1. The WebHdfsFileSystem class was loaded through the parent class loader, 
> since it is included in flink-dist.jar.
> 2. WebHdfsFileSystem has a reference to org.mortbay.util.ajax.JSON, but 
> since it is loaded through the parent class loader, WebHdfsFileSystem cannot 
> see a class in the child class loader. 
> Ideally, all the referenced classes should be available in the distribution 
> jar so that this sort of issue does not occur. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-3271: Include jetty-util in the dist jar

2016-02-09 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1543#discussion_r52330359
  
--- Diff: flink-shaded-hadoop/pom.xml ---
@@ -111,6 +111,7 @@ under the License.

io.netty:netty:*

org.apache.curator:*

org.apache.hadoop:*
+   
org.mortbay.jetty:*
--- End diff --

It is a point solution only for the webhdfs API. In the webhdfs code path, 
Hadoop classes call jetty-util classes. Since the jetty-util classes are not 
available in the "Flink class loader", the call fails. Even if I pack the 
jetty-util classes into my application jar, the call still fails, since the 
application jar is in a "different classloader". There are two ways to solve 
this:
1. Relocate the Hadoop classes in the Flink fat jar. The webhdfs call will 
then go through the Hadoop classes packed in the application jar, not the 
Flink jar.
2. Include the jetty-util classes in the Flink fat jar. This is what I am 
doing in this PR. 

I hope I have clarified the problem.
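
A minimal, hypothetical sketch of the delegation behavior described here (the jar path is made up):

{code:java}
import java.net.URL;
import java.net.URLClassLoader;

public class ParentFirstDemo {

    public static void main(String[] args) throws Exception {
        ClassLoader parent = ParentFirstDemo.class.getClassLoader();
        // Child loader holding the user's uber jar (hypothetical path).
        URLClassLoader child = new URLClassLoader(
                new URL[] { new URL("file:///path/to/user-uber.jar") }, parent);

        // Resolving through the child works: it asks the parent first and
        // falls back to its own URLs.
        Class.forName("org.mortbay.util.ajax.JSON", true, child);

        // But a class defined by the parent (like WebHdfsFileSystem from
        // flink-dist.jar) links its references through the parent only, which
        // cannot see the child's URLs -> NoClassDefFoundError at first use.
    }
}
{code}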


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3271) Using webhdfs in a flink topology throws classnotfound exception

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139131#comment-15139131
 ] 

ASF GitHub Bot commented on FLINK-3271:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1543#discussion_r52330359
  
--- Diff: flink-shaded-hadoop/pom.xml ---
@@ -111,6 +111,7 @@ under the License.

io.netty:netty:*

org.apache.curator:*

org.apache.hadoop:*
+   
org.mortbay.jetty:*
--- End diff --

It is a point solution only for the webhdfs API. In the webhdfs code path, 
Hadoop classes call jetty-util classes. Since the jetty-util classes are not 
available in the "Flink class loader", the call fails. Even if I pack the 
jetty-util classes into my application jar, the call still fails, since the 
application jar is in a "different classloader". There are two ways to solve 
this:
1. Relocate the Hadoop classes in the Flink fat jar. The webhdfs call will 
then go through the Hadoop classes packed in the application jar, not the 
Flink jar.
2. Include the jetty-util classes in the Flink fat jar. This is what I am 
doing in this PR. 

I hope I have clarified the problem.


> Using webhdfs in a flink topology throws classnotfound exception
> 
>
> Key: FLINK-3271
> URL: https://issues.apache.org/jira/browse/FLINK-3271
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 0.10.1
>Reporter: Abhishek Agarwal
>Assignee: Abhishek Agarwal
>
> I was just trying to run a storm topology on flink using flink-storm. I got 
> this exception - 
> {noformat}
> Caused by: java.lang.NoClassDefFoundError: org/mortbay/util/ajax/JSON
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem.jsonParse(WebHdfsFileSystem.java:325)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$FsPathResponseRunner.getResponse(WebHdfsFileSystem.java:727)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.runWithRetry(WebHdfsFileSystem.java:610)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.access$100(WebHdfsFileSystem.java:458)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner$1.run(WebHdfsFileSystem.java:487)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.run(WebHdfsFileSystem.java:483)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem.listStatus(WebHdfsFileSystem.java:1277)
> {noformat}
> My topology lists some files on HDFS using the webhdfs API. 
> org.mortbay.util.ajax.JSON was included in the application uber jar. I 
> noticed that Flink loads the application jar in a child classloader. This is 
> what most likely happened: 
> 1. The WebHdfsFileSystem class was loaded through the parent class loader, 
> since it is included in flink-dist.jar.
> 2. WebHdfsFileSystem has a reference to org.mortbay.util.ajax.JSON, but 
> since it is loaded through the parent class loader, WebHdfsFileSystem cannot 
> see a class in the child class loader. 
> Ideally, all the referenced classes should be available in the distribution 
> jar so that this sort of issue does not occur. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-3356) JobClientActorRecoveryITCase.testJobClientRecovery

2016-02-09 Thread Klou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139116#comment-15139116
 ] 

Klou edited comment on FLINK-3356 at 2/9/16 4:14 PM:
-

Although it does not refer to the same test, I am posting it here, as the 
reason seems to be related to timeouts. Here are more unstable tests:

https://s3.amazonaws.com/archive.travis-ci.org/jobs/108034635/log.txt
https://s3.amazonaws.com/archive.travis-ci.org/jobs/108034634/log.txt


was (Author: kkl0u):
More unstable tests (timeouts seem to be the reason) 

https://s3.amazonaws.com/archive.travis-ci.org/jobs/108034635/log.txt
https://s3.amazonaws.com/archive.travis-ci.org/jobs/108034634/log.txt

> JobClientActorRecoveryITCase.testJobClientRecovery
> --
>
> Key: FLINK-3356
> URL: https://issues.apache.org/jira/browse/FLINK-3356
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: test-stability
>
> https://travis-ci.org/apache/flink/jobs/107597706
> https://travis-ci.org/mjsax/flink/jobs/107597700
> {noformat}
> Tests in error: 
>   JobClientActorRecoveryITCase.testJobClientRecovery:135 » Timeout Futures 
> timed...
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3105) Submission in per job YARN cluster mode reuses properties file of long-lived session

2016-02-09 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139129#comment-15139129
 ] 

Ufuk Celebi commented on FLINK-3105:


Do we want to fix this for 1.0?

> Submission in per job YARN cluster mode reuses properties file of long-lived 
> session
> 
>
> Key: FLINK-3105
> URL: https://issues.apache.org/jira/browse/FLINK-3105
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 0.10.1
>Reporter: Ufuk Celebi
>
> Starting a YARN session with `bin/yarn-session.sh` creates a properties file, 
> which is used to parse job manager information when submitting jobs.
> This properties file is also used when submitting a job with {{bin/flink run 
> -m yarn-cluster}}. The {{yarn-cluster}} mode should actually start a new YARN 
> session.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3105) Submission in per job YARN cluster mode reuses properties file of long-lived session

2016-02-09 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi updated FLINK-3105:
---
Fix Version/s: (was: 0.10.2)

> Submission in per job YARN cluster mode reuses properties file of long-lived 
> session
> 
>
> Key: FLINK-3105
> URL: https://issues.apache.org/jira/browse/FLINK-3105
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 0.10.1
>Reporter: Ufuk Celebi
>
> Starting a YARN session with `bin/yarn-session.sh` creates a properties file, 
> which is used to parse job manager information when submitting jobs.
> This properties file is also used when submitting a job with {{bin/flink run 
> -m yarn-cluster}}. The {{yarn-cluster}} mode should actually start a new YARN 
> session.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Increase test timeouts

2016-02-09 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/1611

Increase test timeouts

- Increases the Akka ask timeout in test clusters. Because there is no clear 
hierarchy with these, I added a `setDefaultCiConfig(Configuration)` method to 
`FlinkMiniCluster`, which is called in the 
`generateConfiguration(Configuration)` of all subtypes (a sketch of the 
pattern follows below).
- Increases the ZooKeeper connection timeouts
- Logs failures in the retry rule on warn level instead of debug
- Fixes a test instability in `JobManagerCheckpointRecoveryITCase`

Across 10 builds with these changes, there were still some failures 
(mostly process test failures). Another new issue seems to be that we have 
started hitting the 2 h build limit occasionally.

We will still have to monitor the build stability.
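
A hypothetical Java sketch of the pattern described above (the real `FlinkMiniCluster` is Scala; the names follow this description, and the constant and default value are assumptions):

{code:java}
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

public abstract class MiniClusterBase {

    /** Raise timeouts for CI unless the test set them explicitly. */
    protected void setDefaultCiConfig(Configuration config) {
        if (!config.containsKey(ConfigConstants.AKKA_ASK_TIMEOUT)) {
            config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "200 s");
        }
    }

    protected Configuration generateConfiguration(Configuration userConfig) {
        Configuration config = new Configuration(userConfig);
        setDefaultCiConfig(config); // every subtype funnels through here
        return config;
    }
}
{code}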

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink test-stability

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1611.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1611


commit 5c062101d487f0950ae1b22eb4166bb2564d67e2
Author: Ufuk Celebi 
Date:   2016-02-08T13:24:43Z

[runtime, streaming-connectors, tests] Increase default test Akka ask and 
ZooKeeper timeouts

commit 73b6af7f5565576d440372f6650d467eae008b99
Author: Ufuk Celebi 
Date:   2016-02-09T10:01:39Z

[runtime, tests] Ignore ZooKeeper logs in process tests

commit 0854ff7decf3069f13489fcc8b6f88aec5fc5d94
Author: Ufuk Celebi 
Date:   2016-02-09T10:25:37Z

[core] Log retry rule failures on warn level

commit 1dc2b61ac93d29da47dbc98025493a052b1627fd
Author: Ufuk Celebi 
Date:   2016-02-09T11:45:41Z

[tests] Reset state to allow retry on failure




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3356) JobClientActorRecoveryITCase.testJobClientRecovery

2016-02-09 Thread Klou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139116#comment-15139116
 ] 

Klou commented on FLINK-3356:
-

More unstable tests (timeouts seem to be the reason) 

https://s3.amazonaws.com/archive.travis-ci.org/jobs/108034635/log.txt
https://s3.amazonaws.com/archive.travis-ci.org/jobs/108034634/log.txt

> JobClientActorRecoveryITCase.testJobClientRecovery
> --
>
> Key: FLINK-3356
> URL: https://issues.apache.org/jira/browse/FLINK-3356
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: test-stability
>
> https://travis-ci.org/apache/flink/jobs/107597706
> https://travis-ci.org/mjsax/flink/jobs/107597700
> {noformat}
> Tests in error: 
>   JobClientActorRecoveryITCase.testJobClientRecovery:135 » Timeout Futures 
> timed...
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3271) Using webhdfs in a flink topology throws classnotfound exception

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139114#comment-15139114
 ] 

ASF GitHub Bot commented on FLINK-3271:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1543#discussion_r52328751
  
--- Diff: flink-shaded-hadoop/pom.xml ---
@@ -111,6 +111,7 @@ under the License.

io.netty:netty:*

org.apache.curator:*

org.apache.hadoop:*
+   
org.mortbay.jetty:*
--- End diff --

This PR does not change the fact that the classes are not hidden. It only puts 
them into the prepared Hadoop dependency for Flink (which later goes into the 
Flink fat jar).


> Using webhdfs in a flink topology throws classnotfound exception
> 
>
> Key: FLINK-3271
> URL: https://issues.apache.org/jira/browse/FLINK-3271
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 0.10.1
>Reporter: Abhishek Agarwal
>Assignee: Abhishek Agarwal
>
> I was just trying to run a storm topology on flink using flink-storm. I got 
> this exception - 
> {noformat}
> Caused by: java.lang.NoClassDefFoundError: org/mortbay/util/ajax/JSON
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem.jsonParse(WebHdfsFileSystem.java:325)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$FsPathResponseRunner.getResponse(WebHdfsFileSystem.java:727)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.runWithRetry(WebHdfsFileSystem.java:610)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.access$100(WebHdfsFileSystem.java:458)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner$1.run(WebHdfsFileSystem.java:487)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.run(WebHdfsFileSystem.java:483)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem.listStatus(WebHdfsFileSystem.java:1277)
> {noformat}
> My topology lists some files on HDFS using the webhdfs API. 
> org.mortbay.util.ajax.JSON was included in the application uber jar. I 
> noticed that Flink loads the application jar in a child classloader. This is 
> what most likely happened: 
> 1. The WebHdfsFileSystem class was loaded through the parent class loader, 
> since it is included in flink-dist.jar.
> 2. WebHdfsFileSystem has a reference to org.mortbay.util.ajax.JSON, but 
> since it is loaded through the parent class loader, WebHdfsFileSystem cannot 
> see a class in the child class loader. 
> Ideally, all the referenced classes should be available in the distribution 
> jar so that this sort of issue does not occur. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-3271: Include jetty-util in the dist jar

2016-02-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1543#discussion_r52328751
  
--- Diff: flink-shaded-hadoop/pom.xml ---
@@ -111,6 +111,7 @@ under the License.

io.netty:netty:*

org.apache.curator:*

org.apache.hadoop:*
+   
org.mortbay.jetty:*
--- End diff --

This PR does not change the fact that the classes are not hidden. It only puts 
them into the prepared Hadoop dependency for Flink (which later goes into the 
Flink fat jar).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3271) Using webhdfs in a flink topology throws classnotfound exception

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139108#comment-15139108
 ] 

ASF GitHub Bot commented on FLINK-3271:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1543#discussion_r52327591
  
--- Diff: flink-shaded-hadoop/pom.xml ---
@@ -111,6 +111,7 @@ under the License.

io.netty:netty:*

org.apache.curator:*

org.apache.hadoop:*
+   
org.mortbay.jetty:*
--- End diff --

Hadoop classes are being included in the fat jar but are not being hidden. 
That leads to the error scenario this PR intends to solve. 


> Using webhdfs in a flink topology throws classnotfound exception
> 
>
> Key: FLINK-3271
> URL: https://issues.apache.org/jira/browse/FLINK-3271
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 0.10.1
>Reporter: Abhishek Agarwal
>Assignee: Abhishek Agarwal
>
> I was just trying to run a storm topology on flink using flink-storm. I got 
> this exception - 
> {noformat}
> Caused by: java.lang.NoClassDefFoundError: org/mortbay/util/ajax/JSON
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem.jsonParse(WebHdfsFileSystem.java:325)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$FsPathResponseRunner.getResponse(WebHdfsFileSystem.java:727)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.runWithRetry(WebHdfsFileSystem.java:610)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.access$100(WebHdfsFileSystem.java:458)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner$1.run(WebHdfsFileSystem.java:487)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.run(WebHdfsFileSystem.java:483)
>   at 
> org.apache.hadoop.hdfs.web.WebHdfsFileSystem.listStatus(WebHdfsFileSystem.java:1277)
> {noformat}
> My topology lists some files on HDFS using the webhdfs API. 
> org.mortbay.util.ajax.JSON was included in the application uber jar. I 
> noticed that Flink loads the application jar in a child classloader. This is 
> what most likely happened: 
> 1. The WebHdfsFileSystem class was loaded through the parent class loader, 
> since it is included in flink-dist.jar.
> 2. WebHdfsFileSystem has a reference to org.mortbay.util.ajax.JSON, but 
> since it is loaded through the parent class loader, WebHdfsFileSystem cannot 
> see a class in the child class loader. 
> Ideally, all the referenced classes should be available in the distribution 
> jar so that this sort of issue does not occur. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-3271: Include jetty-util in the dist jar

2016-02-09 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1543#discussion_r52327591
  
--- Diff: flink-shaded-hadoop/pom.xml ---
@@ -111,6 +111,7 @@ under the License.

io.netty:netty:*

org.apache.curator:*

org.apache.hadoop:*
+   
org.mortbay.jetty:*
--- End diff --

Hadoop classes are being included in the fat jar but are not being hidden. 
That leads to the error scenario this PR intends to solve. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

