[jira] [Commented] (FLINK-1670) Collect method for streaming
[ https://issues.apache.org/jira/browse/FLINK-1670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14522195#comment-14522195 ]

ASF GitHub Bot commented on FLINK-1670:
---------------------------------------

Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/581#issuecomment-97956542

    I think this looks good now! It still needs an integration test, though; otherwise it will probably get broken by some change soon. Starting a `ForkableFlinkMiniCluster`, submitting the job via the `RemoteStreamEnvironment`, and collecting the result back should cover it.

> Collect method for streaming
> ----------------------------
>
>                 Key: FLINK-1670
>                 URL: https://issues.apache.org/jira/browse/FLINK-1670
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>    Affects Versions: 0.9
>            Reporter: Márton Balassi
>            Assignee: Gabor Gevay
>            Priority: Minor
>
> A convenience method for streaming back the results of a job to the client.
> As the client itself is a bottleneck anyway, an easy solution would be to
> provide a socket sink with a degree of parallelism of 1, from which a client
> utility can read.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
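The socket-sink idea from the issue description can be sketched with plain JDK sockets (a hypothetical illustration only, not the actual Flink implementation; the class and method names here are invented): a single-parallelism "sink" serves the results over a server socket, and a client utility connects and reads them back.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SocketCollectSketch {

    // "Sink" side: serve the results over a server socket (parallelism 1).
    // Port 0 asks the OS for any free port; the caller reads it back via
    // getLocalPort().
    static ServerSocket serveResults(List<String> results) throws IOException {
        ServerSocket server = new ServerSocket(0);
        Thread sink = new Thread(() -> {
            try (Socket client = server.accept();
                 PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
                for (String record : results) {
                    out.println(record);   // one record per line
                }
            } catch (IOException ignored) {
                // sketch: a real sink would surface this error
            }
        });
        sink.setDaemon(true);
        sink.start();
        return server;
    }

    // "Client" side: connect and collect everything the sink emits until EOF.
    static List<String> collect(int port) throws IOException {
        List<String> collected = new ArrayList<>();
        try (Socket socket = new Socket("localhost", port);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                collected.add(line);
            }
        }
        return collected;
    }

    public static void main(String[] args) throws IOException {
        ServerSocket server = serveResults(Arrays.asList("a", "b", "c"));
        List<String> collected = collect(server.getLocalPort());
        System.out.println(collected); // [a, b, c]
        server.close();
    }
}
```

Since the client is a bottleneck anyway (as the issue notes), funneling everything through one socket connection loses nothing in throughput while keeping the client utility trivial.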
[GitHub] flink pull request: [FLINK-1670] Made DataStream iterable
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/581#issuecomment-97956542

    I think this looks good now! It still needs an integration test, though; otherwise it will probably get broken by some change soon. Starting a `ForkableFlinkMiniCluster`, submitting the job via the `RemoteStreamEnvironment`, and collecting the result back should cover it.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] flink pull request: [streaming] New Source and state checkpointing...
GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/643

    [streaming] New Source and state checkpointing interfaces

    These interfaces allow streaming sources/operators to interact with the state checkpointing in a more precise manner.

    The interfaces are so far intended mainly for internal state handling (sources, window operators). An open question is what state interface we want to expose to the user code such that, on scale-in and scale-out, the state repartitions consistently with the data partitions.

      - For groupByKey operations, we could scope the state by key.
      - For all other operations, state may be repartitioned in a user-defined manner. This would give no guarantees about alignment with partitions. Think of a pipeline `partitionBy -> map()`. The state accumulated in the `map()` function is partitioned by a key, but the state entries may not be able to be linked to a certain key or key-hash value.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink cp_interfaces

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/643.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #643

----
commit 76d63d5b03643d3c6e51a882eb9427e421806342
Author: Stephan Ewen
Date:   2015-04-30T20:05:27Z

    [streaming] New Source and state checkpointing interfaces that allow
    operations to interact with the state checkpointing in a more precise
    manner.
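The key-scoped option in the first bullet can be illustrated with a self-contained sketch (hypothetical names, not a Flink API): if both records and state entries are assigned to partitions by the same hash of the key, then after a rescale each state entry lands on exactly the partition that will also receive that key's records, which is the consistency property the pull request description asks about.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedStatePartitioning {

    // Assign a key to one of n parallel partitions by its hash.
    // Using the SAME function for records and for state keeps them aligned.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Redistribute key-scoped state (key -> counter, here) onto newN partitions.
    static List<Map<String, Long>> rescale(List<Map<String, Long>> oldState, int newN) {
        List<Map<String, Long>> newState = new ArrayList<>();
        for (int i = 0; i < newN; i++) {
            newState.add(new HashMap<>());
        }
        for (Map<String, Long> partition : oldState) {
            for (Map.Entry<String, Long> e : partition.entrySet()) {
                // Each entry moves to the partition its key now hashes to,
                // i.e. the partition that will also get that key's records.
                newState.get(partitionFor(e.getKey(), newN)).put(e.getKey(), e.getValue());
            }
        }
        return newState;
    }

    public static void main(String[] args) {
        // Same key, same n: always the same partition.
        System.out.println(partitionFor("user-42", 4) == partitionFor("user-42", 4));
    }
}
```

For the second bullet (state not scoped by key, e.g. state accumulated inside a `map()` after a `partitionBy`), no such function exists, because the state entries cannot be attributed to a key; that is exactly why the description says only user-defined repartitioning with no alignment guarantee is possible there.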
[GitHub] flink pull request: [FLINK-1818] Added api to cancel job from clie...
Github user hsaputra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/642#discussion_r29462715

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
    @@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramIn
     		}
     	}
    +
    +	/**
    +	 * Executes the CANCEL action.
    +	 *
    +	 * @param args Command line arguments for the cancel action.
    +	 */
    +
    --- End diff --

    Nit: extra newlines.
[jira] [Commented] (FLINK-1818) Provide API to cancel running job
[ https://issues.apache.org/jira/browse/FLINK-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14522117#comment-14522117 ]

ASF GitHub Bot commented on FLINK-1818:
---------------------------------------

Github user hsaputra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/642#discussion_r29462715

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
    @@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramIn
     		}
     	}
    +
    +	/**
    +	 * Executes the CANCEL action.
    +	 *
    +	 * @param args Command line arguments for the cancel action.
    +	 */
    +
    --- End diff --

    Nit: extra newlines.

> Provide API to cancel running job
> ---------------------------------
>
>                 Key: FLINK-1818
>                 URL: https://issues.apache.org/jira/browse/FLINK-1818
>             Project: Flink
>          Issue Type: Improvement
>          Components: Java API
>    Affects Versions: 0.9
>            Reporter: Robert Metzger
>            Assignee: niraj rai
>              Labels: starter
>
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Canceling-a-Cluster-Job-from-Java-td4897.html
[jira] [Commented] (FLINK-1818) Provide API to cancel running job
[ https://issues.apache.org/jira/browse/FLINK-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521937#comment-14521937 ]

niraj rai commented on FLINK-1818:
----------------------------------

Hi [~mxm], thanks for reviewing it. I will incorporate the changes you suggested and resubmit. Thanks again.
[jira] [Commented] (FLINK-1818) Provide API to cancel running job
[ https://issues.apache.org/jira/browse/FLINK-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521849#comment-14521849 ]

ASF GitHub Bot commented on FLINK-1818:
---------------------------------------

Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/642#issuecomment-97882155

    Thank you for your pull request @rainiraj. I have made some comments. If you address these, we would be happy to merge your changes. Feel free to ask if anything is unclear.
[GitHub] flink pull request: [FLINK-1818] Added api to cancel job from clie...
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/642#issuecomment-97882155

    Thank you for your pull request @rainiraj. I have made some comments. If you address these, we would be happy to merge your changes. Feel free to ask if anything is unclear.
[jira] [Commented] (FLINK-1818) Provide API to cancel running job
[ https://issues.apache.org/jira/browse/FLINK-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521839#comment-14521839 ]

ASF GitHub Bot commented on FLINK-1818:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/642#discussion_r29447974

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
    @@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramIn
     		}
     	}
    +
    +	/**
    +	 * Executes the CANCEL action.
    +	 *
    +	 * @param args Command line arguments for the cancel action.
    +	 */
    +
    +
    +	protected int cancel(JobID jobId) {
    +		LOG.info("Executing 'cancel' command.");
    +
    +		final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
    +
    +		try {
    +			String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
    +			Option<Tuple2<String, Object>> remoting =
    +				new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0));
    +
    +			// start a remote actor system to listen on an arbitrary port
    +			ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration, remoting);
    +
    +			ActorRef jobManager = JobManager.getJobManagerRemoteReference(address, actorSystem, timeout);
    +			Future<Object> response = Patterns.ask(jobManager, new JobManagerMessages.CancelJob(jobId), new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +			try {
    +				Await.result(response, timeout);
    +				return 0;
    +			}
    +			catch (Exception e) {
    +				throw new Exception("Canceling the job with ID " + jobId + " failed.", e);
    +			}
    +		}
    +		catch (Throwable t) {
    +			return handleError(t);
    +		}
    --- End diff --

    I would handle the error explicitly here and print a stacktrace and an info message. Creating a new method is not necessary because it is not used elsewhere. You might want to return a boolean which is true if the cancellation succeeded (`CancellationSuccess` received) and false otherwise.
[GitHub] flink pull request: [FLINK-1818] Added api to cancel job from clie...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/642#discussion_r29447974

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
    @@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramIn
     		}
     	}
    +
    +	/**
    +	 * Executes the CANCEL action.
    +	 *
    +	 * @param args Command line arguments for the cancel action.
    +	 */
    +
    +
    +	protected int cancel(JobID jobId) {
    +		LOG.info("Executing 'cancel' command.");
    +
    +		final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
    +
    +		try {
    +			String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
    +			Option<Tuple2<String, Object>> remoting =
    +				new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0));
    +
    +			// start a remote actor system to listen on an arbitrary port
    +			ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration, remoting);
    +
    +			ActorRef jobManager = JobManager.getJobManagerRemoteReference(address, actorSystem, timeout);
    +			Future<Object> response = Patterns.ask(jobManager, new JobManagerMessages.CancelJob(jobId), new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +			try {
    +				Await.result(response, timeout);
    +				return 0;
    +			}
    +			catch (Exception e) {
    +				throw new Exception("Canceling the job with ID " + jobId + " failed.", e);
    +			}
    +		}
    +		catch (Throwable t) {
    +			return handleError(t);
    +		}
    --- End diff --

    I would handle the error explicitly here and print a stacktrace and an info message. Creating a new method is not necessary because it is not used elsewhere. You might want to return a boolean which is true if the cancellation succeeded (`CancellationSuccess` received) and false otherwise.
[GitHub] flink pull request: [FLINK-1818] Added api to cancel job from clie...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/642#discussion_r29447805

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
    @@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramIn
     		}
     	}
    +
    +	/**
    +	 * Executes the CANCEL action.
    +	 *
    +	 * @param args Command line arguments for the cancel action.
    +	 */
    +
    +
    +	protected int cancel(JobID jobId) {
    +		LOG.info("Executing 'cancel' command.");
    +
    +		final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
    +
    +		try {
    +			String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
    +			Option<Tuple2<String, Object>> remoting =
    +				new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0));
    +
    +			// start a remote actor system to listen on an arbitrary port
    +			ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration, remoting);
    +
    +			ActorRef jobManager = JobManager.getJobManagerRemoteReference(address, actorSystem, timeout);
    +			Future<Object> response = Patterns.ask(jobManager, new JobManagerMessages.CancelJob(jobId), new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +			try {
    +				Await.result(response, timeout);
    --- End diff --

    You should check the result here. `Await.result` returns an Object which is either `CancellationSuccess` or `CancellationFailure`.
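The pattern the reviewer asks for, inspecting the reply object instead of discarding it, can be sketched with plain JDK futures (the nested classes are hypothetical stand-ins for the Akka `JobManagerMessages` reply types, not the real ones):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CancelResultCheck {

    // Hypothetical stand-ins for the JobManager's reply messages.
    static class CancellationSuccess { }

    static class CancellationFailure {
        final Throwable cause;
        CancellationFailure(Throwable cause) { this.cause = cause; }
    }

    // Wait for the reply with a finite timeout and branch on its type,
    // rather than ignoring the result of the blocking wait.
    static boolean cancellationSucceeded(Future<Object> response, long timeoutMillis)
            throws Exception {
        Object result = response.get(timeoutMillis, TimeUnit.MILLISECONDS);
        if (result instanceof CancellationSuccess) {
            return true;
        } else if (result instanceof CancellationFailure) {
            return false;
        } else {
            throw new IllegalStateException("Unexpected response: " + result);
        }
    }

    public static void main(String[] args) throws Exception {
        Future<Object> reply =
                CompletableFuture.completedFuture((Object) new CancellationSuccess());
        System.out.println(cancellationSucceeded(reply, 1000)); // true
    }
}
```

This mirrors both review points at once: the wait is bounded by a finite timeout, and the caller learns whether the cancellation actually succeeded instead of always returning 0.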
[GitHub] flink pull request: [FLINK-1818] Added api to cancel job from clie...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/642#discussion_r29447791

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
    @@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramIn
     		}
     	}
    +
    +	/**
    +	 * Executes the CANCEL action.
    +	 *
    +	 * @param args Command line arguments for the cancel action.
    +	 */
    +
    +
    +	protected int cancel(JobID jobId) {
    +		LOG.info("Executing 'cancel' command.");
    +
    +		final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
    +
    +		try {
    +			String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
    +			Option<Tuple2<String, Object>> remoting =
    +				new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0));
    +
    +			// start a remote actor system to listen on an arbitrary port
    +			ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration, remoting);
    +
    +			ActorRef jobManager = JobManager.getJobManagerRemoteReference(address, actorSystem, timeout);
    +			Future<Object> response = Patterns.ask(jobManager, new JobManagerMessages.CancelJob(jobId), new Timeout(AkkaUtils.INF_TIMEOUT()));
    --- End diff --

    You should also specify a finite timeout here.
[jira] [Commented] (FLINK-1818) Provide API to cancel running job
[ https://issues.apache.org/jira/browse/FLINK-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521829#comment-14521829 ]

ASF GitHub Bot commented on FLINK-1818:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/642#discussion_r29447791

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
    @@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramIn
     		}
     	}
    +
    +	/**
    +	 * Executes the CANCEL action.
    +	 *
    +	 * @param args Command line arguments for the cancel action.
    +	 */
    +
    +
    +	protected int cancel(JobID jobId) {
    +		LOG.info("Executing 'cancel' command.");
    +
    +		final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
    +
    +		try {
    +			String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
    +			Option<Tuple2<String, Object>> remoting =
    +				new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0));
    +
    +			// start a remote actor system to listen on an arbitrary port
    +			ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration, remoting);
    +
    +			ActorRef jobManager = JobManager.getJobManagerRemoteReference(address, actorSystem, timeout);
    +			Future<Object> response = Patterns.ask(jobManager, new JobManagerMessages.CancelJob(jobId), new Timeout(AkkaUtils.INF_TIMEOUT()));
    --- End diff --

    You should also specify a finite timeout here.
[jira] [Commented] (FLINK-1818) Provide API to cancel running job
[ https://issues.apache.org/jira/browse/FLINK-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521830#comment-14521830 ]

ASF GitHub Bot commented on FLINK-1818:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/642#discussion_r29447805

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
    @@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramIn
     		}
     	}
    +
    +	/**
    +	 * Executes the CANCEL action.
    +	 *
    +	 * @param args Command line arguments for the cancel action.
    +	 */
    +
    +
    +	protected int cancel(JobID jobId) {
    +		LOG.info("Executing 'cancel' command.");
    +
    +		final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
    +
    +		try {
    +			String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
    +			Option<Tuple2<String, Object>> remoting =
    +				new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0));
    +
    +			// start a remote actor system to listen on an arbitrary port
    +			ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration, remoting);
    +
    +			ActorRef jobManager = JobManager.getJobManagerRemoteReference(address, actorSystem, timeout);
    +			Future<Object> response = Patterns.ask(jobManager, new JobManagerMessages.CancelJob(jobId), new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +			try {
    +				Await.result(response, timeout);
    --- End diff --

    You should check the result here. `Await.result` returns an Object which is either `CancellationSuccess` or `CancellationFailure`.
[jira] [Commented] (FLINK-1818) Provide API to cancel running job
[ https://issues.apache.org/jira/browse/FLINK-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521828#comment-14521828 ]

ASF GitHub Bot commented on FLINK-1818:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/642#discussion_r29447781

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
    @@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramIn
     		}
     	}
    +
    +	/**
    +	 * Executes the CANCEL action.
    +	 *
    +	 * @param args Command line arguments for the cancel action.
    +	 */
    +
    +
    +	protected int cancel(JobID jobId) {
    +		LOG.info("Executing 'cancel' command.");
    +
    +		final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
    +
    +		try {
    +			String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
    +			Option<Tuple2<String, Object>> remoting =
    +				new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0));
    +
    +			// start a remote actor system to listen on an arbitrary port
    +			ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration, remoting);
    --- End diff --

    You could have used `JobClient.startJobClientActorSystem(configuration)` to create the ActorSystem. That makes the lines above unnecessary.
[GitHub] flink pull request: [FLINK-1818] Added api to cancel job from clie...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/642#discussion_r29447781

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
    @@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramIn
     		}
     	}
    +
    +	/**
    +	 * Executes the CANCEL action.
    +	 *
    +	 * @param args Command line arguments for the cancel action.
    +	 */
    +
    +
    +	protected int cancel(JobID jobId) {
    +		LOG.info("Executing 'cancel' command.");
    +
    +		final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
    +
    +		try {
    +			String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
    +			Option<Tuple2<String, Object>> remoting =
    +				new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0));
    +
    +			// start a remote actor system to listen on an arbitrary port
    +			ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration, remoting);
    --- End diff --

    You could have used `JobClient.startJobClientActorSystem(configuration)` to create the ActorSystem. That makes the lines above unnecessary.
[GitHub] flink pull request: [FLINK-1818] Added api to cancel job from clie...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/642#discussion_r29447739

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
    @@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramIn
     		}
     	}
    +
    +	/**
    +	 * Executes the CANCEL action.
    +	 *
    +	 * @param args Command line arguments for the cancel action.
    --- End diff --

    The JavaDoc needs to be corrected.
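The quoted JavaDoc documents a nonexistent `args` parameter, while the method actually takes a `JobID`. A corrected version might look like the following sketch (the `JobID` class here is a hypothetical stand-in for Flink's, and the stubbed body is not the real implementation):

```java
/** Hypothetical stand-in for org.apache.flink.api.common.JobID. */
class JobID { }

class ClientSketch {
    /**
     * Executes the CANCEL action: asks the JobManager to cancel the job
     * with the given ID.
     *
     * @param jobId The ID of the job to cancel.
     * @return 0 if the cancellation was triggered successfully, a non-zero
     *         error code otherwise.
     */
    protected int cancel(JobID jobId) {
        return 0; // stub; the real method would contact the JobManager
    }
}
```

The key point is simply that `@param` tags must name the method's actual parameters and a non-void method should document its return value.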
[jira] [Commented] (FLINK-1818) Provide API to cancel running job
[ https://issues.apache.org/jira/browse/FLINK-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521826#comment-14521826 ]

ASF GitHub Bot commented on FLINK-1818:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/642#discussion_r29447739

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
    @@ -420,6 +428,60 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramIn
     		}
     	}
    +
    +	/**
    +	 * Executes the CANCEL action.
    +	 *
    +	 * @param args Command line arguments for the cancel action.
    --- End diff --

    The JavaDoc needs to be corrected.
[jira] [Commented] (FLINK-1690) ProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure spuriously fails on Travis
[ https://issues.apache.org/jira/browse/FLINK-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521736#comment-14521736 ]

Stephan Ewen commented on FLINK-1690:
-------------------------------------

This one is interesting: it is not test-specific but a vulnerability that all tests share, namely a TaskManager port conflict. Because the port is chosen not by netty but beforehand (randomly), two TaskManagers can attempt to open the same port (with a small probability).

> ProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure spuriously
> fails on Travis
> --------------------------------------------------------------------------
>
>                 Key: FLINK-1690
>                 URL: https://issues.apache.org/jira/browse/FLINK-1690
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Till Rohrmann
>            Priority: Minor
>
> I got the following error on Travis.
> {code}
> ProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure:244 The
> program did not finish in time
> {code}
> I think we have to increase the timeouts for this test case to make it
> reliably run on Travis.
> The log of the failed Travis build can be found
> [here|https://api.travis-ci.org/jobs/53952486/log.txt?deansi=true]
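The race Stephan describes, picking a random port up front and binding to it later, can be avoided by letting the OS assign the port atomically at bind time. This is sketched below with plain JDK sockets (an illustration of the technique, not the actual Flink/netty code): binding to port 0 asks the kernel for any free port, so two concurrent binds can never be handed the same one.

```java
import java.io.IOException;
import java.net.ServerSocket;

public class EphemeralPortSketch {

    // Port 0 means "let the OS pick a free port"; the assignment happens
    // atomically at bind time, eliminating the choose-then-bind race.
    static ServerSocket bindToFreePort() throws IOException {
        return new ServerSocket(0);
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket tm1 = bindToFreePort();
             ServerSocket tm2 = bindToFreePort()) {
            // Both sockets are bound simultaneously, necessarily to
            // different ports.
            System.out.println(tm1.getLocalPort() != tm2.getLocalPort()); // true
        }
    }
}
```

The trade-off is that the chosen port is only known after binding (via `getLocalPort()`), so it must be communicated to peers afterwards rather than configured in advance, which is presumably why a pre-chosen random port was used in the first place.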
[jira] [Commented] (FLINK-1818) Provide API to cancel running job
[ https://issues.apache.org/jira/browse/FLINK-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521681#comment-14521681 ]

ASF GitHub Bot commented on FLINK-1818:
---------------------------------------

GitHub user rainiraj opened a pull request:

    https://github.com/apache/flink/pull/642

    [FLINK-1818] Added api to cancel job from client

    Please review the implementation of the cancel command through the client API.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rainiraj/flink clientapi

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/642.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #642

----
commit 26b59fcc0b0d49bb8942da22267ac12e963fa4f5
Author: rainiraj
Date:   2015-04-30T15:28:29Z

    [FLINK-1818] Added api to cancel job from client
[GitHub] flink pull request: [FLINK-1818] Added api to cancel job from clie...
GitHub user rainiraj opened a pull request:

    https://github.com/apache/flink/pull/642

    [FLINK-1818] Added api to cancel job from client

    Please review the implementation of the cancel command through the client API.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rainiraj/flink clientapi

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/642.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #642

----
commit 26b59fcc0b0d49bb8942da22267ac12e963fa4f5
Author: rainiraj
Date:   2015-04-30T15:28:29Z

    [FLINK-1818] Added api to cancel job from client
[jira] [Commented] (FLINK-1951) NullPointerException in DeltaIteration when no ForwardedFileds
[ https://issues.apache.org/jira/browse/FLINK-1951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521678#comment-14521678 ]

Vasia Kalavri commented on FLINK-1951:
--------------------------------------

wow, thanks a lot for the nice explanation and fix [~fhueske] ^^

> NullPointerException in DeltaIteration when no ForwardedFileds
> --------------------------------------------------------------
>
>                 Key: FLINK-1951
>                 URL: https://issues.apache.org/jira/browse/FLINK-1951
>             Project: Flink
>          Issue Type: Bug
>          Components: Iterations
>    Affects Versions: 0.9
>            Reporter: Vasia Kalavri
>            Assignee: Fabian Hueske
>            Priority: Critical
>
> The following exception is thrown by the Connected Components example if the
> @ForwardedFieldsFirst("*") annotation is removed from the ComponentIdFilter join:
> Caused by: java.lang.NullPointerException
> 	at org.apache.flink.examples.java.graph.ConnectedComponents$ComponentIdFilter.join(ConnectedComponents.java:186)
> 	at org.apache.flink.examples.java.graph.ConnectedComponents$ComponentIdFilter.join(ConnectedComponents.java:1)
> 	at org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver.run(JoinWithSolutionSetSecondDriver.java:198)
> 	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> 	at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:139)
> 	at org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask.run(IterationIntermediatePactTask.java:92)
> 	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> 	at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
> 	at java.lang.Thread.run(Thread.java:745)
> [Code|https://github.com/vasia/flink/tree/cc-test] and [dataset|http://snap.stanford.edu/data/com-DBLP.html] to reproduce.
[jira] [Commented] (FLINK-1690) ProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure spuriously fails on Travis
[ https://issues.apache.org/jira/browse/FLINK-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521669#comment-14521669 ]

Robert Metzger commented on FLINK-1690:
---------------------------------------

Another instance: https://travis-ci.org/apache/flink/jobs/60670489
[jira] [Commented] (FLINK-1951) NullPointerException in DeltaIteration when no ForwardedFileds
[ https://issues.apache.org/jira/browse/FLINK-1951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521663#comment-14521663 ] ASF GitHub Bot commented on FLINK-1951:
---
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/641#issuecomment-97842442
Looks good ;)
[jira] [Updated] (FLINK-1965) Implement the Orthant-wise Limited Memory QuasiNewton optimization algorithm, a variant of L-BFGS that handles L1 regularization
[ https://issues.apache.org/jira/browse/FLINK-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Theodore Vasiloudis updated FLINK-1965:
---
Description: The Orthant-wise Limited Memory QuasiNewton (OWL-QN) is a quasi-Newton optimization method similar to L-BFGS that can handle L1 regularization. Implementing this would allow us to obtain sparse solutions while at the same time having the convergence benefits of a quasi-Newton method, when compared to stochastic gradient descent.
[Link to paper|http://research-srv.microsoft.com/en-us/um/people/jfgao/paper/icml07scalable.pdf]
[Link to example implementation|http://research.microsoft.com/en-us/downloads/b1eb1016-1738-4bd5-83a9-370c9d498a03/]

was: The Orthant-wise Limited Memory QuasiNewton (OWL-QN) is a quasi-Newton optimization method similar to L-BFGS that can handle L1 regularization. Implementing this would allow us to obtain sparse solutions while at the same time having the convergence benefits of a quasi-Newton method, when compared to stochastic gradient descent.
[Link to paper|http://research.microsoft.com/en-us/downloads/b1eb1016-1738-4bd5-83a9-370c9d498a03/]

> Implement the Orthant-wise Limited Memory QuasiNewton optimization algorithm, a variant of L-BFGS that handles L1 regularization
> --
>
> Key: FLINK-1965
> URL: https://issues.apache.org/jira/browse/FLINK-1965
> Project: Flink
> Issue Type: Wish
> Components: Machine Learning Library
> Reporter: Theodore Vasiloudis
> Priority: Minor
> Labels: ML
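For reference, the objective OWL-QN minimizes can be stated explicitly (this is the standard formulation from the literature; the notation is mine, not from the ticket):

```latex
% Objective minimized by OWL-QN: a smooth, convex loss \ell(w)
% plus a non-smooth L1 penalty that induces sparse solutions.
\min_{w \in \mathbb{R}^n} \; f(w) = \ell(w) + C \,\lVert w \rVert_1,
\qquad \lVert w \rVert_1 = \sum_{i=1}^{n} \lvert w_i \rvert
```

The L1 term is differentiable inside any fixed orthant, which is why OWL-QN constrains each quasi-Newton step to the orthant of the current iterate.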
[GitHub] flink pull request: [FLINK-1951] Fix NullPointerException in delta...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/641#issuecomment-97842442
Looks good ;)
[jira] [Commented] (FLINK-1951) NullPointerException in DeltaIteration when no ForwardedFileds
[ https://issues.apache.org/jira/browse/FLINK-1951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521661#comment-14521661 ] ASF GitHub Bot commented on FLINK-1951:
---
GitHub user fhueske opened a pull request: https://github.com/apache/flink/pull/641
[FLINK-1951] Fix NullPointerException in delta iteration due to missing input temp
See JIRA for details.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/fhueske/flink delta-npe-fix
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/641.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #641
commit 0c7ad4b03e3a47d5c7ee95659a31e62d89edfac6
Author: Fabian Hueske
Date: 2015-04-30T15:34:02Z
[FLINK-1951] Fix NullPointerException in delta iteration due to missing temp
[GitHub] flink pull request: [FLINK-1951] Fix NullPointerException in delta...
GitHub user fhueske opened a pull request: https://github.com/apache/flink/pull/641
[FLINK-1951] Fix NullPointerException in delta iteration due to missing input temp
See JIRA for details.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/fhueske/flink delta-npe-fix
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/641.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #641
commit 0c7ad4b03e3a47d5c7ee95659a31e62d89edfac6
Author: Fabian Hueske
Date: 2015-04-30T15:34:02Z
[FLINK-1951] Fix NullPointerException in delta iteration due to missing temp
[jira] [Created] (FLINK-1965) Implement the Orthant-wise Limited Memory QuasiNewton optimization algorithm, a variant of L-BFGS that handles L1 regularization
Theodore Vasiloudis created FLINK-1965:
--
Summary: Implement the Orthant-wise Limited Memory QuasiNewton optimization algorithm, a variant of L-BFGS that handles L1 regularization
Key: FLINK-1965
URL: https://issues.apache.org/jira/browse/FLINK-1965
Project: Flink
Issue Type: Wish
Components: Machine Learning Library
Reporter: Theodore Vasiloudis
Priority: Minor

The Orthant-wise Limited Memory QuasiNewton (OWL-QN) is a quasi-Newton optimization method similar to L-BFGS that can handle L1 regularization. Implementing this would allow us to obtain sparse solutions while at the same time having the convergence benefits of a quasi-Newton method, when compared to stochastic gradient descent.
[Link to paper|http://research.microsoft.com/en-us/downloads/b1eb1016-1738-4bd5-83a9-370c9d498a03/]
[jira] [Commented] (FLINK-1951) NullPointerException in DeltaIteration when no ForwardedFileds
[ https://issues.apache.org/jira/browse/FLINK-1951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521656#comment-14521656 ] Fabian Hueske commented on FLINK-1951:
--
It turned out that the channel to the solution set update task was meant to be blocking, i.e., it should first collect all updates and apply them only after the join with the solution set is done. That way the solution set is not concurrently probed and updated. The optimizer made the right decisions, but they were not correctly translated into the JobGraph.
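The semantics Fabian describes can be illustrated outside Flink with a plain-Python sketch (illustrative names only, not Flink's runtime code): updates are buffered during the probe pass and applied only afterwards, which is exactly what the blocking channel guarantees.

```python
def delta_step(solution_set, workset):
    """One delta-iteration superstep: probe the solution set for every
    workset element, buffer the updates, and apply them only after the
    probe pass is done (the 'blocking channel' behaviour)."""
    buffered_updates = {}
    for vertex, candidate in workset:
        # Probe: keep the candidate only if it improves the stored value.
        if candidate < solution_set[vertex]:
            buffered_updates[vertex] = candidate
    # Apply updates only after the join has seen every workset element,
    # so the solution set is never probed and mutated concurrently.
    solution_set.update(buffered_updates)
    return buffered_updates

# Connected-components flavour: vertex -> current component id.
solution = {1: 1, 2: 2, 3: 3}
changed = delta_step(solution, [(2, 1), (3, 2)])
```

Because the updates are buffered, every probe in a superstep sees the solution set as it was at the start of that superstep, regardless of the order in which workset elements arrive.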
[GitHub] flink pull request: [scheduling] implement backtracking of interme...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/640#issuecomment-97825782
Thank you for your valuable comments @tillrohrmann. I haven't worked with Akka's future compositions, but it seems like a sophisticated way to parallelize and combine the actor replies.
[jira] [Updated] (FLINK-1964) Rework TwitterSource to use a Properties object instead of a file path
[ https://issues.apache.org/jira/browse/FLINK-1964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Márton Balassi updated FLINK-1964:
--
Labels: starter (was: )

> Rework TwitterSource to use a Properties object instead of a file path
> --
>
> Key: FLINK-1964
> URL: https://issues.apache.org/jira/browse/FLINK-1964
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 0.9
> Reporter: Robert Metzger
> Priority: Minor
> Labels: starter
>
> The twitter connector is very hard to use on a cluster because it expects the property file to be present on all nodes.
> It would be much easier to ask the user to pass a Properties object immediately.
> Also, the javadoc of the class stops in the middle of a sentence.
> It was not obvious to me how the two examples TwitterStreaming and TwitterTopology differ. Also, there is a third TwitterStream example in the streaming examples.
> The documentation of the Twitter source refers to the non-existent TwitterLocal class.
[GitHub] flink pull request: [scheduling] implement backtracking of interme...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/640#discussion_r29435008 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java --- @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import akka.actor.ActorRef; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import com.google.common.base.Preconditions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.messages.TaskMessages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; + + +/** + * Backtracking is a mechanism to schedule only those Execution Vertices of an Execution Graph which + * do not have an intermediate result available. 
This is in contrast to the simple way of scheduling + * a job, where all Execution Vertices are executed starting from the source. The Backtracking starts + * from the sinks and traverses the Execution Graph to the sources. It only reaches the sources if + * no intermediate result could be found on the way. + * + * @see ExecutionGraph + * @see ExecutionVertex + * @see Execution + */ +public class Backtracking { + + private static final Logger LOG = LoggerFactory.getLogger(Backtracking.class); + + private final Deque taskRequirements = new ArrayDeque(); + + private final Map visitedPartitions = new HashMap(); + + private ScheduleAction scheduleAction; + private Runnable postBacktrackingHook; + + public Backtracking(Collection vertices) { + Preconditions.checkNotNull(vertices); + + // add all sinks found to the stack + for (ExecutionJobVertex ejv : vertices) { + if (ejv.getJobVertex().isOutputVertex()) { + for (ExecutionVertex ev : ejv.getTaskVertices()) { + if (ev.getExecutionState() == ExecutionState.CREATED) { + taskRequirements.add(new TaskRequirement(ev)); + } + } + } + } + + this.scheduleAction = new ScheduleAction() { + @Override + public void schedule(ExecutionVertex ev) {} + }; + + this.postBacktrackingHook = new Runnable() { + @Override + public void run() {} + }; + } + + /** +* Scheduling to be performed when an ExecutionVertex is encountered that cannot be resumed +*/ + public interface ScheduleAction { + void schedule(ExecutionVertex ev); + } + + /** +* A TaskRequirement encaplusates an ExecutionVertex and its IntermediateResultPartitions which +* are required for execution. 
+*/ + private class TaskRequirement { + + private final ExecutionVertex executionVertex; + private final Deque pendingInputs = new ArrayDeque(); + private final int numInputs; + + private int availableInputs = 0; + + public TaskRequirement(ExecutionVertex executionVertex) { + this.executionVertex = executionVertex; + this.pendingInputs.addAll(executionVertex.getInputs()); + this.numInputs = pendingInputs.size(); + } + + public ExecutionVertex getExecutionVertex() { + r
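The traversal described in the quoted Javadoc (start from the sinks, walk toward the sources, stop wherever an intermediate result is already available) can be sketched in a small standalone form (plain Python with hypothetical names, not the actual Flink implementation):

```python
def backtrack(sinks, has_cached_result):
    """Walk a dataflow DAG from the sinks toward the sources and collect
    the vertices that must be (re)scheduled. Traversal stops at any vertex
    whose intermediate result is available, so its upstream is skipped."""
    to_schedule = []
    visited = set()
    stack = list(sinks)
    while stack:
        vertex = stack.pop()
        if vertex.name in visited:
            continue
        visited.add(vertex.name)
        if has_cached_result(vertex):
            # Result can be resumed: don't schedule, don't go upstream.
            continue
        to_schedule.append(vertex.name)
        stack.extend(vertex.inputs)
    return to_schedule

class V:
    def __init__(self, name, inputs=()):
        self.name, self.inputs = name, list(inputs)

# source -> middle -> sink; 'middle' still has its result partition cached,
# so only the sink needs to be scheduled and the source is never reached.
source = V("source")
middle = V("middle", [source])
sink = V("sink", [middle])
need = backtrack([sink], lambda v: v.name == "middle")
```

With no cached results at all, the same traversal reaches and schedules every vertex down to the sources, which matches the Javadoc's "only reaches the sources if no intermediate result could be found on the way".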
[GitHub] flink pull request: [scheduling] implement backtracking of interme...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/640#discussion_r29434283
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java ---
[GitHub] flink pull request: [scheduling] implement backtracking of interme...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/640#issuecomment-97818576
The backtracking looks good @mxm. I have some remarks concerning the way the locking of the partitions on the TMs works. At the moment this happens sequentially: for each `IntermediateResultPartition`, a message is sent to the TM and the response is awaited; only after receiving that TM's response is the next `IntermediateResultPartition` processed. This can considerably slow down scheduling if the degree of parallelism is high. I think we should make use of Akka's future composition to do this concurrently. Furthermore, we could think about doing the backtracking itself in parallel, which could also speed up the scheduling process.
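The sequential-versus-concurrent difference raised above can be sketched in a runtime-agnostic way (plain Python, with `concurrent.futures` standing in for Akka's future composition; all names are illustrative):

```python
import concurrent.futures as cf
import time

def lock_partition(partition_id):
    """Stand-in for one ask() round-trip to a TaskManager."""
    time.sleep(0.05)  # simulated network latency
    return partition_id, True

partitions = list(range(8))

# Sequential: each request waits for the previous reply (~8 x latency).
start = time.monotonic()
sequential = [lock_partition(p) for p in partitions]
sequential_time = time.monotonic() - start

# Concurrent: fire all requests at once, then combine the replies
# (the analogue of composing the ask() futures and sequencing them).
start = time.monotonic()
with cf.ThreadPoolExecutor(max_workers=len(partitions)) as pool:
    concurrent = list(pool.map(lock_partition, partitions))
concurrent_time = time.monotonic() - start
```

Both variants collect the same replies; the concurrent one pays roughly one round-trip of latency instead of one per partition, which is the gain the comment is after.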
[GitHub] flink pull request: [scheduling] implement backtracking of interme...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/640#discussion_r29433574
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java ---
[jira] [Created] (FLINK-1964) Rework TwitterSource to use a Properties object instead of a file path
Robert Metzger created FLINK-1964:
-
Summary: Rework TwitterSource to use a Properties object instead of a file path
Key: FLINK-1964
URL: https://issues.apache.org/jira/browse/FLINK-1964
Project: Flink
Issue Type: Bug
Components: Streaming
Affects Versions: 0.9
Reporter: Robert Metzger
Priority: Minor

The twitter connector is very hard to use on a cluster because it expects the property file to be present on all nodes. It would be much easier to ask the user to pass a Properties object immediately.
Also, the javadoc of the class stops in the middle of a sentence.
It was not obvious to me how the two examples TwitterStreaming and TwitterTopology differ. Also, there is a third TwitterStream example in the streaming examples.
The documentation of the Twitter source refers to the non-existent TwitterLocal class.
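The design point, sketched generically (plain Python with illustrative names, not Flink's actual TwitterSource API): a source that takes its credentials as an in-memory configuration object ships them with the serialized job, whereas a file path must resolve on every cluster node.

```python
class TwitterSourceSketch:
    """Hypothetical source that takes its configuration as a dict
    instead of a file path, so credentials travel with the job rather
    than having to exist as a file on every node."""
    REQUIRED = ("consumer_key", "consumer_secret", "token", "token_secret")

    def __init__(self, props):
        # Validate eagerly at job-construction time on the client,
        # instead of failing later on some remote TaskManager.
        missing = [k for k in self.REQUIRED if k not in props]
        if missing:
            raise ValueError(f"missing Twitter credentials: {missing}")
        self.props = dict(props)

src = TwitterSourceSketch({
    "consumer_key": "...", "consumer_secret": "...",
    "token": "...", "token_secret": "...",
})
```

Eager client-side validation is a side benefit of this style: a misconfigured job fails before submission, not at runtime on the cluster.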
[GitHub] flink pull request: [scheduling] implement backtracking of interme...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/640#discussion_r29432849
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java ---
[jira] [Commented] (FLINK-1103) Update Streaming examples to become self-contained
[ https://issues.apache.org/jira/browse/FLINK-1103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521546#comment-14521546 ] Robert Metzger commented on FLINK-1103: --- Where is the data in {{TwitterStreamData}} coming from? Can we just take it and license it under ASF? > Update Streaming examples to become self-contained > -- > > Key: FLINK-1103 > URL: https://issues.apache.org/jira/browse/FLINK-1103 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.7.0-incubating >Reporter: Márton Balassi >Assignee: Márton Balassi > > Streaming examples do not follow the standard set by the recent examples > refactor of the batch API. > TestDataUtil should be removed and Object[][] used to contain the example > data. > Comments are also lacking in comparison with the batch counterpart.
[GitHub] flink pull request: [scheduling] implement backtracking of interme...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/640#discussion_r29432635 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java --- @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import akka.actor.ActorRef; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import com.google.common.base.Preconditions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.messages.TaskMessages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Deque; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + + +/** + * Backtracking is a mechanism to schedule only those Execution Vertices of an Execution Graph which + * do not have an intermediate result available. 
This is in contrast to the simple way of scheduling + * a job, where all Execution Vertices are executed starting from the source. The Backtracking starts + * from the sinks and traverses the Execution Graph to the sources. It only reaches the sources if + * no intermediate result could be found on the way. + * + * @see ExecutionGraph + * @see ExecutionVertex + * @see Execution + */ +public class Backtracking { + + private static final Logger LOG = LoggerFactory.getLogger(Backtracking.class); + + private final Deque taskRequirements = new ArrayDeque(); + + private final Map visitedPartitions = new HashMap(); + + private ScheduleAction scheduleAction; + private Runnable postBacktrackingHook; + + public Backtracking(Collection vertices) { + Preconditions.checkNotNull(vertices); + + // add all sinks found to the stack + for (ExecutionJobVertex ejv : vertices) { + if (ejv.getJobVertex().isOutputVertex()) { + for (ExecutionVertex ev : ejv.getTaskVertices()) { + if (ev.getExecutionState() == ExecutionState.CREATED) { + taskRequirements.add(new TaskRequirement(ev)); + } + } + } + } + + this.scheduleAction = new ScheduleAction() { + @Override + public void schedule(ExecutionVertex ev) {} + }; + + this.postBacktrackingHook = new Runnable() { + @Override + public void run() {} + }; + } + + /** +* Scheduling to be performed when an ExecutionVertex is encountered that cannot be resumed +*/ + public interface ScheduleAction { + void schedule(ExecutionVertex ev); + } + + /** +* A TaskRequirement encapsulates an ExecutionVertex and its IntermediateResultPartitions which +* are required for execution. 
+*/ + private class TaskRequirement { + + private final ExecutionVertex executionVertex; + private final Deque pendingInputs = new ArrayDeque(); + private final int numInputs; + + private int availableInputs = 0; + + public TaskRequirement(ExecutionVertex executionVertex) { + this.executionVertex = executionVertex; + this.pendingInputs.addAll(executionVertex.getInputs()); + this.numInputs = pendingInputs.size(); + } + + public ExecutionVertex getExec
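The javadoc in the diff above describes the traversal: start at the sinks, walk toward the sources, and stop descending wherever an intermediate result is still available. A self-contained sketch of that idea (illustrative names and a plain string-keyed graph, not the `ExecutionVertex`/`TaskRequirement` classes from the diff):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch of backtracking scheduling: collect only the vertices
 * that must be (re-)executed, starting from the sinks and walking toward the
 * sources. A vertex with a cached intermediate result is neither scheduled
 * nor traversed further, so the sources are reached only when no result is
 * available on the way.
 */
public class BacktrackSketch {

    /**
     * @param graph  maps a vertex id to the ids of its input vertices
     * @param sinks  the sink vertices of the job
     * @param cached vertices whose intermediate result partitions are still available
     */
    public static List<String> verticesToSchedule(Map<String, List<String>> graph,
                                                  List<String> sinks,
                                                  Set<String> cached) {
        Set<String> visited = new HashSet<>();
        List<String> toSchedule = new ArrayList<>();
        Deque<String> stack = new ArrayDeque<>(sinks);
        while (!stack.isEmpty()) {
            String v = stack.pop();
            if (!visited.add(v)) {
                continue; // already handled via another path
            }
            if (cached.contains(v)) {
                continue; // result available: do not re-execute, do not descend further
            }
            toSchedule.add(v);
            for (String input : graph.getOrDefault(v, Collections.emptyList())) {
                stack.push(input);
            }
        }
        return toSchedule;
    }
}
```

With a three-vertex chain `src -> mid -> sink` where `mid` still has its result cached, only `sink` needs to be scheduled.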
[GitHub] flink pull request: [scheduling] implement backtracking of interme...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/640#discussion_r29432295 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java --- @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import akka.actor.ActorRef; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import com.google.common.base.Preconditions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.messages.TaskMessages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; + + +/** + * Backtracking is a mechanism to schedule only those Execution Vertices of an Execution Graph which + * do not have an intermediate result available. 
This is in contrast to the simple way of scheduling + * a job, where all Execution Vertices are executed starting from the source. The Backtracking starts + * from the sinks and traverses the Execution Graph to the sources. It only reaches the sources if + * no intermediate result could be found on the way. + * + * @see ExecutionGraph + * @see ExecutionVertex + * @see Execution + */ +public class Backtracking { + + private static final Logger LOG = LoggerFactory.getLogger(Backtracking.class); + + private final Deque taskRequirements = new ArrayDeque(); + + private final Map visitedPartitions = new HashMap(); + + private ScheduleAction scheduleAction; + private Runnable postBacktrackingHook; + + public Backtracking(Collection vertices) { + Preconditions.checkNotNull(vertices); + + // add all sinks found to the stack + for (ExecutionJobVertex ejv : vertices) { + if (ejv.getJobVertex().isOutputVertex()) { + for (ExecutionVertex ev : ejv.getTaskVertices()) { + if (ev.getExecutionState() == ExecutionState.CREATED) { + taskRequirements.add(new TaskRequirement(ev)); + } + } + } + } + + this.scheduleAction = new ScheduleAction() { + @Override + public void schedule(ExecutionVertex ev) {} + }; + + this.postBacktrackingHook = new Runnable() { + @Override + public void run() {} + }; + } + + /** +* Scheduling to be performed when an ExecutionVertex is encountered that cannot be resumed +*/ + public interface ScheduleAction { + void schedule(ExecutionVertex ev); + } + + /** +* A TaskRequirement encapsulates an ExecutionVertex and its IntermediateResultPartitions which +* are required for execution. 
+*/ + private class TaskRequirement { + + private final ExecutionVertex executionVertex; + private final Deque pendingInputs = new ArrayDeque(); + private final int numInputs; + + private int availableInputs = 0; + + public TaskRequirement(ExecutionVertex executionVertex) { + this.executionVertex = executionVertex; + this.pendingInputs.addAll(executionVertex.getInputs()); + this.numInputs = pendingInputs.size(); + } + + public ExecutionVertex getExecutionVertex() { +
[GitHub] flink pull request: [scheduling] implement backtracking of interme...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/640#discussion_r29431402 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java --- @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import akka.actor.ActorRef; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import com.google.common.base.Preconditions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.messages.TaskMessages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; + + +/** + * Backtracking is a mechanism to schedule only those Execution Vertices of an Execution Graph which + * do not have an intermediate result available. 
This is in contrast to the simple way of scheduling + * a job, where all Execution Vertices are executed starting from the source. The Backtracking starts + * from the sinks and traverses the Execution Graph to the sources. It only reaches the sources if + * no intermediate result could be found on the way. + * + * @see ExecutionGraph + * @see ExecutionVertex + * @see Execution + */ +public class Backtracking { + + private static final Logger LOG = LoggerFactory.getLogger(Backtracking.class); + + private final Deque taskRequirements = new ArrayDeque(); + + private final Map visitedPartitions = new HashMap(); + + private ScheduleAction scheduleAction; + private Runnable postBacktrackingHook; + + public Backtracking(Collection vertices) { + Preconditions.checkNotNull(vertices); + + // add all sinks found to the stack + for (ExecutionJobVertex ejv : vertices) { + if (ejv.getJobVertex().isOutputVertex()) { + for (ExecutionVertex ev : ejv.getTaskVertices()) { + if (ev.getExecutionState() == ExecutionState.CREATED) { + taskRequirements.add(new TaskRequirement(ev)); + } + } + } + } + + this.scheduleAction = new ScheduleAction() { + @Override + public void schedule(ExecutionVertex ev) {} + }; + + this.postBacktrackingHook = new Runnable() { + @Override + public void run() {} + }; + } + + /** +* Scheduling to be performed when an ExecutionVertex is encountered that cannot be resumed +*/ + public interface ScheduleAction { + void schedule(ExecutionVertex ev); + } + + /** +* A TaskRequirement encapsulates an ExecutionVertex and its IntermediateResultPartitions which +* are required for execution. 
+*/ + private class TaskRequirement { + + private final ExecutionVertex executionVertex; + private final Deque pendingInputs = new ArrayDeque(); + private final int numInputs; + + private int availableInputs = 0; + + public TaskRequirement(ExecutionVertex executionVertex) { + this.executionVertex = executionVertex; + this.pendingInputs.addAll(executionVertex.getInputs()); + this.numInputs = pendingInputs.size(); + } + + public ExecutionVertex getExecutionVertex() { + r
[GitHub] flink pull request: [scheduling] implement backtracking of interme...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/640#discussion_r29431413 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -475,11 +479,21 @@ class JobManager(val flinkConfiguration: Configuration, throw new JobSubmissionException(jobId, "The given job is empty") } -// see if there already exists an ExecutionGraph for the corresponding job ID -executionGraph = currentJobs.getOrElseUpdate(jobGraph.getJobID, - (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName, -jobGraph.getJobConfiguration, timeout, jobGraph.getUserJarBlobKeys, userCodeLoader), -JobInfo(sender(), System.currentTimeMillis())))._1 +executionGraph = currentJob match { + case Some((graph, _)) if !graph.getState.isTerminalState => + throw new Exception("Job still running") --- End diff -- I hadn't thought about attaching vertices to a running execution graph. That could make sense, especially in the case of streaming.
[GitHub] flink pull request: [scheduling] implement backtracking of interme...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/640#discussion_r29431391 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java --- @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import akka.actor.ActorRef; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import com.google.common.base.Preconditions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.messages.TaskMessages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; + + +/** + * Backtracking is a mechanism to schedule only those Execution Vertices of an Execution Graph which + * do not have an intermediate result available. 
This is in contrast to the simple way of scheduling + * a job, where all Execution Vertices are executed starting from the source. The Backtracking starts + * from the sinks and traverses the Execution Graph to the sources. It only reaches the sources if + * no intermediate result could be found on the way. + * + * @see ExecutionGraph + * @see ExecutionVertex + * @see Execution + */ +public class Backtracking { + + private static final Logger LOG = LoggerFactory.getLogger(Backtracking.class); + + private final Deque taskRequirements = new ArrayDeque(); + + private final Map visitedPartitions = new HashMap(); + + private ScheduleAction scheduleAction; + private Runnable postBacktrackingHook; + + public Backtracking(Collection vertices) { + Preconditions.checkNotNull(vertices); + + // add all sinks found to the stack + for (ExecutionJobVertex ejv : vertices) { + if (ejv.getJobVertex().isOutputVertex()) { + for (ExecutionVertex ev : ejv.getTaskVertices()) { + if (ev.getExecutionState() == ExecutionState.CREATED) { + taskRequirements.add(new TaskRequirement(ev)); + } + } + } + } + + this.scheduleAction = new ScheduleAction() { + @Override + public void schedule(ExecutionVertex ev) {} + }; + + this.postBacktrackingHook = new Runnable() { + @Override + public void run() {} + }; + } + + /** +* Scheduling to be performed when an ExecutionVertex is encountered that cannot be resumed +*/ + public interface ScheduleAction { + void schedule(ExecutionVertex ev); + } + + /** +* A TaskRequirement encapsulates an ExecutionVertex and its IntermediateResultPartitions which +* are required for execution. 
+*/ + private class TaskRequirement { + + private final ExecutionVertex executionVertex; + private final Deque pendingInputs = new ArrayDeque(); + private final int numInputs; + + private int availableInputs = 0; + + public TaskRequirement(ExecutionVertex executionVertex) { + this.executionVertex = executionVertex; + this.pendingInputs.addAll(executionVertex.getInputs()); + this.numInputs = pendingInputs.size(); + } + + public ExecutionVertex getExecutionVertex() { + r
[jira] [Commented] (FLINK-1951) NullPointerException in DeltaIteration when no ForwardedFileds
[ https://issues.apache.org/jira/browse/FLINK-1951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521507#comment-14521507 ] Fabian Hueske commented on FLINK-1951: -- It was a bit tricky and took some time but I think I finally found the bug :-) The problem is the following. The failing join is the join with the solution set. The result of this join is the solution set data that is reinserted into the hashtable that was previously probed. If the annotation is set, the optimizer knows that the result of the join is still partitioned and can do the insert locally by chaining the calls. This means that probing and reinserting of the same HT happens in the same thread, i.e., there is no concurrent access. If the annotation is NOT set, the optimizer does not know that the result is still partitioned. Hence, it inserts a network shuffling strategy to partition the data. The receiving task then reinserts the solution set delta records into the hash table. Since probing and inserting are happening in two separate tasks which are separated by a network connection, they happen concurrently in two different threads and might interfere. Guarding reinsertion and probing with a simple lock solves the problem. Not sure if that's the best fix for the problem though... 
> NullPointerException in DeltaIteration when no ForwardedFileds > -- > > Key: FLINK-1951 > URL: https://issues.apache.org/jira/browse/FLINK-1951 > Project: Flink > Issue Type: Bug > Components: Iterations >Affects Versions: 0.9 >Reporter: Vasia Kalavri >Assignee: Fabian Hueske >Priority: Critical > > The following exception is thrown by the Connected Components example, if the > @ForwardedFieldsFirst("*") annotation from the ComponentIdFilter join is > removed: > Caused by: java.lang.NullPointerException > at > org.apache.flink.examples.java.graph.ConnectedComponents$ComponentIdFilter.join(ConnectedComponents.java:186) > at > org.apache.flink.examples.java.graph.ConnectedComponents$ComponentIdFilter.join(ConnectedComponents.java:1) > at > org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver.run(JoinWithSolutionSetSecondDriver.java:198) > at > org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) > at > org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:139) > at > org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask.run(IterationIntermediatePactTask.java:92) > at > org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) > at > org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217) > at java.lang.Thread.run(Thread.java:745) > [Code | https://github.com/vasia/flink/tree/cc-test] and [dataset | > http://snap.stanford.edu/data/com-DBLP.html] to reproduce.
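The race described in the comment above (probing and re-inserting the same solution-set hash table from two network-separated tasks) and the suggested fix (guarding both operations with a simple lock) can be sketched as follows. All names here are hypothetical; the actual fix would live inside Flink's solution-set hash table, this only illustrates the locking discipline:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the fix: probe and re-insert go through the same
 * lock, so a task probing the solution set and another task re-inserting
 * the solution-set delta cannot interleave on the table's internal state.
 */
public class GuardedSolutionSet {

    private final Map<Long, Long> table = new HashMap<>();
    private final Object lock = new Object();

    /** Probe for the current component id of a vertex; null if absent. */
    public Long probe(long key) {
        synchronized (lock) {
            return table.get(key);
        }
    }

    /** Re-insert a delta record, e.g. an updated component id. */
    public void insert(long key, long value) {
        synchronized (lock) {
            table.put(key, value);
        }
    }
}
```

Without the lock (i.e. with chained local calls, as in the annotated plan), probe and insert already run in one thread, which is why the bug only surfaces once the optimizer adds a network shuffle between them.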
[GitHub] flink pull request: [scheduling] implement backtracking of interme...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/640#discussion_r29427789 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -475,11 +479,21 @@ class JobManager(val flinkConfiguration: Configuration, throw new JobSubmissionException(jobId, "The given job is empty") } -// see if there already exists an ExecutionGraph for the corresponding job ID -executionGraph = currentJobs.getOrElseUpdate(jobGraph.getJobID, - (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName, -jobGraph.getJobConfiguration, timeout, jobGraph.getUserJarBlobKeys, userCodeLoader), -JobInfo(sender(), System.currentTimeMillis())))._1 +executionGraph = currentJob match { + case Some((graph, _)) if !graph.getState.isTerminalState => + throw new Exception("Job still running") --- End diff -- Why can I not append new vertices to a running execution graph?
[GitHub] flink pull request: [scheduling] implement backtracking of interme...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/640#discussion_r29426706 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java --- @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import akka.actor.ActorRef; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import com.google.common.base.Preconditions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.messages.TaskMessages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; + + +/** + * Backtracking is a mechanism to schedule only those Execution Vertices of an Execution Graph which + * do not have an intermediate result available. 
This is in contrast to the simple way of scheduling + * a job, where all Execution Vertices are executed starting from the source. The Backtracking starts + * from the sinks and traverses the Execution Graph to the sources. It only reaches the sources if + * no intermediate result could be found on the way. + * + * @see ExecutionGraph + * @see ExecutionVertex + * @see Execution + */ +public class Backtracking { + + private static final Logger LOG = LoggerFactory.getLogger(Backtracking.class); + + private final Deque taskRequirements = new ArrayDeque(); + + private final Map visitedPartitions = new HashMap(); + + private ScheduleAction scheduleAction; + private Runnable postBacktrackingHook; + + public Backtracking(Collection vertices) { + Preconditions.checkNotNull(vertices); + + // add all sinks found to the stack + for (ExecutionJobVertex ejv : vertices) { + if (ejv.getJobVertex().isOutputVertex()) { + for (ExecutionVertex ev : ejv.getTaskVertices()) { + if (ev.getExecutionState() == ExecutionState.CREATED) { + taskRequirements.add(new TaskRequirement(ev)); + } + } + } + } + + this.scheduleAction = new ScheduleAction() { + @Override + public void schedule(ExecutionVertex ev) {} + }; + + this.postBacktrackingHook = new Runnable() { + @Override + public void run() {} + }; + } + + /** +* Scheduling to be performed when an ExecutionVertex is encountered that cannot be resumed +*/ + public interface ScheduleAction { + void schedule(ExecutionVertex ev); + } + + /** +* A TaskRequirement encapsulates an ExecutionVertex and its IntermediateResultPartitions which +* are required for execution. 
+*/ + private class TaskRequirement { + + private final ExecutionVertex executionVertex; + private final Deque pendingInputs = new ArrayDeque(); + private final int numInputs; + + private int availableInputs = 0; + + public TaskRequirement(ExecutionVertex executionVertex) { + this.executionVertex = executionVertex; + this.pendingInputs.addAll(executionVertex.getInputs()); + this.numInputs = pendingInputs.size(); + } + + public ExecutionVertex getExecutionVertex() { +
[GitHub] flink pull request: [scheduling] implement backtracking of interme...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/640#discussion_r29426482 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Backtracking.java --- @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import akka.actor.ActorRef; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import com.google.common.base.Preconditions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.messages.TaskMessages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; + + +/** + * Backtracking is a mechanism to schedule only those Execution Vertices of an Execution Graph which + * do not have an intermediate result available. 
This is in contrast to the simple way of scheduling + * a job, where all Execution Vertices are executed starting from the source. The Backtracking starts + * from the sinks and traverses the Execution Graph to the sources. It only reaches the sources if + * no intermediate result could be found on the way. + * + * @see ExecutionGraph + * @see ExecutionVertex + * @see Execution + */ +public class Backtracking { + + private static final Logger LOG = LoggerFactory.getLogger(Backtracking.class); + + private final Deque taskRequirements = new ArrayDeque(); + + private final Map visitedPartitions = new HashMap(); + + private ScheduleAction scheduleAction; + private Runnable postBacktrackingHook; + + public Backtracking(Collection vertices) { + Preconditions.checkNotNull(vertices); + + // add all sinks found to the stack + for (ExecutionJobVertex ejv : vertices) { + if (ejv.getJobVertex().isOutputVertex()) { + for (ExecutionVertex ev : ejv.getTaskVertices()) { + if (ev.getExecutionState() == ExecutionState.CREATED) { + taskRequirements.add(new TaskRequirement(ev)); + } + } + } + } + + this.scheduleAction = new ScheduleAction() { + @Override + public void schedule(ExecutionVertex ev) {} + }; + + this.postBacktrackingHook = new Runnable() { + @Override + public void run() {} + }; + } + + /** +* Scheduling to be performed when an ExecutionVertex is encountered that cannot be resumed +*/ + public interface ScheduleAction { + void schedule(ExecutionVertex ev); + } + + /** +* A TaskRequirement encapsulates an ExecutionVertex and its IntermediateResultPartitions which +* are required for execution. 
+*/ + private class TaskRequirement { + + private final ExecutionVertex executionVertex; + private final Deque pendingInputs = new ArrayDeque(); + private final int numInputs; + + private int availableInputs = 0; + + public TaskRequirement(ExecutionVertex executionVertex) { + this.executionVertex = executionVertex; + this.pendingInputs.addAll(executionVertex.getInputs()); + this.numInputs = pendingInputs.size(); + } + + public ExecutionVertex getExecutionVertex() { +
[jira] [Updated] (FLINK-1963) Improve distinct() transformation
[ https://issues.apache.org/jira/browse/FLINK-1963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-1963:
Description: The `distinct()` transformation is currently rather limited with respect to atomic key types:
- `distinct(String ...)` works only for composite data types (POJO, Tuple), but wildcard expressions should also be supported for atomic key types
- `distinct()` works only for composite types, but should also work for atomic key types
- `distinct(KeySelector)` is the most generic variant, but not very handy to use
- `distinct(int ...)` works only for Tuple data types (which is fine)
Fixing this should be rather easy.
(was: the same description without the closing sentence "Fixing this should be rather easy.")
> Key: FLINK-1963
> URL: https://issues.apache.org/jira/browse/FLINK-1963
> Project: Flink
> Issue Type: Improvement
> Components: Java API, Scala API
> Affects Versions: 0.9
> Reporter: Fabian Hueske
> Priority: Minor
> Labels: starter
> Fix For: 0.9
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-1963) Improve distinct() transformation
Fabian Hueske created FLINK-1963:
Summary: Improve distinct() transformation
Key: FLINK-1963
URL: https://issues.apache.org/jira/browse/FLINK-1963
Project: Flink
Issue Type: Improvement
Components: Java API, Scala API
Affects Versions: 0.9
Reporter: Fabian Hueske
Priority: Minor
Fix For: 0.9

The `distinct()` transformation is currently rather limited with respect to atomic key types:
- `distinct(String ...)` works only for composite data types (POJO, Tuple), but wildcard expressions should also be supported for atomic key types
- `distinct()` works only for composite types, but should also work for atomic key types
- `distinct(KeySelector)` is the most generic variant, but not very handy to use
- `distinct(int ...)` works only for Tuple data types (which is fine)
[jira] [Updated] (FLINK-1963) Improve distinct() transformation
[ https://issues.apache.org/jira/browse/FLINK-1963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-1963:
Labels: starter (was: start)
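All the `distinct` variants discussed in this thread boil down to extracting a key from each element and keeping one element per key. As a rough illustration of the `distinct(KeySelector)` semantics, here is a minimal plain-Java sketch; this is ordinary Java, not the Flink API, and the class and method names are made up for the example:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class DistinctByKey {

    // Emulates the semantics of distinct(KeySelector): keep the first
    // element seen for each extracted key, preserving encounter order.
    static <T, K> List<T> distinctBy(List<T> input, Function<T, K> keySelector) {
        Map<K, T> seen = new LinkedHashMap<>();
        for (T element : input) {
            seen.putIfAbsent(keySelector.apply(element), element);
        }
        return new ArrayList<>(seen.values());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("apple", "avocado", "banana", "cherry");
        // Distinct by first letter: "avocado" collides with "apple" and is dropped.
        System.out.println(distinctBy(words, w -> w.charAt(0)));
    }
}
```

With an identity key selector this degenerates to plain `distinct()` on atomic types, which is exactly the gap the issue asks to close.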
[jira] [Commented] (FLINK-1962) Add Gelly Scala API
[ https://issues.apache.org/jira/browse/FLINK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521276#comment-14521276 ] Sebastian Schelter commented on FLINK-1962:
Let me know once you have an implementation to play with; I have some Scala graph-analysis code lying around that I could port to Gelly.
> Key: FLINK-1962
> URL: https://issues.apache.org/jira/browse/FLINK-1962
> Project: Flink
> Issue Type: Task
> Components: Gelly, Scala API
> Affects Versions: 0.9
> Reporter: Vasia Kalavri
[jira] [Created] (FLINK-1962) Add Gelly Scala API
Vasia Kalavri created FLINK-1962:
Summary: Add Gelly Scala API
Key: FLINK-1962
URL: https://issues.apache.org/jira/browse/FLINK-1962
Project: Flink
Issue Type: Task
Components: Gelly, Scala API
Affects Versions: 0.9
Reporter: Vasia Kalavri
[jira] [Commented] (FLINK-1959) Accumulators BROKEN after Partitioning
[ https://issues.apache.org/jira/browse/FLINK-1959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521195#comment-14521195 ] mustafa elbehery commented on FLINK-1959:
I just tried with DOP 1, 2, and 5. All return null: "Number of detected empty fields per column: null"
> Key: FLINK-1959
> URL: https://issues.apache.org/jira/browse/FLINK-1959
> Project: Flink
> Issue Type: Bug
> Components: Examples
> Affects Versions: 0.8.1
> Reporter: mustafa elbehery
> Priority: Critical
> Fix For: 0.8.1
>
> While running the accumulator example in https://github.com/Elbehery/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java, I tried to alter the data flow with a "partitionByHash" call before applying the "filter", and the resulting accumulator was null.
> By debugging, I could see the accumulator in the runtime map. However, when retrieving the accumulator from the JobExecutionResult object, it was null.
> The line that caused the problem is "file.partitionByHash(1).filter(new EmptyFieldFilter())" instead of "file.filter(new EmptyFieldFilter())".
[jira] [Resolved] (FLINK-1843) Job History gets cleared too fast
[ https://issues.apache.org/jira/browse/FLINK-1843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels resolved FLINK-1843.
Resolution: Fixed
> Key: FLINK-1843
> URL: https://issues.apache.org/jira/browse/FLINK-1843
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Affects Versions: 0.9
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Labels: starter
> Fix For: 0.9
>
> As per FLINK-1442, the JobManager stores the archived ExecutionGraphs behind SoftReferences. At least for local setups, this mechanism doesn't seem to work properly. There are two issues:
> - The history gets cleared too fast.
> - The history gets cleared in a non-sequential fashion, i.e. arbitrarily old ExecutionGraphs are discarded.
> To solve these problems we might:
> - Store the least recent ExecutionGraphs behind SoftReferences.
> - Store the most recent ExecutionGraphs without a SoftReference.
> That way, we can save memory but still have the latest history available to the user. We might introduce a configuration variable where the user can specify the number of ExecutionGraphs that should be held in memory; the remaining ones can be stored behind SoftReferences.
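The two-tier scheme proposed in the issue (the most recent graphs held with strong references, older ones demoted to SoftReferences that the GC may reclaim under memory pressure) can be sketched in plain Java roughly as follows; the class and method names are hypothetical and this is not Flink's actual JobManager code:

```java
import java.lang.ref.SoftReference;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Sketch of a two-tier archive: the newest `strongCapacity` entries are
// strongly referenced; anything older is demoted to a SoftReference, so
// the GC can reclaim it before an OutOfMemoryError would occur.
public class JobHistoryArchive<K, V> {
    private final int strongCapacity;
    private final Deque<K> recentKeys = new ArrayDeque<>();
    private final Map<K, V> strong = new HashMap<>();
    private final Map<K, SoftReference<V>> soft = new HashMap<>();

    public JobHistoryArchive(int strongCapacity) {
        this.strongCapacity = strongCapacity;
    }

    public void put(K key, V value) {
        strong.put(key, value);
        recentKeys.addLast(key);
        if (recentKeys.size() > strongCapacity) {
            // Demote the oldest strongly-held entry to a SoftReference.
            K oldest = recentKeys.removeFirst();
            V demoted = strong.remove(oldest);
            if (demoted != null) {
                soft.put(oldest, new SoftReference<>(demoted));
            }
        }
    }

    public V get(K key) {
        V v = strong.get(key);
        if (v != null) {
            return v;
        }
        SoftReference<V> ref = soft.get(key);
        return ref == null ? null : ref.get(); // null if already collected
    }
}
```

The `strongCapacity` parameter plays the role of the configuration variable suggested above: it bounds how much history is guaranteed to survive, while everything beyond it is best-effort.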
[jira] [Commented] (FLINK-1749) Add Boosting algorithm for ensemble learning to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521127#comment-14521127 ] narayana reddy commented on FLINK-1749:
Thank you, Till Rohrmann.
> Key: FLINK-1749
> URL: https://issues.apache.org/jira/browse/FLINK-1749
> Project: Flink
> Issue Type: New Feature
> Components: Machine Learning Library
> Reporter: Till Rohrmann
> Assignee: narayana reddy
> Labels: ML
>
> Boosting [1] can help to create strong learners from an ensemble of weak learners and thus improve their performance. Widely used boosting algorithms are AdaBoost [2] and LogitBoost [3]. The work of I. Palit and C. K. Reddy [4] investigates how boosting can be efficiently realised in a distributed setting.
> Resources:
> [1] http://en.wikipedia.org/wiki/Boosting_%28machine_learning%29
> [2] http://en.wikipedia.org/wiki/AdaBoost
> [3] http://en.wikipedia.org/wiki/LogitBoost
> [4] http://ieeexplore.ieee.org/xpls/abs_all.jsp?arnumber=6035709
[jira] [Updated] (FLINK-1749) Add Boosting algorithm for ensemble learning to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-1749:
Assignee: narayana reddy
[jira] [Commented] (FLINK-1749) Add Boosting algorithm for ensemble learning to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521119#comment-14521119 ] Till Rohrmann commented on FLINK-1749:
Hi Narayana, great to hear that you want to pick this issue up :-) I have assigned it to you. You can use this issue to keep track of your progress.
[jira] [Commented] (FLINK-1749) Add Boosting algorithm for ensemble learning to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521105#comment-14521105 ] narayana reddy commented on FLINK-1749:
Hi Till Rohrmann, I would be pleased to contribute to FLINK-1749. Could you please assign this task to me? I am a master's student at IIIT-Delhi looking for an open-source contribution.
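As a concrete illustration of the AdaBoost algorithm referenced in the issue description [2], here is a minimal self-contained sketch on sorted 1-D data using threshold "decision stumps" as the weak learners. This is plain Java, purely illustrative, and not the Flink ML implementation; all names are made up for the example:

```java
import java.util.Arrays;

public class AdaBoostSketch {

    // Weak learner: a threshold "decision stump" on a single feature.
    static int stump(double xi, double threshold, int polarity) {
        return xi < threshold ? polarity : -polarity;
    }

    // Run `rounds` AdaBoost iterations on (x, y) with labels in {-1, +1},
    // then return the ensemble's predictions on x. Assumes x is sorted.
    static int[] fitAndPredict(double[] x, int[] y, int rounds) {
        int n = x.length;
        double[] w = new double[n];            // example weights
        Arrays.fill(w, 1.0 / n);

        double[] thrs = new double[rounds];
        int[] pols = new int[rounds];
        double[] alphas = new double[rounds];  // weak-learner weights

        for (int t = 0; t < rounds; t++) {
            // Pick the stump with the lowest weighted error.
            double bestErr = Double.POSITIVE_INFINITY, bestThr = 0;
            int bestPol = 1;
            for (int i = 0; i <= n; i++) {
                double thr = (i == 0) ? x[0] - 0.5
                           : (i == n) ? x[n - 1] + 0.5
                           : (x[i - 1] + x[i]) / 2;  // midpoints as candidates
                for (int pol : new int[] {-1, 1}) {
                    double err = 0;
                    for (int j = 0; j < n; j++)
                        if (stump(x[j], thr, pol) != y[j]) err += w[j];
                    if (err < bestErr) { bestErr = err; bestThr = thr; bestPol = pol; }
                }
            }
            double alpha = 0.5 * Math.log((1 - bestErr) / Math.max(bestErr, 1e-10));
            thrs[t] = bestThr; pols[t] = bestPol; alphas[t] = alpha;

            // Re-weight: misclassified examples gain weight, then normalize.
            double sum = 0;
            for (int j = 0; j < n; j++) {
                w[j] *= Math.exp(-alpha * y[j] * stump(x[j], bestThr, bestPol));
                sum += w[j];
            }
            for (int j = 0; j < n; j++) w[j] /= sum;
        }

        // Final prediction: sign of the alpha-weighted vote of all stumps.
        int[] pred = new int[n];
        for (int j = 0; j < n; j++) {
            double vote = 0;
            for (int t = 0; t < rounds; t++)
                vote += alphas[t] * stump(x[j], thrs[t], pols[t]);
            pred[j] = vote >= 0 ? 1 : -1;
        }
        return pred;
    }

    public static void main(String[] args) {
        double[] x = {1, 2, 3, 4, 5, 6};
        int[] y = {1, 1, 1, -1, -1, -1};
        System.out.println(Arrays.toString(fitAndPredict(x, y, 5)));
    }
}
```

The distributed variant investigated in [4] parallelizes the weak-learner search and the weight updates, which is where a DataSet-based implementation in flink-ml would come in.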