[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/925#issuecomment-123212399

    Is it bad that the accumulators are not serialized in a local setup? If not and you only want to test the serialization in a distributed setting, then you can also simply start the testing cluster with multiple `ActorSystems`. This should enforce serialization, if I'm not mistaken.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/925#issuecomment-123213630

    In the master, I only serialize the user-defined accumulators but not the Flink accumulators. As of this pull request, the accumulators are always serialized, even in local setups. Starting multiple actor systems did not solve the problem because Akka always optimizes message passing in local single-VM setups, i.e. it does not serialize objects even across actor systems.
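The effect described above can be illustrated without Akka. The following is a minimal sketch using plain Java serialization, which is an assumption made for illustration and not the mechanism Flink or Akka actually use; `SnapshotDemo` and `serializedCopy` are hypothetical names. It shows why a map handed over by reference inside one JVM keeps changing under the receiver, while a serialized copy is a stable snapshot.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not part of Flink or Akka.
class SnapshotDemo {

    // Round-trips the map through Java serialization to get an independent copy.
    @SuppressWarnings("unchecked")
    static HashMap<String, Long> serializedCopy(HashMap<String, Long> live) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(live);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (HashMap<String, Long>) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        HashMap<String, Long> live = new HashMap<>();
        live.put("records", 1L);

        Map<String, Long> byReference = live;              // an in-JVM hand-off without serialization
        Map<String, Long> snapshot = serializedCopy(live); // what enforced serialization gives

        live.put("records", 2L); // the task keeps updating its accumulator

        System.out.println("by reference: " + byReference.get("records"));
        System.out.println("snapshot:     " + snapshot.get("records"));
    }
}
```

In this sketch the reference observes the later update (2) while the snapshot still holds the value at the time of the copy (1), which is the property a test of live accumulator values depends on.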
[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/925#discussion_r35079360

    --- Diff: flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java ---
    @@ -109,125 +123,157 @@ public void testProgram() throws Exception {
             ExecutionEnvironment env = new PlanExtractor();
             DataSet<String> input = env.fromCollection(inputData);
             input
    -                .flatMap(new Tokenizer())
                     .flatMap(new WaitingUDF())
                     .output(new WaitingOutputFormat());
             env.execute();

    -        /** Extract job graph **/
    +        // Extract job graph and set job id for the task to notify of accumulator changes.
             JobGraph jobGraph = getOptimizedPlan(((PlanExtractor) env).plan);
    +        jobID = jobGraph.getJobID();
    +
    +        // register for accumulator changes
    +        jobManager.tell(new TestingJobManagerMessages.NotifyWhenAccumulatorChange(jobID), getRef());
    +        expectMsgEquals(true);

    +        // submit job
             jobManager.tell(new JobManagerMessages.SubmitJob(jobGraph, false), getRef());
             expectMsgClass(Status.Success.class);
    --- End diff --

    Same here. Best if you nest all actor interaction in a `within` block.
[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/925#discussion_r35079328

    --- Diff: flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java ---
    @@ -109,125 +123,157 @@ public void testProgram() throws Exception {
             ExecutionEnvironment env = new PlanExtractor();
             DataSet<String> input = env.fromCollection(inputData);
             input
    -                .flatMap(new Tokenizer())
                     .flatMap(new WaitingUDF())
                     .output(new WaitingOutputFormat());
             env.execute();

    -        /** Extract job graph **/
    +        // Extract job graph and set job id for the task to notify of accumulator changes.
             JobGraph jobGraph = getOptimizedPlan(((PlanExtractor) env).plan);
    +        jobID = jobGraph.getJobID();
    +
    +        // register for accumulator changes
    +        jobManager.tell(new TestingJobManagerMessages.NotifyWhenAccumulatorChange(jobID), getRef());
    +        expectMsgEquals(true);
    --- End diff --

    When you wait for messages, this should always happen inside a `within` block. Otherwise the test might never terminate in a failure case.
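What a `within` block buys can be sketched in a few lines: one overall deadline shared by several waits, so that a missing message fails the test instead of hanging it. The sketch below uses plain `java.util.concurrent` and is not the Akka TestKit API; `BoundedProbe`, `deliver`, and `expectMessage` are hypothetical names chosen for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a test probe; this is not the Akka TestKit API.
class BoundedProbe {
    private final BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>();
    private final long deadlineNanos;

    // The deadline is fixed once and shared by every later wait.
    BoundedProbe(long timeout, TimeUnit unit) {
        this.deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
    }

    void deliver(Object message) {
        mailbox.add(message);
    }

    // Waits for the next message, but never past the shared deadline.
    Object expectMessage() {
        long remaining = deadlineNanos - System.nanoTime();
        try {
            Object message = mailbox.poll(Math.max(remaining, 0L), TimeUnit.NANOSECONDS);
            if (message == null) {
                throw new AssertionError("no message before the deadline");
            }
            return message;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        BoundedProbe probe = new BoundedProbe(200, TimeUnit.MILLISECONDS);
        probe.deliver(Boolean.TRUE);
        System.out.println("first: " + probe.expectMessage());
        try {
            probe.expectMessage(); // nothing else arrives, so this fails after the deadline
        } catch (AssertionError expected) {
            System.out.println("second: " + expected.getMessage());
        }
    }
}
```

The second wait only gets whatever time is left of the original budget, which is why wrapping all actor interaction in one bounded block keeps the total test time predictable.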
[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/925#discussion_r35079637

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -400,20 +400,16 @@ class JobManager(
           import scala.collection.JavaConverters._
           sender ! RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala)

    -    case Heartbeat(instanceID, metricsReport, accumulators) =>
    +    case Heartbeat(instanceID, metricsReport, accumulators, asyncAccumulatorUpdate) =>
           log.debug(s"Received hearbeat message from $instanceID.")

    -      Future {
    -        accumulators foreach {
    -          case accumulators =>
    -              currentJobs.get(accumulators.getJobID) match {
    -                case Some((jobGraph, jobInfo)) =>
    -                  jobGraph.updateAccumulators(accumulators)
    -                case None =>
    -                  // ignore accumulator values for old job
    -              }
    -        }
    -      }(context.dispatcher)
    +      if (asyncAccumulatorUpdate) {
    --- End diff --

    Mixing test code into your production code is IMO bad style. If you need synchronous execution, then start the actor with a `CallingThreadDispatcher` or override the handling of this message in the `TestingJobManager`.
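The idea behind the `CallingThreadDispatcher` suggestion is that the handler stays unchanged and only the execution context is swapped. A minimal sketch of that idea in plain Java follows; it is an analogy built on `java.util.concurrent.Executor`, not the JobManager code or the Akka dispatcher API, and `HeartbeatHandler` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical handler; the executor decides whether the update is asynchronous.
class HeartbeatHandler {
    private final Executor executor;
    private final AtomicLong accumulator = new AtomicLong();

    HeartbeatHandler(Executor executor) {
        this.executor = executor;
    }

    // Production and test code share this method; no test-only flag is needed.
    CompletableFuture<Void> onHeartbeat(long delta) {
        return CompletableFuture.runAsync(() -> accumulator.addAndGet(delta), executor);
    }

    long current() {
        return accumulator.get();
    }

    public static void main(String[] args) {
        // Runnable::run executes each submitted task on the calling thread.
        Executor callingThread = Runnable::run;
        HeartbeatHandler handler = new HeartbeatHandler(callingThread);
        handler.onHeartbeat(5);
        // The update has already been applied when onHeartbeat returns.
        System.out.println(handler.current());
    }
}
```

A production setup would pass a thread pool instead, and the test passes the calling-thread executor, so the message definition itself needs no extra parameter.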
[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/925#discussion_r35086920

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---
    @@ -51,10 +51,13 @@ import scala.concurrent.{ExecutionContext, Future, Await}
     abstract class FlinkMiniCluster(
         val userConfiguration: Configuration,
         val singleActorSystem: Boolean,
    +    val synchronousDispatcher: Boolean,
         val streamingMode: StreamingMode) {

    -  def this(userConfiguration: Configuration, singleActorSystem: Boolean)
    -       = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
    +  def this(userConfiguration: Configuration,
    +           singleActorSystem: Boolean,
    +           synchronousDispatcher: Boolean)
    +       = this(userConfiguration, singleActorSystem, synchronousDispatcher, StreamingMode.BATCH_ONLY)
    --- End diff --

    I think the field `synchronousDispatcher` should not be part of the `FlinkMiniCluster`. The field seems to be only used for testing, referenced in `TestingCluster`, but the `FlinkMiniCluster` is also used to execute Flink programs locally.
[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/925#discussion_r35087953

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---
    @@ -51,10 +51,13 @@ import scala.concurrent.{ExecutionContext, Future, Await}
     abstract class FlinkMiniCluster(
         val userConfiguration: Configuration,
         val singleActorSystem: Boolean,
    +    val synchronousDispatcher: Boolean,
         val streamingMode: StreamingMode) {

    -  def this(userConfiguration: Configuration, singleActorSystem: Boolean)
    -       = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
    +  def this(userConfiguration: Configuration,
    +           singleActorSystem: Boolean,
    +           synchronousDispatcher: Boolean)
    +       = this(userConfiguration, singleActorSystem, synchronousDispatcher, StreamingMode.BATCH_ONLY)
    --- End diff --

    Good catch. I noticed that as well and already changed that before I saw your answer. Now only the `TestingCluster` is affected by the changes.
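One way to keep such a switch out of the shared class is to let the testing subclass override a factory hook. The sketch below assumes a much simpler class hierarchy than Flink's; `ClusterSketch`, `MiniCluster`, `LocalCluster`, `TestCluster`, and `createExecutor` are hypothetical names and do not correspond to the actual `FlinkMiniCluster` or `TestingCluster` code.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

// Hypothetical class hierarchy; only the structure matters here.
class ClusterSketch {

    // Base class used for local execution; it knows nothing about tests.
    abstract static class MiniCluster {
        // Hook that subclasses may override to change how futures are executed.
        protected Executor createExecutor() {
            return ForkJoinPool.commonPool();
        }

        final Executor executor() {
            return createExecutor();
        }
    }

    // Default local cluster: asynchronous execution.
    static class LocalCluster extends MiniCluster {
    }

    // Test-only cluster: the synchronous option lives here, not in the base class.
    static class TestCluster extends MiniCluster {
        private final boolean synchronousDispatcher;

        TestCluster(boolean synchronousDispatcher) {
            this.synchronousDispatcher = synchronousDispatcher;
        }

        @Override
        protected Executor createExecutor() {
            if (synchronousDispatcher) {
                return Runnable::run; // run tasks on the calling thread
            }
            return super.createExecutor();
        }
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        new TestCluster(true).executor().execute(() -> log.append("ran on caller"));
        System.out.println(log);
    }
}
```

With this layout, code that runs programs locally never sees the test-only parameter, which matches the outcome described in the reply above.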
[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/925#issuecomment-123283973

    Thanks for the feedback. Merging this later on if there are no objections.
[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/925#discussion_r35080434

    --- Diff: flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java ---
    @@ -109,125 +123,157 @@ public void testProgram() throws Exception {
             ExecutionEnvironment env = new PlanExtractor();
             DataSet<String> input = env.fromCollection(inputData);
             input
    -                .flatMap(new Tokenizer())
                     .flatMap(new WaitingUDF())
                     .output(new WaitingOutputFormat());
             env.execute();

    -        /** Extract job graph **/
    +        // Extract job graph and set job id for the task to notify of accumulator changes.
             JobGraph jobGraph = getOptimizedPlan(((PlanExtractor) env).plan);
    +        jobID = jobGraph.getJobID();
    +
    +        // register for accumulator changes
    +        jobManager.tell(new TestingJobManagerMessages.NotifyWhenAccumulatorChange(jobID), getRef());
    +        expectMsgEquals(true);
    --- End diff --

    Thanks, I've added a timeout to each call of `expectMsgEquals` and `receive`.
[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/925#discussion_r35080374

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -400,20 +400,16 @@ class JobManager(
           import scala.collection.JavaConverters._
           sender ! RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala)

    -    case Heartbeat(instanceID, metricsReport, accumulators) =>
    +    case Heartbeat(instanceID, metricsReport, accumulators, asyncAccumulatorUpdate) =>
           log.debug(s"Received hearbeat message from $instanceID.")

    -      Future {
    -        accumulators foreach {
    -          case accumulators =>
    -              currentJobs.get(accumulators.getJobID) match {
    -                case Some((jobGraph, jobInfo)) =>
    -                  jobGraph.updateAccumulators(accumulators)
    -                case None =>
    -                  // ignore accumulator values for old job
    -              }
    -        }
    -      }(context.dispatcher)
    +      if (asyncAccumulatorUpdate) {
    --- End diff --

    Thanks, I was looking for a way to make the future synchronous for my test setup. I will look into `CallingThreadDispatcher`. I'm already overriding the handling of this message in `TestingJobManager`, but then I would have to insert duplicate code there :)
[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/925#discussion_r35081074

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -400,20 +400,16 @@ class JobManager(
           import scala.collection.JavaConverters._
           sender ! RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala)

    -    case Heartbeat(instanceID, metricsReport, accumulators) =>
    +    case Heartbeat(instanceID, metricsReport, accumulators, asyncAccumulatorUpdate) =>
           log.debug(s"Received hearbeat message from $instanceID.")

    -      Future {
    -        accumulators foreach {
    -          case accumulators =>
    -              currentJobs.get(accumulators.getJobID) match {
    -                case Some((jobGraph, jobInfo)) =>
    -                  jobGraph.updateAccumulators(accumulators)
    -                case None =>
    -                  // ignore accumulator values for old job
    -              }
    -        }
    -      }(context.dispatcher)
    +      if (asyncAccumulatorUpdate) {
    --- End diff --

    In the worst case, code duplication in the `TestingJobManager` seems to me the lesser of two evils.
[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/925#issuecomment-123243258

    I've updated the pull request:

    - add flag to FlinkMiniCluster to disable asynchronous execution of futures via the CallingThreadDispatcher
    - revert changes to Heartbeat message that enabled explicit synchronous update of accumulators
    - update test to use synchronous execution mode for futures to receive accumulator updates immediately
[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/925#issuecomment-123326290

    +1, looks good
[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/925
[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase
GitHub user mxm opened a pull request:

    https://github.com/apache/flink/pull/925

    [FLINK-2371] improve AccumulatorLiveITCase

    Instead of using Thread.sleep() to synchronize the checks of the accumulator values, we rely on message passing here to synchronize the task process.

    Therefore, we let the task process signal to the task manager that it has updated its accumulator values. The task manager lets the job manager know and sends out the heartbeat which contains the accumulators. When the job manager receives the accumulators and has been notified previously, it sends a message to the subscribed test case with the current accumulators.

    This ensures that all processes are always synchronized correctly and that we can verify the live accumulator results correctly.

    In the course of rewriting the test, I had to change two things in the implementation:

    a) User accumulators are now immediately serialized as well. Otherwise, Akka does not serialize in local single-VM setups and passes the live accumulator map through.

    b) The asynchronous update of the accumulators can be disabled for tests. This was necessary because we cannot guarantee when the Future for updating the accumulators is executed. In real setups this is negligible.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink live-accumulators

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/925.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #925

commit 44687e783065a4157d2d3a695d9e94070ca6e8cd
Author: Maximilian Michels m...@apache.org
Date:   2015-07-20T09:55:11Z

    [FLINK-2371] improve AccumulatorLiveITCase
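The difference between sleeping and being notified, as described in the pull request text, can be sketched in a few lines. The example below is an assumption-laden simplification: a `BlockingQueue` stands in for the actor messages between task, task manager, job manager, and test, and `NotifyInsteadOfSleep` and `runAndObserve` are hypothetical names, not part of the Flink test.

```java
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical task/test pair; a queue stands in for the actor messages.
class NotifyInsteadOfSleep {

    // Runs a "task" that publishes each accumulator update and returns what the
    // "test" observed. Neither side calls Thread.sleep().
    static long[] runAndObserve(int updates) {
        BlockingQueue<Long> notifications = new LinkedBlockingQueue<>();
        Thread task = new Thread(() -> {
            long accumulator = 0;
            for (int i = 0; i < updates; i++) {
                accumulator += 1;
                notifications.add(accumulator); // signal: the accumulator changed
            }
        });
        task.start();

        long[] observed = new long[updates];
        try {
            for (int i = 0; i < updates; i++) {
                // Bounded wait, so a lost notification fails instead of hanging.
                Long value = notifications.poll(5, TimeUnit.SECONDS);
                if (value == null) {
                    throw new IllegalStateException("update " + i + " never arrived");
                }
                observed[i] = value;
            }
            task.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(runAndObserve(3)));
    }
}
```

Because the observer only proceeds when an update has actually been published, the check no longer depends on guessing how long the task needs, which is the property the rewritten test relies on.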