[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase

2015-07-21 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/925#issuecomment-123212399
  
Is it bad that the accumulators are not serialized in a local setup? If not 
and you only want to test the serialization in a distributed setting, then you 
can also simply start the testing cluster with multiple `ActorSystems`. This 
should enforce serialization, if I'm not mistaken.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase

2015-07-21 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/925#issuecomment-123213630
  
On master, I only serialize the user-defined accumulators but not the 
Flink accumulators. As of this pull request, the accumulators are always 
serialized, even in local setups. Starting multiple actor systems did not solve 
the problem because Akka always optimizes message passing in local single-JVM 
setups, i.e. it does not serialize objects even across actor systems.
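
To illustrate why the explicit serialization matters, here is a minimal sketch in plain Java (not Flink or Akka code). It assumes a hypothetical accumulator map named `live`: handing it over by reference, as an in-JVM message send effectively does, lets the receiver see later updates, while a serialized copy stays fixed at the value it had when it was sent.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an accumulator report; not a Flink class.
class AccumulatorSnapshotDemo {

    // Round-trips the map through Java serialization, yielding an independent copy.
    @SuppressWarnings("unchecked")
    static HashMap<String, Long> serializedCopy(HashMap<String, Long> live) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(live);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (HashMap<String, Long>) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        HashMap<String, Long> live = new HashMap<>();
        live.put("records", 1L);

        Map<String, Long> byReference = live;              // what a by-reference send hands over
        Map<String, Long> snapshot = serializedCopy(live); // what explicit serialization hands over

        live.put("records", 2L); // the task keeps updating after "sending"

        System.out.println(byReference.get("records")); // 2: sees the later update
        System.out.println(snapshot.get("records"));    // 1: still the value at send time
    }
}
```

A test that checks intermediate accumulator values needs the snapshot behaviour; otherwise it may observe values from a later point in time than the message claims to report.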


[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase

2015-07-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/925#discussion_r35079360
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java ---
@@ -109,125 +123,157 @@ public void testProgram() throws Exception {
         ExecutionEnvironment env = new PlanExtractor();
         DataSet<String> input = env.fromCollection(inputData);
         input
-                .flatMap(new Tokenizer())
                 .flatMap(new WaitingUDF())
                 .output(new WaitingOutputFormat());
         env.execute();
 
-        /** Extract job graph **/
+        // Extract job graph and set job id for the task to notify of accumulator changes.
         JobGraph jobGraph = getOptimizedPlan(((PlanExtractor) env).plan);
+        jobID = jobGraph.getJobID();
+
+        // register for accumulator changes
+        jobManager.tell(new TestingJobManagerMessages.NotifyWhenAccumulatorChange(jobID), getRef());
+        expectMsgEquals(true);
 
+        // submit job
         jobManager.tell(new JobManagerMessages.SubmitJob(jobGraph, false), getRef());
         expectMsgClass(Status.Success.class);
--- End diff --

Same here. Best if you nest all actor interaction in a `within` block.


[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase

2015-07-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/925#discussion_r35079328
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java ---
@@ -109,125 +123,157 @@ public void testProgram() throws Exception {
         ExecutionEnvironment env = new PlanExtractor();
         DataSet<String> input = env.fromCollection(inputData);
         input
-                .flatMap(new Tokenizer())
                 .flatMap(new WaitingUDF())
                 .output(new WaitingOutputFormat());
         env.execute();
 
-        /** Extract job graph **/
+        // Extract job graph and set job id for the task to notify of accumulator changes.
         JobGraph jobGraph = getOptimizedPlan(((PlanExtractor) env).plan);
+        jobID = jobGraph.getJobID();
+
+        // register for accumulator changes
+        jobManager.tell(new TestingJobManagerMessages.NotifyWhenAccumulatorChange(jobID), getRef());
+        expectMsgEquals(true);
--- End diff --

Whenever you wait for messages, do so inside a `within` block. Otherwise 
the test might never terminate if something fails.
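
The idea of bounding every wait can be sketched without Akka. The following is only an analogy in plain Java, assuming a hypothetical `mailbox` queue in place of the test actor's mailbox; it is not the TestKit API. A wait with a deadline turns a lost message into a test failure instead of a hang.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedWaitDemo {

    // Waits for the next message and fails after the deadline instead of blocking forever.
    static <T> T expectWithin(BlockingQueue<T> mailbox, long timeout, TimeUnit unit) {
        try {
            T msg = mailbox.poll(timeout, unit);
            if (msg == null) {
                throw new AssertionError("no message within " + timeout + " " + unit);
            }
            return msg;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Boolean> mailbox = new LinkedBlockingQueue<>();
        mailbox.add(true);
        System.out.println(expectWithin(mailbox, 1, TimeUnit.SECONDS));

        try {
            // Nothing is left in the mailbox, so this fails after the deadline.
            expectWithin(mailbox, 100, TimeUnit.MILLISECONDS);
        } catch (AssertionError expected) {
            System.out.println("timed out: " + expected.getMessage());
        }
    }
}
```

An unbounded `take()` in place of `poll(timeout, unit)` would block the test run indefinitely in the failure case, which is the situation the comment warns about.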


[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase

2015-07-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/925#discussion_r35079637
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -400,20 +400,16 @@ class JobManager(
       import scala.collection.JavaConverters._
       sender ! RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala)
 
-    case Heartbeat(instanceID, metricsReport, accumulators) =>
+    case Heartbeat(instanceID, metricsReport, accumulators, asyncAccumulatorUpdate) =>
       log.debug(s"Received hearbeat message from $instanceID.")
 
-      Future {
-        accumulators foreach {
-          case accumulators =>
-              currentJobs.get(accumulators.getJobID) match {
-                case Some((jobGraph, jobInfo)) =>
-                  jobGraph.updateAccumulators(accumulators)
-                case None =>
-                  // ignore accumulator values for old job
-              }
-        }
-      }(context.dispatcher)
+      if (asyncAccumulatorUpdate) {
--- End diff --

Mixing test code into your production code is IMO bad style. If you need 
synchronous execution, then start the actor with a `CallingThreadDispatcher` or 
override the handling of this message in the `TestingJobManager`.
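
As a rough analogy in plain Java (not Akka, and not the actual `JobManager` code): if the executor is injected rather than chosen by a flag inside the message, the production path stays untouched and a test can pass an executor that runs tasks on the calling thread. `HeartbeatHandler` and its methods are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical handler; the executor is injected instead of being selected by a test flag.
class HeartbeatHandler {
    private final Executor executor;
    private final AtomicLong lastValue = new AtomicLong();

    HeartbeatHandler(Executor executor) {
        this.executor = executor;
    }

    // Same code path for production and tests: the update is always submitted to the executor.
    CompletableFuture<Void> onHeartbeat(long accumulatorValue) {
        return CompletableFuture.runAsync(() -> lastValue.set(accumulatorValue), executor);
    }

    long lastValue() {
        return lastValue.get();
    }

    public static void main(String[] args) {
        // A direct executor runs the task on the calling thread, so the update
        // is already applied when onHeartbeat returns.
        Executor callingThread = Runnable::run;
        HeartbeatHandler handler = new HeartbeatHandler(callingThread);
        handler.onHeartbeat(42L);
        System.out.println(handler.lastValue()); // 42
    }
}
```

In production the same class would receive a thread pool, so no test-only branch has to live in the message handler.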


[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase

2015-07-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/925#discussion_r35086920
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---
@@ -51,10 +51,13 @@ import scala.concurrent.{ExecutionContext, Future, Await}
 abstract class FlinkMiniCluster(
     val userConfiguration: Configuration,
     val singleActorSystem: Boolean,
+    val synchronousDispatcher: Boolean,
     val streamingMode: StreamingMode) {
 
-  def this(userConfiguration: Configuration, singleActorSystem: Boolean)
-       = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
+  def this(userConfiguration: Configuration,
+           singleActorSystem: Boolean,
+           synchronousDispatcher: Boolean)
+       = this(userConfiguration, singleActorSystem, synchronousDispatcher, StreamingMode.BATCH_ONLY)
--- End diff --

I think the field `synchronousDispatcher` should not be part of the 
`FlinkMiniCluster`. The field seems to be only used for testing, referenced in 
`TestingCluster`, but the `FlinkMiniCluster` is also used to execute Flink 
programs locally.
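
One way to keep such a switch out of the shared base class, sketched in plain Java with hypothetical class names (this is not the Flink class hierarchy): the base class exposes an overridable hook with an asynchronous default, and only the testing subclass overrides it.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

// Hypothetical base class used for local execution; it knows nothing about tests.
abstract class LocalCluster {
    // Hook that subclasses may override; the default stays asynchronous.
    protected Executor futureExecutor() {
        return ForkJoinPool.commonPool();
    }
}

// Production variant: inherits the asynchronous default unchanged.
class ProductionLocalCluster extends LocalCluster {
}

// Test variant: the synchronous switch lives only here.
class SynchronousTestingCluster extends LocalCluster {
    @Override
    protected Executor futureExecutor() {
        return Runnable::run; // run tasks on the calling thread
    }
}

class ClusterHookDemo {
    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        new SynchronousTestingCluster().futureExecutor().execute(() -> log.append("ran"));
        System.out.println(log); // already populated because execute() ran inline
    }
}
```

With this shape, callers that run programs locally never see a constructor parameter that only matters to tests.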


[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase

2015-07-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/925#discussion_r35087953
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---
@@ -51,10 +51,13 @@ import scala.concurrent.{ExecutionContext, Future, Await}
 abstract class FlinkMiniCluster(
     val userConfiguration: Configuration,
     val singleActorSystem: Boolean,
+    val synchronousDispatcher: Boolean,
     val streamingMode: StreamingMode) {
 
-  def this(userConfiguration: Configuration, singleActorSystem: Boolean)
-       = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
+  def this(userConfiguration: Configuration,
+           singleActorSystem: Boolean,
+           synchronousDispatcher: Boolean)
+       = this(userConfiguration, singleActorSystem, synchronousDispatcher, StreamingMode.BATCH_ONLY)
--- End diff --

Good catch. I noticed that as well and already changed that before I saw 
your answer. Now only the `TestingCluster` is affected by the changes.


[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase

2015-07-21 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/925#issuecomment-123283973
  
Thanks for the feedback. Merging this later on if there are no objections.


[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase

2015-07-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/925#discussion_r35080434
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java ---
@@ -109,125 +123,157 @@ public void testProgram() throws Exception {
         ExecutionEnvironment env = new PlanExtractor();
         DataSet<String> input = env.fromCollection(inputData);
         input
-                .flatMap(new Tokenizer())
                 .flatMap(new WaitingUDF())
                 .output(new WaitingOutputFormat());
         env.execute();
 
-        /** Extract job graph **/
+        // Extract job graph and set job id for the task to notify of accumulator changes.
         JobGraph jobGraph = getOptimizedPlan(((PlanExtractor) env).plan);
+        jobID = jobGraph.getJobID();
+
+        // register for accumulator changes
+        jobManager.tell(new TestingJobManagerMessages.NotifyWhenAccumulatorChange(jobID), getRef());
+        expectMsgEquals(true);
--- End diff --

Thanks, I've added a timeout to each call of `expectMsgEquals` and 
`receive`.


[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase

2015-07-21 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/925#discussion_r35080374
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -400,20 +400,16 @@ class JobManager(
       import scala.collection.JavaConverters._
       sender ! RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala)
 
-    case Heartbeat(instanceID, metricsReport, accumulators) =>
+    case Heartbeat(instanceID, metricsReport, accumulators, asyncAccumulatorUpdate) =>
       log.debug(s"Received hearbeat message from $instanceID.")
 
-      Future {
-        accumulators foreach {
-          case accumulators =>
-              currentJobs.get(accumulators.getJobID) match {
-                case Some((jobGraph, jobInfo)) =>
-                  jobGraph.updateAccumulators(accumulators)
-                case None =>
-                  // ignore accumulator values for old job
-              }
-        }
-      }(context.dispatcher)
+      if (asyncAccumulatorUpdate) {
--- End diff --

Thanks, I was looking for a way to make the future synchronous for my test 
setup. I will look into `CallingThreadDispatcher`. I'm already overriding this 
message in `TestingJobManager`, but then I would have to insert duplicate code 
there :)


[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase

2015-07-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/925#discussion_r35081074
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -400,20 +400,16 @@ class JobManager(
       import scala.collection.JavaConverters._
       sender ! RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala)
 
-    case Heartbeat(instanceID, metricsReport, accumulators) =>
+    case Heartbeat(instanceID, metricsReport, accumulators, asyncAccumulatorUpdate) =>
       log.debug(s"Received hearbeat message from $instanceID.")
 
-      Future {
-        accumulators foreach {
-          case accumulators =>
-              currentJobs.get(accumulators.getJobID) match {
-                case Some((jobGraph, jobInfo)) =>
-                  jobGraph.updateAccumulators(accumulators)
-                case None =>
-                  // ignore accumulator values for old job
-              }
-        }
-      }(context.dispatcher)
+      if (asyncAccumulatorUpdate) {
--- End diff --

In the worst case, code duplication in the `TestingJobManager` seems to me 
the lesser of two evils.


[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase

2015-07-21 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/925#issuecomment-123243258
  
I've updated the pull request:

- add a flag to FlinkMiniCluster to disable asynchronous execution of 
futures via the CallingThreadDispatcher
- revert the changes to the Heartbeat message that enabled explicit synchronous 
update of accumulators
- update the test to use synchronous execution mode for futures to receive 
accumulator updates immediately


[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase

2015-07-21 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/925#issuecomment-123326290
  
+1, looks good


[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase

2015-07-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/925


[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase

2015-07-20 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/925

[FLINK-2371] improve AccumulatorLiveITCase

Instead of using Thread.sleep() to synchronize the checks of the
accumulator values, we rely on message passing here to synchronize the
task process.

Therefore, we let the task process signal to the task manager that it
has updated its accumulator values. The task manager lets the job
manager know and sends out the heartbeat which contains the
accumulators. When the job manager receives the accumulators and has
been notified previously, it sends a message to the subscribed test case
with the current accumulators.

This ensures that all processes are always synchronized and that we
can verify the live accumulator results correctly.
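
The notification scheme described above can be sketched in plain Java. This is a simplified stand-in with hypothetical names (`Manager`, `notifyWhenAccumulatorChange`, `onHeartbeat`), not the actual actor messages: the test subscribes first, and every accumulator report is forwarded to it, so the test blocks on messages instead of sleeping.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class AccumulatorNotifyDemo {

    // Hypothetical "job manager": forwards each accumulator update to a subscribed queue.
    static final class Manager {
        private volatile BlockingQueue<Long> subscriber;

        void notifyWhenAccumulatorChange(BlockingQueue<Long> queue) {
            subscriber = queue;
        }

        void onHeartbeat(long accumulatorValue) {
            BlockingQueue<Long> s = subscriber;
            if (s != null) {
                s.add(accumulatorValue);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Manager manager = new Manager();
        BlockingQueue<Long> updates = new LinkedBlockingQueue<>();
        manager.notifyWhenAccumulatorChange(updates); // register before the job starts

        // Hypothetical task: reports after each processed record.
        Thread task = new Thread(() -> {
            for (long count = 1; count <= 3; count++) {
                manager.onHeartbeat(count);
            }
        });
        task.start();

        // The test waits for messages (with a deadline) instead of sleeping and hoping.
        for (long expected = 1; expected <= 3; expected++) {
            Long seen = updates.poll(5, TimeUnit.SECONDS);
            if (seen == null || seen != expected) {
                throw new AssertionError("expected " + expected + " but got " + seen);
            }
        }
        task.join();
        System.out.println("all updates observed in order");
    }
}
```

Because each check is driven by an incoming message, the test no longer depends on how long the task happens to take on a given machine.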

In the course of rewriting the test, I had to change two things in the
implementation:

a) User accumulators are now immediately serialized as well. Otherwise,
Akka does not serialize in local single-JVM setups and passes the live
accumulator map through.

b) The asynchronous update of the accumulators can be disabled for
tests. This was necessary because we cannot guarantee when the Future
for updating the accumulators is executed. In real setups this is
negligible.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink live-accumulators

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/925.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #925


commit 44687e783065a4157d2d3a695d9e94070ca6e8cd
Author: Maximilian Michels m...@apache.org
Date:   2015-07-20T09:55:11Z

[FLINK-2371] improve AccumulatorLiveITCase




