[jira] [Commented] (FLINK-1670) Collect method for streaming

2015-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14522195#comment-14522195
 ] 

ASF GitHub Bot commented on FLINK-1670:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/581#issuecomment-97956542
  
I think this looks good now!

I think this needs a test (integration test), otherwise it probably gets 
broken by some change soon.
Starting a `ForkableFlinkMiniCluster`, submitting the job via the 
`RemoteStreamEnvironment`, and collecting the result back should cover it.
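
For illustration, a minimal sketch of such a test, assuming the pull request 
exposes the results as an iterator via a `collect()` method on `DataStream` 
(that method name, the `ForkableFlinkMiniCluster` constructor arguments, and 
the port getter are assumptions, not verified against the PR):

{code}
import static org.junit.Assert.assertEquals;

import java.util.Iterator;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.Test;

public class StreamCollectITCase {

    @Test
    public void testCollectOverMiniCluster() throws Exception {
        // start a local mini cluster for the remote environment to connect to
        ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(new Configuration(), false);
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                    "localhost", cluster.getJobManagerRPCPort());
            env.setParallelism(1);

            DataStream<Long> stream = env.generateSequence(1, 100);

            // collect() is assumed to submit the job and stream results back to the client
            Iterator<Long> it = stream.collect();

            long sum = 0;
            while (it.hasNext()) {
                sum += it.next();
            }
            assertEquals(5050L, sum);
        }
        finally {
            cluster.stop();
        }
    }
}
{code}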


> Collect method for streaming
> 
>
> Key: FLINK-1670
> URL: https://issues.apache.org/jira/browse/FLINK-1670
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 0.9
>Reporter: Márton Balassi
>Assignee: Gabor Gevay
>Priority: Minor
>
> A convenience method for streaming back the results of a job to the client.
> As the client itself is a bottleneck anyway, an easy solution would be to 
> provide a socket sink with a degree of parallelism of 1, from which a client 
> utility can read.
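
A minimal sketch of the client-side reading loop for such a socket sink, 
assuming records arrive as newline-separated strings on a single connection 
(serialization and framing details are left out):

{code}
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

public class CollectClient {

    public static void main(String[] args) throws Exception {
        // the client opens a server socket; the job's socket sink (parallelism 1) connects to it
        try (ServerSocket server = new ServerSocket(0)) {
            System.out.println("Listening on port " + server.getLocalPort());
            try (Socket sink = server.accept();
                    BufferedReader reader = new BufferedReader(
                            new InputStreamReader(sink.getInputStream()))) {
                String record;
                while ((record = reader.readLine()) != null) {
                    // hand each record to the caller
                    System.out.println(record);
                }
            }
        }
    }
}
{code}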



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1670] Made DataStream iterable

2015-04-30 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/581#issuecomment-97956542
  
I think this looks good now!

I think this needs a test (integration test), otherwise it probably gets 
broken by some change soon.
Starting a `ForkableFlinkMiniCluster`, submitting the job via the 
`RemoteStreamEnvironment`, and collecting the result back should cover it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [streaming] New Source and state checkpointing...

2015-04-30 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/643

[streaming] New Source and state checkpointing interfaces 

These interfaces allow streaming sources/operators to interact with the 
state checkpointing in a more precise manner.

The interfaces are so far intended mainly for internal state handling 
(sources, window operators).

An open question is what state interface we want to expose to the user code 
such that, on scale-in and scale-out, the state is repartitioned consistently with 
the data partitions.
  - For groupByKey operations, we could scope the state by key.
  - For all other operations, state may be repartitioned in a user-defined 
manner. This would give no guarantees about alignment with partitions. Think of 
a pipeline `partitionBy -> map()`. The state accumulated in the `map()` function 
is partitioned by a key, but the state entries may not be linkable to a 
certain key or key-hash value.
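
To make the first option concrete, a rough sketch of what a key-scoped state 
interface could look like (purely illustrative; none of these names exist in 
the code):

{code}
/**
 * Hypothetical interface: state that is scoped to the key of the record
 * currently being processed, so that repartitioning the state can follow
 * the repartitioning of the data on scale-in and scale-out.
 */
public interface KeyScopedState<K, V> {

    /** Returns the state value stored for the given key, or null if none. */
    V get(K key);

    /** Updates the state value stored for the given key. */
    void update(K key, V value);
}
{code}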


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink cp_interfaces

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/643.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #643


commit 76d63d5b03643d3c6e51a882eb9427e421806342
Author: Stephan Ewen 
Date:   2015-04-30T20:05:27Z

[streaming] New Source and state checkpointing interfaces that allow 
operations to interact with the state checkpointing in a more precise manner.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1818] Added api to cancel job from clie...

2015-04-30 Thread hsaputra
Github user hsaputra commented on a diff in the pull request:

https://github.com/apache/flink/pull/642#discussion_r29462715
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
@@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
    }
    }
 
+
+    /**
+     * Executes the CANCEL action.
+     *
+     * @param args Command line arguments for the cancel action.
+     */
+
+
--- End diff --

Nit: Extra newlines


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1818) Provide API to cancel running job

2015-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14522117#comment-14522117
 ] 

ASF GitHub Bot commented on FLINK-1818:
---

Github user hsaputra commented on a diff in the pull request:

https://github.com/apache/flink/pull/642#discussion_r29462715
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
@@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
    }
    }
 
+
+    /**
+     * Executes the CANCEL action.
+     *
+     * @param args Command line arguments for the cancel action.
+     */
+
+
--- End diff --

Nit: Extra newlines


> Provide API to cancel running job
> -
>
> Key: FLINK-1818
> URL: https://issues.apache.org/jira/browse/FLINK-1818
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: niraj rai
>  Labels: starter
>
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Canceling-a-Cluster-Job-from-Java-td4897.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1818) Provide API to cancel running job

2015-04-30 Thread niraj rai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521937#comment-14521937
 ] 

niraj rai commented on FLINK-1818:
--

Hi [~mxm], Thanks for reviewing it. I will incorporate the changes suggested by 
you and resubmit. Thanks again.


> Provide API to cancel running job
> -
>
> Key: FLINK-1818
> URL: https://issues.apache.org/jira/browse/FLINK-1818
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: niraj rai
>  Labels: starter
>
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Canceling-a-Cluster-Job-from-Java-td4897.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1818) Provide API to cancel running job

2015-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521849#comment-14521849
 ] 

ASF GitHub Bot commented on FLINK-1818:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/642#issuecomment-97882155
  
Thank you for your pull request @rainiraj. I have made some comments. If 
you addressed these we would be happy to merge your changes. Feel free to ask 
if anything is unclear.


> Provide API to cancel running job
> -
>
> Key: FLINK-1818
> URL: https://issues.apache.org/jira/browse/FLINK-1818
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: niraj rai
>  Labels: starter
>
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Canceling-a-Cluster-Job-from-Java-td4897.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1818] Added api to cancel job from clie...

2015-04-30 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/642#issuecomment-97882155
  
Thank you for your pull request @rainiraj. I have made some comments. If 
you addressed these we would be happy to merge your changes. Feel free to ask 
if anything is unclear.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1818) Provide API to cancel running job

2015-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521839#comment-14521839
 ] 

ASF GitHub Bot commented on FLINK-1818:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/642#discussion_r29447974
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
@@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
    }
    }
 
+
+    /**
+     * Executes the CANCEL action.
+     *
+     * @param args Command line arguments for the cancel action.
+     */
+
+
+    protected int cancel(JobID jobId){
+        LOG.info("Executing 'cancel' command.");
+
+        final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
+
+        try {
+            String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+            Option<Tuple2<String, Object>> remoting =
+                new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0));
+
+            // start a remote actor system to listen on an arbitrary port
+            ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration, remoting);
+
+
+            ActorRef jobManager = JobManager.getJobManagerRemoteReference(address, actorSystem, timeout);
+            Future<Object> response = Patterns.ask(jobManager, new JobManagerMessages.CancelJob(jobId), new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+            try {
+                Await.result(response, timeout);
+                return 0;
+            }
+            catch (Exception e) {
+                throw new Exception("Canceling the job with ID " + jobId + " failed.", e);
+            }
+        }
+        catch (Throwable t) {
+            return handleError(t);
+        }
--- End diff --

I would handle the error explicitly here and print a stacktrace and info 
message. Creating a new method is not necessary because it is not used 
elsewhere. You might want to return a boolean which is true if the 
cancellation succeeded (CancellationSuccess received) and false otherwise.
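
For illustration, the revised method could look roughly like this (a sketch 
only; `getJobManager()` stands in for however the JobManager reference is 
obtained, and the reply types are the `CancellationSuccess`/`CancellationFailure` 
messages named in the comment):

{code}
protected boolean cancel(JobID jobId) {
    LOG.info("Executing 'cancel' command.");

    final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);

    try {
        // hypothetical helper; stands in for the lookup code in the diff above
        ActorRef jobManager = getJobManager();
        Future<Object> response = Patterns.ask(
                jobManager, new JobManagerMessages.CancelJob(jobId), new Timeout(timeout));

        Object result = Await.result(response, timeout);
        // true only if the JobManager confirmed the cancellation
        return result instanceof JobManagerMessages.CancellationSuccess;
    }
    catch (Exception e) {
        // handle the error explicitly: info message plus stack trace
        LOG.info("Canceling the job with ID " + jobId + " failed.");
        e.printStackTrace();
        return false;
    }
}
{code}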


> Provide API to cancel running job
> -
>
> Key: FLINK-1818
> URL: https://issues.apache.org/jira/browse/FLINK-1818
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: niraj rai
>  Labels: starter
>
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Canceling-a-Cluster-Job-from-Java-td4897.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1818] Added api to cancel job from clie...

2015-04-30 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/642#discussion_r29447974
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
@@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
    }
    }
 
+
+    /**
+     * Executes the CANCEL action.
+     *
+     * @param args Command line arguments for the cancel action.
+     */
+
+
+    protected int cancel(JobID jobId){
+        LOG.info("Executing 'cancel' command.");
+
+        final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
+
+        try {
+            String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+            Option<Tuple2<String, Object>> remoting =
+                new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0));
+
+            // start a remote actor system to listen on an arbitrary port
+            ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration, remoting);
+
+
+            ActorRef jobManager = JobManager.getJobManagerRemoteReference(address, actorSystem, timeout);
+            Future<Object> response = Patterns.ask(jobManager, new JobManagerMessages.CancelJob(jobId), new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+            try {
+                Await.result(response, timeout);
+                return 0;
+            }
+            catch (Exception e) {
+                throw new Exception("Canceling the job with ID " + jobId + " failed.", e);
+            }
+        }
+        catch (Throwable t) {
+            return handleError(t);
+        }
--- End diff --

I would handle the error explicitly here and print a stacktrace and info 
message. Creating a new method is not necessary because it is not used 
elsewhere. You might want to return a boolean which is true if the 
cancellation succeeded (CancellationSuccess received) and false otherwise.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1818] Added api to cancel job from clie...

2015-04-30 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/642#discussion_r29447805
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
@@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
    }
    }
 
+
+    /**
+     * Executes the CANCEL action.
+     *
+     * @param args Command line arguments for the cancel action.
+     */
+
+
+    protected int cancel(JobID jobId){
+        LOG.info("Executing 'cancel' command.");
+
+        final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
+
+        try {
+            String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+            Option<Tuple2<String, Object>> remoting =
+                new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0));
+
+            // start a remote actor system to listen on an arbitrary port
+            ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration, remoting);
+
+
+            ActorRef jobManager = JobManager.getJobManagerRemoteReference(address, actorSystem, timeout);
+            Future<Object> response = Patterns.ask(jobManager, new JobManagerMessages.CancelJob(jobId), new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+            try {
+                Await.result(response, timeout);
--- End diff --

You should check the result here. `Await.result` returns an Object which 
is either `CancellationSuccess` or `CancellationFailure`.
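
That is, something along these lines (a sketch; assuming `CancellationFailure` 
exposes the failure cause):

{code}
Object result = Await.result(response, timeout);
if (result instanceof JobManagerMessages.CancellationSuccess) {
    return 0;
}
else if (result instanceof JobManagerMessages.CancellationFailure) {
    throw new Exception("Canceling the job with ID " + jobId + " failed.",
            ((JobManagerMessages.CancellationFailure) result).cause());
}
{code}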


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1818] Added api to cancel job from clie...

2015-04-30 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/642#discussion_r29447791
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
@@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
    }
    }
 
+
+    /**
+     * Executes the CANCEL action.
+     *
+     * @param args Command line arguments for the cancel action.
+     */
+
+
+    protected int cancel(JobID jobId){
+        LOG.info("Executing 'cancel' command.");
+
+        final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
+
+        try {
+            String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+            Option<Tuple2<String, Object>> remoting =
+                new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0));
+
+            // start a remote actor system to listen on an arbitrary port
+            ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration, remoting);
+
+
+            ActorRef jobManager = JobManager.getJobManagerRemoteReference(address, actorSystem, timeout);
+            Future<Object> response = Patterns.ask(jobManager, new JobManagerMessages.CancelJob(jobId), new Timeout(AkkaUtils.INF_TIMEOUT()));
--- End diff --

You should also specify a finite timeout here.
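
For instance, reusing the `timeout` already computed at the top of the method 
(a sketch of the suggested change):

{code}
Future<Object> response = Patterns.ask(
        jobManager, new JobManagerMessages.CancelJob(jobId), new Timeout(timeout));
{code}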


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1818) Provide API to cancel running job

2015-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521829#comment-14521829
 ] 

ASF GitHub Bot commented on FLINK-1818:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/642#discussion_r29447791
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
@@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
    }
    }
 
+
+    /**
+     * Executes the CANCEL action.
+     *
+     * @param args Command line arguments for the cancel action.
+     */
+
+
+    protected int cancel(JobID jobId){
+        LOG.info("Executing 'cancel' command.");
+
+        final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
+
+        try {
+            String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+            Option<Tuple2<String, Object>> remoting =
+                new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0));
+
+            // start a remote actor system to listen on an arbitrary port
+            ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration, remoting);
+
+
+            ActorRef jobManager = JobManager.getJobManagerRemoteReference(address, actorSystem, timeout);
+            Future<Object> response = Patterns.ask(jobManager, new JobManagerMessages.CancelJob(jobId), new Timeout(AkkaUtils.INF_TIMEOUT()));
--- End diff --

You should also specify a finite timeout here.


> Provide API to cancel running job
> -
>
> Key: FLINK-1818
> URL: https://issues.apache.org/jira/browse/FLINK-1818
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: niraj rai
>  Labels: starter
>
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Canceling-a-Cluster-Job-from-Java-td4897.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1818) Provide API to cancel running job

2015-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521830#comment-14521830
 ] 

ASF GitHub Bot commented on FLINK-1818:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/642#discussion_r29447805
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
@@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
    }
    }
 
+
+    /**
+     * Executes the CANCEL action.
+     *
+     * @param args Command line arguments for the cancel action.
+     */
+
+
+    protected int cancel(JobID jobId){
+        LOG.info("Executing 'cancel' command.");
+
+        final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
+
+        try {
+            String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+            Option<Tuple2<String, Object>> remoting =
+                new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0));
+
+            // start a remote actor system to listen on an arbitrary port
+            ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration, remoting);
+
+
+            ActorRef jobManager = JobManager.getJobManagerRemoteReference(address, actorSystem, timeout);
+            Future<Object> response = Patterns.ask(jobManager, new JobManagerMessages.CancelJob(jobId), new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+            try {
+                Await.result(response, timeout);
--- End diff --

You should check the result here. `Await.result` returns an Object which 
is either `CancellationSuccess` or `CancellationFailure`.


> Provide API to cancel running job
> -
>
> Key: FLINK-1818
> URL: https://issues.apache.org/jira/browse/FLINK-1818
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: niraj rai
>  Labels: starter
>
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Canceling-a-Cluster-Job-from-Java-td4897.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1818) Provide API to cancel running job

2015-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521828#comment-14521828
 ] 

ASF GitHub Bot commented on FLINK-1818:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/642#discussion_r29447781
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
@@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
    }
    }
 
+
+    /**
+     * Executes the CANCEL action.
+     *
+     * @param args Command line arguments for the cancel action.
+     */
+
+
+    protected int cancel(JobID jobId){
+        LOG.info("Executing 'cancel' command.");
+
+        final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
+
+        try {
+            String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+            Option<Tuple2<String, Object>> remoting =
+                new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0));
+
+            // start a remote actor system to listen on an arbitrary port
+            ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration, remoting);
--- End diff --

You could have used `JobClient.startJobClientActorSystem(configuration)` to 
create the ActorSystem. That makes the above lines unnecessary.
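
That is, roughly (replacing the manual `Option`/`Some` remoting setup in the 
quoted lines):

{code}
ActorSystem actorSystem = JobClient.startJobClientActorSystem(configuration);
{code}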


> Provide API to cancel running job
> -
>
> Key: FLINK-1818
> URL: https://issues.apache.org/jira/browse/FLINK-1818
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: niraj rai
>  Labels: starter
>
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Canceling-a-Cluster-Job-from-Java-td4897.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1818] Added api to cancel job from clie...

2015-04-30 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/642#discussion_r29447781
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
@@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
    }
    }
 
+
+    /**
+     * Executes the CANCEL action.
+     *
+     * @param args Command line arguments for the cancel action.
+     */
+
+
+    protected int cancel(JobID jobId){
+        LOG.info("Executing 'cancel' command.");
+
+        final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
+
+        try {
+            String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+            Option<Tuple2<String, Object>> remoting =
+                new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0));
+
+            // start a remote actor system to listen on an arbitrary port
+            ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration, remoting);
--- End diff --

You could have used `JobClient.startJobClientActorSystem(configuration)` to 
create the ActorSystem. That makes the above lines unnecessary.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1818] Added api to cancel job from clie...

2015-04-30 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/642#discussion_r29447739
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
@@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
    }
    }
 
+
+    /**
+     * Executes the CANCEL action.
+     *
+     * @param args Command line arguments for the cancel action.
--- End diff --

The JavaDoc needs to be corrected.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1818) Provide API to cancel running job

2015-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521826#comment-14521826
 ] 

ASF GitHub Bot commented on FLINK-1818:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/642#discussion_r29447739
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
@@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
    }
    }
 
+
+    /**
+     * Executes the CANCEL action.
+     *
+     * @param args Command line arguments for the cancel action.
--- End diff --

The JavaDoc needs to be corrected.


> Provide API to cancel running job
> -
>
> Key: FLINK-1818
> URL: https://issues.apache.org/jira/browse/FLINK-1818
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: niraj rai
>  Labels: starter
>
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Canceling-a-Cluster-Job-from-Java-td4897.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1690) ProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure spuriously fails on Travis

2015-04-30 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521736#comment-14521736
 ] 

Stephan Ewen commented on FLINK-1690:
-

This one is interesting. It is not test-specific; it is a vulnerability that 
all tests have: a TaskManager port conflict.

Because the port is chosen not by netty but beforehand (randomly), two 
TaskManagers can attempt to open the same port (with a small probability).
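
The usual way to rule out such a race is to bind to port 0 and let the OS 
assign a free port atomically, rather than picking a random port up front. A 
minimal plain-Java illustration:

{code}
import java.net.ServerSocket;

public class EphemeralPortExample {

    public static void main(String[] args) throws Exception {
        // binding to port 0 makes the OS pick a free ephemeral port atomically,
        // so two concurrently starting processes cannot collide on the same port
        try (ServerSocket socket = new ServerSocket(0)) {
            System.out.println("Bound to port " + socket.getLocalPort());
        }
    }
}
{code}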

> ProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure spuriously 
> fails on Travis
> --
>
> Key: FLINK-1690
> URL: https://issues.apache.org/jira/browse/FLINK-1690
> Project: Flink
>  Issue Type: Bug
>Reporter: Till Rohrmann
>Priority: Minor
>
> I got the following error on Travis.
> {code}
> ProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure:244 The 
> program did not finish in time
> {code}
> I think we have to increase the timeouts for this test case to make it 
> reliably run on Travis.
> The log of the failed Travis build can be found 
> [here|https://api.travis-ci.org/jobs/53952486/log.txt?deansi=true]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1818) Provide API to cancel running job

2015-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521681#comment-14521681
 ] 

ASF GitHub Bot commented on FLINK-1818:
---

GitHub user rainiraj opened a pull request:

https://github.com/apache/flink/pull/642

[FLINK-1818] Added api to cancel job from client

Please review the implementation of cancel command through client API.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rainiraj/flink clientapi

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/642.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #642


commit 26b59fcc0b0d49bb8942da22267ac12e963fa4f5
Author: rainiraj 
Date:   2015-04-30T15:28:29Z

[FLINK-1818] Added api to cancel job from client




> Provide API to cancel running job
> -
>
> Key: FLINK-1818
> URL: https://issues.apache.org/jira/browse/FLINK-1818
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: niraj rai
>  Labels: starter
>
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Canceling-a-Cluster-Job-from-Java-td4897.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1818] Added api to cancel job from clie...

2015-04-30 Thread rainiraj
GitHub user rainiraj opened a pull request:

https://github.com/apache/flink/pull/642

[FLINK-1818] Added api to cancel job from client

Please review the implementation of cancel command through client API.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rainiraj/flink clientapi

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/642.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #642


commit 26b59fcc0b0d49bb8942da22267ac12e963fa4f5
Author: rainiraj 
Date:   2015-04-30T15:28:29Z

[FLINK-1818] Added api to cancel job from client




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1951) NullPointerException in DeltaIteration when no ForwardedFileds

2015-04-30 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521678#comment-14521678
 ] 

Vasia Kalavri commented on FLINK-1951:
--

wow thanks a lot for the nice explanation and fix [~fhueske] ^^

> NullPointerException in DeltaIteration when no ForwardedFileds
> --
>
> Key: FLINK-1951
> URL: https://issues.apache.org/jira/browse/FLINK-1951
> Project: Flink
>  Issue Type: Bug
>  Components: Iterations
>Affects Versions: 0.9
>Reporter: Vasia Kalavri
>Assignee: Fabian Hueske
>Priority: Critical
>
> The following exception is thrown by the Connected Components example, if the 
> @ForwardedFieldsFirst("*") annotation from the ComponentIdFilter join is 
> removed:
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.examples.java.graph.ConnectedComponents$ComponentIdFilter.join(ConnectedComponents.java:186)
>   at 
> org.apache.flink.examples.java.graph.ConnectedComponents$ComponentIdFilter.join(ConnectedComponents.java:1)
>   at 
> org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver.run(JoinWithSolutionSetSecondDriver.java:198)
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>   at 
> org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:139)
>   at 
> org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask.run(IterationIntermediatePactTask.java:92)
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>   at 
> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>   at java.lang.Thread.run(Thread.java:745)
> [Code | https://github.com/vasia/flink/tree/cc-test] and [dataset | 
> http://snap.stanford.edu/data/com-DBLP.html] to reproduce.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1690) ProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure spuriously fails on Travis

2015-04-30 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521669#comment-14521669
 ] 

Robert Metzger commented on FLINK-1690:
---

Another instance https://travis-ci.org/apache/flink/jobs/60670489

> ProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure spuriously 
> fails on Travis
> --
>
> Key: FLINK-1690
> URL: https://issues.apache.org/jira/browse/FLINK-1690
> Project: Flink
>  Issue Type: Bug
>Reporter: Till Rohrmann
>Priority: Minor
>
> I got the following error on Travis.
> {code}
> ProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure:244 The 
> program did not finish in time
> {code}
> I think we have to increase the timeouts for this test case to make it 
> reliably run on Travis.
> The log of the failed Travis build can be found 
> [here|https://api.travis-ci.org/jobs/53952486/log.txt?deansi=true]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1951) NullPointerException in DeltaIteration when no ForwardedFileds

2015-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521663#comment-14521663
 ] 

ASF GitHub Bot commented on FLINK-1951:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/641#issuecomment-97842442
  
Looks good ;)


> NullPointerException in DeltaIteration when no ForwardedFileds
> --
>
> Key: FLINK-1951
> URL: https://issues.apache.org/jira/browse/FLINK-1951
> Project: Flink
>  Issue Type: Bug
>  Components: Iterations
>Affects Versions: 0.9
>Reporter: Vasia Kalavri
>Assignee: Fabian Hueske
>Priority: Critical
>
> The following exception is thrown by the Connected Components example, if the 
> @ForwardedFieldsFirst("*") annotation from the ComponentIdFilter join is 
> removed:
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.examples.java.graph.ConnectedComponents$ComponentIdFilter.join(ConnectedComponents.java:186)
>   at 
> org.apache.flink.examples.java.graph.ConnectedComponents$ComponentIdFilter.join(ConnectedComponents.java:1)
>   at 
> org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver.run(JoinWithSolutionSetSecondDriver.java:198)
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>   at 
> org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:139)
>   at 
> org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask.run(IterationIntermediatePactTask.java:92)
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>   at 
> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>   at java.lang.Thread.run(Thread.java:745)
> [Code | https://github.com/vasia/flink/tree/cc-test] and [dataset | 
> http://snap.stanford.edu/data/com-DBLP.html] to reproduce.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-1965) Implement the Orthant-wise Limited Memory QuasiNewton optimization algorithm, a variant of L-BFGS that handles L1 regularization

2015-04-30 Thread Theodore Vasiloudis (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Theodore Vasiloudis updated FLINK-1965:
---
Description: 
The Orthant-wise Limited Memory QuasiNewton (OWL-QN) is a quasi-Newton 
optimization method similar to L-BFGS that can handle L1 regularization. 

Implementing this would allow us to obtain sparse solutions while at the same 
time having the convergence benefits of a quasi-Newton method, when compared to 
stochastic gradient descent.

[Link to 
paper|http://research-srv.microsoft.com/en-us/um/people/jfgao/paper/icml07scalable.pdf]
[Link to example 
implementation|http://research.microsoft.com/en-us/downloads/b1eb1016-1738-4bd5-83a9-370c9d498a03/]


  was:
The Orthant-wise Limited Memory QuasiNewton (OWL-QN) is a quasi-Newton 
optimization method similar to L-BFGS that can handle L1 regularization. 

Implementing this would allow us to obtain sparse solutions while at the same 
time having the convergence benefits of a quasi-Newton method, when compared to 
stochastic gradient descent.

[Link to 
paper|http://research.microsoft.com/en-us/downloads/b1eb1016-1738-4bd5-83a9-370c9d498a03/]



> Implement the Orthant-wise Limited Memory QuasiNewton optimization algorithm, 
> a variant of L-BFGS that handles L1 regularization
> 
>
> Key: FLINK-1965
> URL: https://issues.apache.org/jira/browse/FLINK-1965
> Project: Flink
>  Issue Type: Wish
>  Components: Machine Learning Library
>Reporter: Theodore Vasiloudis
>Priority: Minor
>  Labels: ML
>
> The Orthant-wise Limited Memory QuasiNewton (OWL-QN) is a quasi-Newton 
> optimization method similar to L-BFGS that can handle L1 regularization. 
> Implementing this would allow us to obtain sparse solutions while at the same 
> time having the convergence benefits of a quasi-Newton method, when compared 
> to stochastic gradient descent.
> [Link to 
> paper|http://research-srv.microsoft.com/en-us/um/people/jfgao/paper/icml07scalable.pdf]
> [Link to example 
> implementation|http://research.microsoft.com/en-us/downloads/b1eb1016-1738-4bd5-83a9-370c9d498a03/]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1951] Fix NullPointerException in delta...

2015-04-30 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/641#issuecomment-97842442
  
Looks good ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1951) NullPointerException in DeltaIteration when no ForwardedFileds

2015-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521661#comment-14521661
 ] 

ASF GitHub Bot commented on FLINK-1951:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/641

[FLINK-1951] Fix NullPointerException in delta iteration due to missing 
input temp

See JIRA for details.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink delta-npe-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/641.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #641


commit 0c7ad4b03e3a47d5c7ee95659a31e62d89edfac6
Author: Fabian Hueske 
Date:   2015-04-30T15:34:02Z

[FLINK-1951] Fix NullPointerException in delta iteration due to missing temp




> NullPointerException in DeltaIteration when no ForwardedFileds
> --
>
> Key: FLINK-1951
> URL: https://issues.apache.org/jira/browse/FLINK-1951
> Project: Flink
>  Issue Type: Bug
>  Components: Iterations
>Affects Versions: 0.9
>Reporter: Vasia Kalavri
>Assignee: Fabian Hueske
>Priority: Critical
>
> The following exception is thrown by the Connected Components example, if the 
> @ForwardedFieldsFirst("*") annotation from the ComponentIdFilter join is 
> removed:
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.examples.java.graph.ConnectedComponents$ComponentIdFilter.join(ConnectedComponents.java:186)
>   at 
> org.apache.flink.examples.java.graph.ConnectedComponents$ComponentIdFilter.join(ConnectedComponents.java:1)
>   at 
> org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver.run(JoinWithSolutionSetSecondDriver.java:198)
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>   at 
> org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:139)
>   at 
> org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask.run(IterationIntermediatePactTask.java:92)
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>   at 
> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>   at java.lang.Thread.run(Thread.java:745)
> [Code | https://github.com/vasia/flink/tree/cc-test] and [dataset | 
> http://snap.stanford.edu/data/com-DBLP.html] to reproduce.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1951] Fix NullPointerException in delta...

2015-04-30 Thread fhueske
GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/641

[FLINK-1951] Fix NullPointerException in delta iteration due to missing 
input temp

See JIRA for details.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink delta-npe-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/641.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #641


commit 0c7ad4b03e3a47d5c7ee95659a31e62d89edfac6
Author: Fabian Hueske 
Date:   2015-04-30T15:34:02Z

[FLINK-1951] Fix NullPointerException in delta iteration due to missing temp




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-1965) Implement the Orthant-wise Limited Memory QuasiNewton optimization algorithm, a variant of L-BFGS that handles L1 regularization

2015-04-30 Thread Theodore Vasiloudis (JIRA)
Theodore Vasiloudis created FLINK-1965:
--

 Summary: Implement the Orthant-wise Limited Memory QuasiNewton 
optimization algorithm, a variant of L-BFGS that handles L1 regularization
 Key: FLINK-1965
 URL: https://issues.apache.org/jira/browse/FLINK-1965
 Project: Flink
  Issue Type: Wish
  Components: Machine Learning Library
Reporter: Theodore Vasiloudis
Priority: Minor


The Orthant-wise Limited Memory QuasiNewton (OWL-QN) is a quasi-Newton 
optimization method similar to L-BFGS that can handle L1 regularization. 

Implementing this would allow us to obtain sparse solutions while at the same 
time having the convergence benefits of a quasi-Newton method, when compared to 
stochastic gradient descent.

[Link to 
paper|http://research.microsoft.com/en-us/downloads/b1eb1016-1738-4bd5-83a9-370c9d498a03/]
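
For reference, OWL-QN minimizes the standard L1-regularized objective, with 
\ell a smooth differentiable loss and C > 0 the regularization constant:

{code}
\min_{w \in \mathbb{R}^d} \; f(w) = \ell(w) + C \, \|w\|_1
{code}

The L1 term is what yields sparse solutions; OWL-QN copes with its 
non-differentiability by restricting each quasi-Newton step to the orthant of 
the current iterate.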




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1951) NullPointerException in DeltaIteration when no ForwardedFileds

2015-04-30 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521656#comment-14521656
 ] 

Fabian Hueske commented on FLINK-1951:
--

It turned out that the channel to the solution set update task was meant to be 
blocking, i.e., it should first collect all updates and apply them after the 
join with the solution set was done. That way the solution set is not 
concurrently probed and updated.

The optimizer made the right decisions; however, the decisions were not 
correctly translated into the JobGraph. 

> NullPointerException in DeltaIteration when no ForwardedFileds
> --
>
> Key: FLINK-1951
> URL: https://issues.apache.org/jira/browse/FLINK-1951
> Project: Flink
>  Issue Type: Bug
>  Components: Iterations
>Affects Versions: 0.9
>Reporter: Vasia Kalavri
>Assignee: Fabian Hueske
>Priority: Critical
>
> The following exception is thrown by the Connected Components example, if the 
> @ForwardedFieldsFirst("*") annotation from the ComponentIdFilter join is 
> removed:
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.examples.java.graph.ConnectedComponents$ComponentIdFilter.join(ConnectedComponents.java:186)
>   at 
> org.apache.flink.examples.java.graph.ConnectedComponents$ComponentIdFilter.join(ConnectedComponents.java:1)
>   at 
> org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver.run(JoinWithSolutionSetSecondDriver.java:198)
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>   at 
> org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:139)
>   at 
> org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask.run(IterationIntermediatePactTask.java:92)
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>   at 
> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>   at java.lang.Thread.run(Thread.java:745)
> [Code | https://github.com/vasia/flink/tree/cc-test] and [dataset | 
> http://snap.stanford.edu/data/com-DBLP.html] to reproduce.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [scheduling] implement backtracking of interme...

2015-04-30 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/640#issuecomment-97825782
  
Thank you for your valuable comments @tillrohrmann. I haven't worked with 
Akka's future compositions, but it seems like a sophisticated way to parallelize 
and combine the actor replies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-1964) Rework TwitterSource to use a Properties object instead of a file path

2015-04-30 Thread Márton Balassi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi updated FLINK-1964:
--
Labels: starter  (was: )

> Rework TwitterSource to use a Properties object instead of a file path
> --
>
> Key: FLINK-1964
> URL: https://issues.apache.org/jira/browse/FLINK-1964
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Priority: Minor
>  Labels: starter
>
> The twitter connector is very hard to use on a cluster because it expects the 
> property file to be present on all nodes.
> It would be much easier to ask the user to pass a Properties object 
> immediately.
> Also, the javadoc of the class stops in the middle of the sentence.
> It was not obvious to me how the two examples TwitterStreaming and 
> TwitterTopology differ. Also, there is a third TwitterStream example in the 
> streaming examples.
> The documentation of the Twitter source refers to the non existent 
> TwitterLocal class.
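
A sketch of what the proposed Properties-based usage might look like (the 
constructor and the property keys are hypothetical; the current source takes a 
file path instead):

{code}
Properties props = new Properties();
// hypothetical keys; actual names would be defined by the reworked source
props.setProperty("twitter-source.consumerKey", "...");
props.setProperty("twitter-source.consumerSecret", "...");
props.setProperty("twitter-source.token", "...");
props.setProperty("twitter-source.tokenSecret", "...");

DataStream<String> tweets = env.addSource(new TwitterSource(props));
{code}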



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [scheduling] implement backtracking of interme...

2015-04-30 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/640#discussion_r29435008
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java ---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import akka.actor.ActorRef;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import com.google.common.base.Preconditions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Backtracking is a mechanism to schedule only those Execution Vertices of an Execution Graph which
+ * do not have an intermediate result available. This is in contrast to the simple way of scheduling
+ * a job, where all Execution Vertices are executed starting from the source. The Backtracking starts
+ * from the sinks and traverses the Execution Graph to the sources. It only reaches the sources if
+ * no intermediate result could be found on the way.
+ *
+ * @see ExecutionGraph
+ * @see ExecutionVertex
+ * @see Execution
+ */
+public class Backtracking {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Backtracking.class);
+
+    private final Deque<TaskRequirement> taskRequirements = new ArrayDeque<TaskRequirement>();
+
+    private final Map<IntermediateResultPartitionID, Boolean> visitedPartitions = new HashMap<IntermediateResultPartitionID, Boolean>();
+
+    private ScheduleAction scheduleAction;
+    private Runnable postBacktrackingHook;
+
+    public Backtracking(Collection<ExecutionJobVertex> vertices) {
+        Preconditions.checkNotNull(vertices);
+
+        // add all sinks found to the stack
+        for (ExecutionJobVertex ejv : vertices) {
+            if (ejv.getJobVertex().isOutputVertex()) {
+                for (ExecutionVertex ev : ejv.getTaskVertices()) {
+                    if (ev.getExecutionState() == ExecutionState.CREATED) {
+                        taskRequirements.add(new TaskRequirement(ev));
+                    }
+                }
+            }
+        }
+
+        this.scheduleAction = new ScheduleAction() {
+            @Override
+            public void schedule(ExecutionVertex ev) {}
+        };
+
+        this.postBacktrackingHook = new Runnable() {
+            @Override
+            public void run() {}
+        };
+    }
+
+    /**
+     * Scheduling to be performed when an ExecutionVertex is encountered that cannot be resumed
+     */
+    public interface ScheduleAction {
+        void schedule(ExecutionVertex ev);
+    }
+
+    /**
+     * A TaskRequirement encapsulates an ExecutionVertex and its IntermediateResultPartitions which
+     * are required for execution.
+     */
+    private class TaskRequirement {
+
+        private final ExecutionVertex executionVertex;
+        private final Deque<IntermediateResultPartitionID> pendingInputs = new ArrayDeque<IntermediateResultPartitionID>();
+        private final int numInputs;
+
+        private int availableInputs = 0;
+
+        public TaskRequirement(ExecutionVertex executionVertex) {
+            this.executionVertex = executionVertex;
+            this.pendingInputs.addAll(executionVertex.getInputs());
+            this.numInputs = pendingInputs.size();
+        }
+
+        public ExecutionVertex getExecutionVertex() {
+            return executionVertex;

[GitHub] flink pull request: [scheduling] implement backtracking of interme...

2015-04-30 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/640#discussion_r29434283
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java ---
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import akka.actor.ActorRef;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import com.google.common.base.Preconditions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+
+/**
+ * Backtracking is a mechanism to schedule only those Execution Vertices of an Execution Graph which
+ * do not have an intermediate result available. This is in contrast to the simple way of scheduling
+ * a job, where all Execution Vertices are executed starting from the source. The Backtracking starts
+ * from the sinks and traverses the Execution Graph to the sources. It only reaches the sources if
+ * no intermediate result could be found on the way.
+ *
+ * @see ExecutionGraph
+ * @see ExecutionVertex
+ * @see Execution
+ */
+public class Backtracking {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Backtracking.class);
+
+    private final Deque<TaskRequirement> taskRequirements = new ArrayDeque<TaskRequirement>();
+
+    private final Map<IntermediateResultPartitionID, Boolean> visitedPartitions = new HashMap<IntermediateResultPartitionID, Boolean>();
+
+    private ScheduleAction scheduleAction;
+    private Runnable postBacktrackingHook;
+
+    public Backtracking(Collection<ExecutionJobVertex> vertices) {
+        Preconditions.checkNotNull(vertices);
+
+        // add all sinks found to the stack
+        for (ExecutionJobVertex ejv : vertices) {
+            if (ejv.getJobVertex().isOutputVertex()) {
+                for (ExecutionVertex ev : ejv.getTaskVertices()) {
+                    if (ev.getExecutionState() == ExecutionState.CREATED) {
+                        taskRequirements.add(new TaskRequirement(ev));
+                    }
+                }
+            }
+        }
+
+        this.scheduleAction = new ScheduleAction() {
+            @Override
+            public void schedule(ExecutionVertex ev) {}
+        };
+
+        this.postBacktrackingHook = new Runnable() {
+            @Override
+            public void run() {}
+        };
+    }
+
+    /**
+     * Scheduling to be performed when an ExecutionVertex is encountered that cannot be resumed
+     */
+    public interface ScheduleAction {
+        void schedule(ExecutionVertex ev);
+    }
+
+    /**
+     * A TaskRequirement encapsulates an ExecutionVertex and its IntermediateResultPartitions which
+     * are required for execution.
+     */
+    private class TaskRequirement {
+
+        private final ExecutionVertex executionVertex;
+        private final Deque<IntermediateResultPartitionID> pendingInputs = new ArrayDeque<IntermediateResultPartitionID>();
+        private final int numInputs;
+
+        private int availableInputs = 0;
+
+        public TaskRequirement(ExecutionVertex executionVertex) {
+            this.executionVertex = executionVertex;
+            this.pendingInputs.addAll(executionVertex.getInputs());
+            this.numInputs = pendingInputs.size();
+        }
+
+        public ExecutionVertex getExecutionVertex() {
+            return executionVertex;

[GitHub] flink pull request: [scheduling] implement backtracking of interme...

2015-04-30 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/640#issuecomment-97818576
  
The backtracking looks good @mxm.

I have some remarks concerning the way the locking of the partitions on the 
TMs works. At the moment this happens sequentially, meaning that for each 
`IntermediateResultPartition` a message is sent to the TM and the response is 
awaited. Only after receiving the response from this TM is the next 
`IntermediateResultPartition` processed. This can considerably slow down 
scheduling if the degree of parallelism is high. I think we should make use of 
Akka's future composition to do this concurrently; a sketch follows below.

Furthermore, we could think about doing the backtracking in parallel. This 
could also speed up the scheduling process.
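
A minimal sketch of that future composition on the Java side, reusing the Akka
types the Backtracking class already imports. The lock message object and the
way the TaskManager ActorRefs are obtained are illustrative assumptions, not
the PR's actual messages:

    import akka.actor.ActorRef;
    import akka.dispatch.Futures;
    import akka.pattern.Patterns;
    import scala.concurrent.ExecutionContext;
    import scala.concurrent.Future;

    import java.util.ArrayList;
    import java.util.List;

    public final class ConcurrentPartitionLocking {

        // Fire all lock requests up front instead of awaiting each TaskManager
        // reply before sending the next request.
        public static Future<Iterable<Object>> lockAll(
                List<ActorRef> taskManagers,
                Object lockMessage,
                long timeoutMillis,
                ExecutionContext ec) {

            List<Future<Object>> responses = new ArrayList<Future<Object>>(taskManagers.size());
            for (ActorRef taskManager : taskManagers) {
                responses.add(Patterns.ask(taskManager, lockMessage, timeoutMillis));
            }
            // Compose into a single future that completes once every TaskManager
            // has answered; a single failure fails the composed future.
            return Futures.sequence(responses, ec);
        }
    }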




[GitHub] flink pull request: [scheduling] implement backtracking of interme...

2015-04-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/640#discussion_r29433574
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java
 ---
@@ -0,0 +1,252 @@
[quoted Backtracking.java diff elided; same excerpt as shown above. The rest of the message, including the inline review comment, is truncated in the mail archive.]

[jira] [Created] (FLINK-1964) Rework TwitterSource to use a Properties object instead of a file path

2015-04-30 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-1964:
-

 Summary: Rework TwitterSource to use a Properties object instead 
of a file path
 Key: FLINK-1964
 URL: https://issues.apache.org/jira/browse/FLINK-1964
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.9
Reporter: Robert Metzger
Priority: Minor


The Twitter connector is very hard to use on a cluster because it expects the 
property file to be present on all nodes.

It would be much easier to ask the user to pass a Properties object directly.
Also, the javadoc of the class stops in the middle of a sentence.

It was not obvious to me how the two examples TwitterStreaming and 
TwitterTopology differ. Also, there is a third TwitterStream example in the 
streaming examples.
The documentation of the Twitter source refers to the non-existent TwitterLocal 
class.
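
For illustration, a sketch of what the reworked source could look like: the
credentials travel with the job instead of being read from a file on every
node. The Properties-based constructor and the property keys are assumptions
about the proposal, not an existing API:

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.twitter.TwitterSource;

    public class TwitterPropertiesExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("consumerKey", "<key>");
            props.setProperty("consumerSecret", "<secret>");
            props.setProperty("token", "<token>");
            props.setProperty("tokenSecret", "<tokenSecret>");

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // hypothetical constructor replacing TwitterSource(String propertiesFilePath)
            DataStream<String> tweets = env.addSource(new TwitterSource(props));
            tweets.print();
            env.execute("twitter example");
        }
    }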





[GitHub] flink pull request: [scheduling] implement backtracking of interme...

2015-04-30 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/640#discussion_r29432849
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java
 ---
@@ -0,0 +1,252 @@
[quoted Backtracking.java diff elided; same excerpt as shown above. The rest of the message, including the inline review comment, is truncated in the mail archive.]

[jira] [Commented] (FLINK-1103) Update Streaming examples to become self-contained

2015-04-30 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521546#comment-14521546
 ] 

Robert Metzger commented on FLINK-1103:
---

Where is the data in {{TwitterStreamData}} coming from?
Can we just take it and license it under the ASF license?

> Update Streaming examples to become self-contained
> --
>
> Key: FLINK-1103
> URL: https://issues.apache.org/jira/browse/FLINK-1103
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.7.0-incubating
>Reporter: Márton Balassi
>Assignee: Márton Balassi
>
> Streaming examples do not follow the standard set by the recent examples 
> refactor of the batch API.
> TestDataUtil should be removed and Object[][] used to contain the example 
> data.
> Comments are also lacking in comparison with the batch counterpart.
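
For reference, the self-contained Object[][] pattern the refactored batch
examples use, applied here to made-up tweet data; the class name and contents
are illustrative:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class ExampleTweetData {

        // example data held inline instead of being downloaded by TestDataUtil
        public static final Object[][] TWEETS = new Object[][] {
            {1L, "some example tweet"},
            {2L, "another example tweet"}
        };

        public static DataSet<Tuple2<Long, String>> getTweetDataSet(ExecutionEnvironment env) {
            List<Tuple2<Long, String>> data = new ArrayList<Tuple2<Long, String>>();
            for (Object[] tweet : TWEETS) {
                data.add(new Tuple2<Long, String>((Long) tweet[0], (String) tweet[1]));
            }
            return env.fromCollection(data);
        }
    }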





[GitHub] flink pull request: [scheduling] implement backtracking of interme...

2015-04-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/640#discussion_r29432635
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java
 ---
@@ -0,0 +1,254 @@
[quoted Backtracking.java diff elided; same excerpt as shown above. The rest of the message, including the inline review comment, is truncated in the mail archive.]

[GitHub] flink pull request: [scheduling] implement backtracking of interme...

2015-04-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/640#discussion_r29432295
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java
 ---
@@ -0,0 +1,252 @@
[quoted Backtracking.java diff elided; same excerpt as shown above. The rest of the message, including the inline review comment, is truncated in the mail archive.]

[GitHub] flink pull request: [scheduling] implement backtracking of interme...

2015-04-30 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/640#discussion_r29431402
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java
 ---
@@ -0,0 +1,252 @@
[quoted Backtracking.java diff elided; same excerpt as shown above. The rest of the message, including the inline review comment, is truncated in the mail archive.]

[GitHub] flink pull request: [scheduling] implement backtracking of interme...

2015-04-30 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/640#discussion_r29431413
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -475,11 +479,21 @@ class JobManager(val flinkConfiguration: Configuration,
      throw new JobSubmissionException(jobId, "The given job is empty")
    }
 
-    // see if there already exists an ExecutionGraph for the corresponding job ID
-    executionGraph = currentJobs.getOrElseUpdate(jobGraph.getJobID,
-      (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName,
-        jobGraph.getJobConfiguration, timeout, jobGraph.getUserJarBlobKeys, userCodeLoader),
-      JobInfo(sender(), System.currentTimeMillis())))._1
+    executionGraph = currentJob match {
+      case Some((graph, _)) if !graph.getState.isTerminalState =>
+        throw new Exception("Job still running")
--- End diff --

I hadn't thought about attaching vertices to a running execution graph. 
That could make sense, especially in the case of streaming.




[GitHub] flink pull request: [scheduling] implement backtracking of interme...

2015-04-30 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/640#discussion_r29431391
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java
 ---
@@ -0,0 +1,252 @@
[quoted Backtracking.java diff elided; same excerpt as shown above. The rest of the message, including the inline review comment, is truncated in the mail archive.]

[jira] [Commented] (FLINK-1951) NullPointerException in DeltaIteration when no ForwardedFileds

2015-04-30 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521507#comment-14521507
 ] 

Fabian Hueske commented on FLINK-1951:
--

It was a bit tricky and took some time but I think I finally found the bug :-)

The problem is the following. The failing join is the join with the solution 
set. The result of this join is the solution set data that is reinserted into 
the hashtable that was previously probed. 

If the annotation is set, the optimizer knows that the result of the join is 
still partitioned and can do the insert locally by chaining the calls. This 
means that probing and reinserting into the same HT happen in the same thread, 
i.e., there is no concurrent access.

If the annotation is NOT set, the optimizer does not know that the result is 
still partitioned. Hence, it inserts a network shuffling strategy to partition 
the data. The receiving task then reinserts the solution set delta records into 
the hash table. Since probing and inserting are happening in two separate tasks 
which are separated by a network connection, they happen concurrently in two 
different threads and might interfere.

Guarding reinsertion and probing with a simple lock, as sketched below, solves 
the problem. Not sure if that's the best fix though...
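
The shape of that guard, for illustration only; this is a generic stand-in,
not the actual solution set hash table code:

    import java.util.HashMap;
    import java.util.Map;

    // Probe (join side) and re-insert (delta side) serialize on one lock,
    // since they may run in different threads when a network shuffle sits
    // between them.
    public class GuardedSolutionSet<K, V> {

        private final Object lock = new Object();
        private final Map<K, V> table = new HashMap<K, V>();

        // called from the join driver thread
        public V probe(K key) {
            synchronized (lock) {
                return table.get(key);
            }
        }

        // called from the task that receives the shuffled solution set delta
        public void reinsert(K key, V value) {
            synchronized (lock) {
                table.put(key, value);
            }
        }
    }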



> NullPointerException in DeltaIteration when no ForwardedFileds
> --
>
> Key: FLINK-1951
> URL: https://issues.apache.org/jira/browse/FLINK-1951
> Project: Flink
>  Issue Type: Bug
>  Components: Iterations
>Affects Versions: 0.9
>Reporter: Vasia Kalavri
>Assignee: Fabian Hueske
>Priority: Critical
>
> The following exception is thrown by the Connected Components example, if the 
> @ForwardedFieldsFirst("*") annotation from the ComponentIdFilter join is 
> removed:
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.examples.java.graph.ConnectedComponents$ComponentIdFilter.join(ConnectedComponents.java:186)
>   at 
> org.apache.flink.examples.java.graph.ConnectedComponents$ComponentIdFilter.join(ConnectedComponents.java:1)
>   at 
> org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver.run(JoinWithSolutionSetSecondDriver.java:198)
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>   at 
> org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:139)
>   at 
> org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask.run(IterationIntermediatePactTask.java:92)
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>   at 
> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>   at java.lang.Thread.run(Thread.java:745)
> [Code | https://github.com/vasia/flink/tree/cc-test] and [dataset | 
> http://snap.stanford.edu/data/com-DBLP.html] to reproduce.





[GitHub] flink pull request: [scheduling] implement backtracking of interme...

2015-04-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/640#discussion_r29427789
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
[quoted JobManager.scala diff elided; same excerpt as shown above]
--- End diff --

Why can I not append new vertices to a running execution graph?




[GitHub] flink pull request: [scheduling] implement backtracking of interme...

2015-04-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/640#discussion_r29426706
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java
 ---
@@ -0,0 +1,252 @@
[quoted Backtracking.java diff elided; same excerpt as shown above. The rest of the message, including the inline review comment, is truncated in the mail archive.]

[GitHub] flink pull request: [scheduling] implement backtracking of interme...

2015-04-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/640#discussion_r29426482
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java
 ---
@@ -0,0 +1,252 @@
[quoted Backtracking.java diff elided; same excerpt as shown above. The rest of the message, including the inline review comment, is truncated in the mail archive.]

[jira] [Updated] (FLINK-1963) Improve distinct() transformation

2015-04-30 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-1963:
-
Description: 
The `distinct()` transformation is a bit limited right now with respect to 
processing atomic key types:

- `distinct(String ...)` works only for composite data types (POJO, tuple), but 
wildcard expression should also be supported for atomic key types
- `distinct()` only works for composite types, but should also work for atomic 
key types
- `distinct(KeySelector)` is the most generic one, but not very handy to use
- `distinct(int ...)` works only for Tuple data types (which is fine)

Fixing this should be rather easy.

  was:
The `distinct()` transformation is a bit limited right now with respect to 
processing atomic key types:

- `distinct(String ...)` works only for composite data types (POJO, tuple), but 
wildcard expression should also be supported for atomic key types
- `distinct()` only works for composite types, but should also work for atomic 
key types
- `distinct(KeySelector)` is the most generic one, but not very handy to use
- `distinct(int ...)` works only for Tuple data types (which is fine)


> Improve distinct() transformation
> -
>
> Key: FLINK-1963
> URL: https://issues.apache.org/jira/browse/FLINK-1963
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API, Scala API
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Priority: Minor
>  Labels: starter
> Fix For: 0.9
>
>
> The `distinct()` transformation is a bit limited right now with respect to 
> processing atomic key types:
> - `distinct(String ...)` works only for composite data types (POJO, tuple), 
> but wildcard expression should also be supported for atomic key types
> - `distinct()` only works for composite types, but should also work for 
> atomic key types
> - `distinct(KeySelector)` is the most generic one, but not very handy to use
> - `distinct(int ...)` works only for Tuple data types (which is fine)
> Fixing this should be rather easy.
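
To make the asymmetry concrete, a short sketch against the DataSet API; the
final comment line shows the calls this issue wants to enable for atomic types:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class DistinctExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<Long, String>> tuples = env.fromElements(
                    new Tuple2<Long, String>(1L, "a"),
                    new Tuple2<Long, String>(1L, "a"));
            tuples.distinct("f0");   // field expressions work for composite types
            tuples.distinct(0);      // field positions work for tuples

            DataSet<Long> atomic = env.fromElements(1L, 1L, 2L);
            // today the only option for an atomic type is the verbose KeySelector
            atomic.distinct(new KeySelector<Long, Long>() {
                @Override
                public Long getKey(Long value) {
                    return value;
                }
            });
            // desired after the fix: atomic.distinct() and atomic.distinct("*")
        }
    }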





[jira] [Created] (FLINK-1963) Improve distinct() transformation

2015-04-30 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-1963:


 Summary: Improve distinct() transformation
 Key: FLINK-1963
 URL: https://issues.apache.org/jira/browse/FLINK-1963
 Project: Flink
  Issue Type: Improvement
  Components: Java API, Scala API
Affects Versions: 0.9
Reporter: Fabian Hueske
Priority: Minor
 Fix For: 0.9


The `distinct()` transformation is a bit limited right now with respect to 
processing atomic key types:

- `distinct(String ...)` works only for composite data types (POJO, tuple), but 
wildcard expression should also be supported for atomic key types
- `distinct()` only works for composite types, but should also work for atomic 
key types
- `distinct(KeySelector)` is the most generic one, but not very handy to use
- `distinct(int ...)` works only for Tuple data types (which is fine)





[jira] [Updated] (FLINK-1963) Improve distinct() transformation

2015-04-30 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-1963:
-
Labels: starter  (was: start)

> Improve distinct() transformation
> -
>
> Key: FLINK-1963
> URL: https://issues.apache.org/jira/browse/FLINK-1963
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API, Scala API
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Priority: Minor
>  Labels: starter
> Fix For: 0.9
>
>
> The `distinct()` transformation is a bit limited right now with respect to 
> processing atomic key types:
> - `distinct(String ...)` works only for composite data types (POJO, tuple), 
> but wildcard expression should also be supported for atomic key types
> - `distinct()` only works for composite types, but should also work for 
> atomic key types
> - `distinct(KeySelector)` is the most generic one, but not very handy to use
> - `distinct(int ...)` works only for Tuple data types (which is fine)





[jira] [Commented] (FLINK-1962) Add Gelly Scala API

2015-04-30 Thread Sebastian Schelter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521276#comment-14521276
 ] 

Sebastian Schelter commented on FLINK-1962:
---

Let me know once you have an implementation to play with, I have some scala 
graph analysis code lying around that I could port to Gelly.

> Add Gelly Scala API
> ---
>
> Key: FLINK-1962
> URL: https://issues.apache.org/jira/browse/FLINK-1962
> Project: Flink
>  Issue Type: Task
>  Components: Gelly, Scala API
>Affects Versions: 0.9
>Reporter: Vasia Kalavri
>






[jira] [Created] (FLINK-1962) Add Gelly Scala API

2015-04-30 Thread Vasia Kalavri (JIRA)
Vasia Kalavri created FLINK-1962:


 Summary: Add Gelly Scala API
 Key: FLINK-1962
 URL: https://issues.apache.org/jira/browse/FLINK-1962
 Project: Flink
  Issue Type: Task
  Components: Gelly, Scala API
Affects Versions: 0.9
Reporter: Vasia Kalavri








[jira] [Commented] (FLINK-1959) Accumulators BROKEN after Partitioning

2015-04-30 Thread mustafa elbehery (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521195#comment-14521195
 ] 

mustafa elbehery commented on FLINK-1959:
-

I just tried with DOP 1, 2, and 5. All return NULL:

"Number of detected empty fields per column: null"

> Accumulators BROKEN after Partitioning
> --
>
> Key: FLINK-1959
> URL: https://issues.apache.org/jira/browse/FLINK-1959
> Project: Flink
>  Issue Type: Bug
>  Components: Examples
>Affects Versions: 0.8.1
>Reporter: mustafa elbehery
>Priority: Critical
> Fix For: 0.8.1
>
>
> While running the Accumulator example in 
> https://github.com/Elbehery/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java,
> I tried to alter the data flow with the "partitionByHash" function before 
> applying "filter", and the resulting accumulator was NULL.
> By debugging, I could see the accumulator in the runtime map. However, when 
> retrieving the accumulator from the JobExecutionResult object, it was NULL.
> The line causing the problem is "file.partitionByHash(1).filter(new 
> EmptyFieldFilter())" instead of "file.filter(new EmptyFieldFilter())".





[jira] [Resolved] (FLINK-1843) Job History gets cleared too fast

2015-04-30 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-1843.
---
Resolution: Fixed

> Job History gets cleared too fast
> -
>
> Key: FLINK-1843
> URL: https://issues.apache.org/jira/browse/FLINK-1843
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 0.9
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>  Labels: starter
> Fix For: 0.9
>
>
> As per FLINK-1442, the JobManager stores the archived ExecutionGraph behind a 
> SoftReference. At least for local setups, this mechanism doesn't seem to work 
> properly. There are two issues:
> - The history gets cleared too fast
> - The history gets cleared in a non-sequential fashion, i.e. arbitrary old 
> ExecutionGraphs are discarded
> To solve these problems we might
> - Store the least recent ExecutionGraph behind a SoftReference
> - Store the most recent ExecutionGraphs without a SoftReference
> That way, we can save memory but have the latest history available to the 
> user. We might introduce a configuration variable where the user can specify 
> the number of ExecutionGraphs that should be held in memory. The remaining 
> can be stored behind a SoftReference.
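
A sketch of that two-tier archive under the assumptions above; the class and
field names are illustrative:

    import java.lang.ref.SoftReference;
    import java.util.ArrayDeque;
    import java.util.Deque;

    public class ExecutionGraphArchive<G> {

        private final int maxStronglyHeld;  // the proposed configuration variable
        private final Deque<G> recent = new ArrayDeque<G>();
        private final Deque<SoftReference<G>> older = new ArrayDeque<SoftReference<G>>();

        public ExecutionGraphArchive(int maxStronglyHeld) {
            this.maxStronglyHeld = maxStronglyHeld;
        }

        public synchronized void archive(G graph) {
            recent.addFirst(graph);
            if (recent.size() > maxStronglyHeld) {
                // demote the least recent strongly held graph: the GC may reclaim
                // it under memory pressure, but the latest history stays available
                older.addFirst(new SoftReference<G>(recent.removeLast()));
            }
        }
    }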





[jira] [Commented] (FLINK-1749) Add Boosting algorithm for ensemble learning to machine learning library

2015-04-30 Thread narayana reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521127#comment-14521127
 ] 

narayana reddy commented on FLINK-1749:
---

Thank you Till Rohrmann. 

> Add Boosting algorithm for ensemble learning to machine learning library
> 
>
> Key: FLINK-1749
> URL: https://issues.apache.org/jira/browse/FLINK-1749
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: narayana reddy
>  Labels: ML
>
> Boosting [1] can help to create strong learners from an ensemble of weak 
> learners and thus improve its performance. Widely used boosting algorithms 
> are AdaBoost [2] and LogitBoost [3]. The work of I. Palit and C. K. Reddy [4] 
> investigates how boosting can be efficiently realised in a distributed 
> setting. 
> Resources:
> [1] [http://en.wikipedia.org/wiki/Boosting_%28machine_learning%29]
> [2] [http://en.wikipedia.org/wiki/AdaBoost]
> [3] [http://en.wikipedia.org/wiki/LogitBoost]
> [4] [http://ieeexplore.ieee.org/xpls/abs_all.jsp?arnumber=6035709]





[jira] [Updated] (FLINK-1749) Add Boosting algorithm for ensemble learning to machine learning library

2015-04-30 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-1749:
-
Assignee: narayana reddy

> Add Boosting algorithm for ensemble learning to machine learning library
> 
>
> Key: FLINK-1749
> URL: https://issues.apache.org/jira/browse/FLINK-1749
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: narayana reddy
>  Labels: ML
>
> Boosting [1] can help to create strong learners from an ensemble of weak 
> learners and thus improve its performance. Widely used boosting algorithms 
> are AdaBoost [2] and LogitBoost [3]. The work of I. Palit and C. K. Reddy [4] 
> investigates how boosting can be efficiently realised in a distributed 
> setting. 
> Resources:
> [1] [http://en.wikipedia.org/wiki/Boosting_%28machine_learning%29]
> [2] [http://en.wikipedia.org/wiki/AdaBoost]
> [3] [http://en.wikipedia.org/wiki/LogitBoost]
> [4] [http://ieeexplore.ieee.org/xpls/abs_all.jsp?arnumber=6035709]





[jira] [Commented] (FLINK-1749) Add Boosting algorithm for ensemble learning to machine learning library

2015-04-30 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521119#comment-14521119
 ] 

Till Rohrmann commented on FLINK-1749:
--

Hi Narayana,

great to hear that you want to pick this issue up :-) I assigned the issue to 
you.

You can use this issue to keep track of your progress.

> Add Boosting algorithm for ensemble learning to machine learning library
> 
>
> Key: FLINK-1749
> URL: https://issues.apache.org/jira/browse/FLINK-1749
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>  Labels: ML
>
> Boosting [1] can help to create strong learners from an ensemble of weak 
> learners and thus improve its performance. Widely used boosting algorithms 
> are AdaBoost [2] and LogitBoost [3]. The work of I. Palit and C. K. Reddy [4] 
> investigates how boosting can be efficiently realised in a distributed 
> setting. 
> Resources:
> [1] [http://en.wikipedia.org/wiki/Boosting_%28machine_learning%29]
> [2] [http://en.wikipedia.org/wiki/AdaBoost]
> [3] [http://en.wikipedia.org/wiki/LogitBoost]
> [4] [http://ieeexplore.ieee.org/xpls/abs_all.jsp?arnumber=6035709]





[jira] [Commented] (FLINK-1749) Add Boosting algorithm for ensemble learning to machine learning library

2015-04-30 Thread narayana reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521105#comment-14521105
 ] 

narayana reddy commented on FLINK-1749:
---

Hi Till Rohrmann,
I am pleased to contribute to this FLINK-1749 issue.

Will you please assign me this task?

I am a master's student at IIIT-Delhi looking for some open source contribution.

> Add Boosting algorithm for ensemble learning to machine learning library
> 
>
> Key: FLINK-1749
> URL: https://issues.apache.org/jira/browse/FLINK-1749
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>  Labels: ML
>
> Boosting [1] can help to create strong learners from an ensemble of weak 
> learners and thus improve its performance. Widely used boosting algorithms 
> are AdaBoost [2] and LogitBoost [3]. The work of I. Palit and C. K. Reddy [4] 
> investigates how boosting can be efficiently realised in a distributed 
> setting. 
> Resources:
> [1] [http://en.wikipedia.org/wiki/Boosting_%28machine_learning%29]
> [2] [http://en.wikipedia.org/wiki/AdaBoost]
> [3] [http://en.wikipedia.org/wiki/LogitBoost]
> [4] [http://ieeexplore.ieee.org/xpls/abs_all.jsp?arnumber=6035709]


