[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34662511
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -306,99 +308,100 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
     if (!isConnected) {
       log.debug(s"Dropping message $message because the TaskManager is currently " +
         "not connected to a JobManager.")
-    }
+    } else {
 
-    // we order the messages by frequency, to make sure the code paths for matching
-    // are as short as possible
-    message match {
+      // we order the messages by frequency, to make sure the code paths for matching
+      // are as short as possible
+      message match {
+
+        // tell the task about the availability of a new input partition
+        case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
+          updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
+
+        // tell the task about the availability of some new input partitions
+        case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
+          updateTaskInputPartitions(executionID, partitionInfos)
+
+        // discards intermediate result partitions of a task execution on this TaskManager
+        case FailIntermediateResultPartitions(executionID) =>
+          log.info("Discarding the results produced by task execution " + executionID)
+          if (network.isAssociated) {
+            try {
+              network.getPartitionManager.releasePartitionsProducedBy(executionID)
+            } catch {
+              case t: Throwable => killTaskManagerFatal(
+                "Fatal leak: Unable to release intermediate result partition data", t)
+            }
+          }
 
-      // tell the task about the availability of a new input partition
-      case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
-        updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
+        // notifies the TaskManager that the state of a task has changed.
+        // the TaskManager informs the JobManager and cleans up in case the transition
+        // was into a terminal state, or in case the JobManager cannot be informed of the
+        // state transition
 
-      // tell the task about the availability of some new input partitions
-      case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
-        updateTaskInputPartitions(executionID, partitionInfos)
+        case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
 
-      // discards intermediate result partitions of a task execution on this TaskManager
-      case FailIntermediateResultPartitions(executionID) =>
-        log.info("Discarding the results produced by task execution " + executionID)
-        if (network.isAssociated) {
-          try {
-            network.getPartitionManager.releasePartitionsProducedBy(executionID)
-          } catch {
-            case t: Throwable => killTaskManagerFatal(
-              "Fatal leak: Unable to release intermediate result partition data", t)
-          }
-        }
+          // we receive these from our tasks and forward them to the JobManager
--- End diff --

Are we sure nothing changed in the logic there? We have spent so much time
getting this delicate part right that I am always extra careful when I see
changes there.
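The change under discussion moves the `message match` into the `else` branch of the `isConnected` check, so messages are only dispatched while the TaskManager is connected. A minimal Java sketch of that guard, assuming hypothetical stand-in message classes (`UpdatePartitionInfo`, `FailPartitions`) and a list of strings in place of the real actor and logger; it illustrates the control flow only and is not Flink's implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for two of the TaskManager messages in the diff.
final class UpdatePartitionInfo {
    final String executionId;
    UpdatePartitionInfo(String executionId) { this.executionId = executionId; }
}

final class FailPartitions {
    final String executionId;
    FailPartitions(String executionId) { this.executionId = executionId; }
}

class GuardedDispatcher {
    boolean connected;
    // Records what happened to each message, standing in for real handling.
    final List<String> log = new ArrayList<>();

    void handle(Object message) {
        if (!connected) {
            // Mirrors the diff: drop the message when no JobManager is connected.
            log.add("dropped " + message.getClass().getSimpleName());
        } else if (message instanceof UpdatePartitionInfo) {
            // Most frequent message first, keeping the common match path short.
            log.add("update " + ((UpdatePartitionInfo) message).executionId);
        } else if (message instanceof FailPartitions) {
            log.add("release " + ((FailPartitions) message).executionId);
        } else {
            log.add("unhandled");
        }
    }
}
```

With the `else`, a message received while disconnected is only logged and never reaches the match.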


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-15 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34662919
  

I'm absolutely sure, but I understand your worries. I'll exclude it from
this pull request and open a separate pull request.




[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34663165
  

If you are absolutely sure, it is fine.




[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34662347
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.accumulators;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Main accumulator registry which encapsulates internal and user-defined accumulators.
+ */
+public class AccumulatorRegistry {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(AccumulatorRegistry.class);
+
+    protected final JobID jobID;
+    protected final ExecutionAttemptID taskID;
+
+    /* Flink's internal Accumulator values stored for the executing task. */
+    private final Map<Metric, Accumulator<?, ?>> flinkAccumulators =
+            new HashMap<Metric, Accumulator<?, ?>>();
+
+    /* User-defined Accumulator values stored for the executing task. */
+    private final Map<String, Accumulator<?, ?>> userAccumulators =
+            Collections.synchronizedMap(new HashMap<String, Accumulator<?, ?>>());
+
+    /* The reporter returned to reporting tasks. */
+    private final ReadWriteReporter reporter;
+
+    /**
+     * Flink metrics supported
+     */
+    public enum Metric {
+        NUM_RECORDS_IN,
+        NUM_RECORDS_OUT,
+        NUM_BYTES_IN,
+        NUM_BYTES_OUT
+    }
+
+
+    public AccumulatorRegistry(JobID jobID, ExecutionAttemptID taskID) {
+        this.jobID = jobID;
+        this.taskID = taskID;
+        this.reporter = new ReadWriteReporter(flinkAccumulators);
+    }
+
+    /**
+     * Creates a snapshot of this accumulator registry.
+     * @return a serialized accumulator map
+     */
+    public AccumulatorSnapshot getSnapshot() {
+        try {
+            return new AccumulatorSnapshot(jobID, taskID, flinkAccumulators, userAccumulators);
+        } catch (IOException e) {
+            LOG.warn("Failed to serialize accumulators for task.", e);
+            return null;
+        }
+    }
+
+    /**
+     * Gets the map for user-defined accumulators.
+     */
+    public Map<String, Accumulator<?, ?>> getUserMap() {
+        return userAccumulators;
+    }
+
+    /**
+     * Gets the reporter for flink internal metrics.
+     */
+    public Reporter getReadWriteReporter() {
+        return reporter;
+    }
+
+    /**
+     * Interface for Flink's internal accumulators.
+     */
+    public interface Reporter {
+        void report(Metric metric, long value);
+    }
+
+    /**
+     * Accumulator based reporter for keeping track of internal metrics (e.g. bytes and records in/out)
+     */
+    public static class ReadWriteReporter implements Reporter {
+
+        private LongCounter numRecordsIn = new LongCounter();
+        private LongCounter numRecordsOut = new LongCounter();
+        private LongCounter numBytesIn = new LongCounter();
+        private LongCounter numBytesOut = new LongCounter();
+
+        public ReadWriteReporter(Map<Metric, Accumulator<?,?>> accumulatorMap) {
+            accumulatorMap.put(Metric.NUM_RECORDS_IN, numRecordsIn);
+            accumulatorMap.put(Metric.NUM_RECORDS_OUT, numRecordsOut);
+            accumulatorMap.put(Metric.NUM_BYTES_IN, numBytesIn);
+   

[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-15 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34665546
  

I've opened a separate pull request #914. We can discuss there whether the 
change is necessary or not.




[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-15 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34674005
  

[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/896




[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-15 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/896#issuecomment-121605629
  
Thanks for the feedback @StephanEwen @uce. Will merge this once Travis 
passes.




[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-14 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/896#issuecomment-121173161
  
> Wasn't the rationale to have a separate message for the heartbeat and the
> accumulators to keep the heartbeat messages small? What do you think? In any
> case, I agree that it makes sense to be able to configure this.

Initially, that was my proposal, but I remember we discussed that we should
try to minimize the number of messages and thus use the `Heartbeat` message. It
makes sense to be able to configure the snapshot transfer in multiples of the
heartbeat interval. I'd like to fix that in another pull request.
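Configuring the snapshot transfer in multiples of the heartbeat interval could look roughly like the following Java sketch. `SnapshotPolicy` and its counter are hypothetical names, not part of this pull request; the only assumption is that the TaskManager calls `onHeartbeat()` once per heartbeat and attaches the accumulator snapshot whenever it returns true.

```java
// Hypothetical policy: attach an accumulator snapshot to every N-th heartbeat.
final class SnapshotPolicy {
    private final int heartbeatsPerSnapshot;
    private long heartbeatCount;

    SnapshotPolicy(int heartbeatsPerSnapshot) {
        if (heartbeatsPerSnapshot < 1) {
            throw new IllegalArgumentException("multiple must be >= 1");
        }
        this.heartbeatsPerSnapshot = heartbeatsPerSnapshot;
    }

    /** Called once per heartbeat; returns true if this heartbeat should carry a snapshot. */
    boolean onHeartbeat() {
        boolean attach = heartbeatCount % heartbeatsPerSnapshot == 0;
        heartbeatCount++;
        return attach;
    }
}
```

With a multiple of 1, every heartbeat carries a snapshot; larger multiples keep most heartbeat messages small.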

> I would add a unit test for the record reader/writer accumulator results.

I'll integrate that into the `AccumulatorLiveITCase`.

> There seems to be a 1:1 correspondence between the reporter and the
> internal metrics. Is the idea to have multiple different reporters in the
> future (for different types of metrics etc.) or is a long reporter sufficient?

For now, a long reporter is sufficient, but if we discover that we need more
customizable reporters, we can easily change the reporter interface in the
future (it wouldn't be API breaking).
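The `Metric` enum and `Reporter` interface below are copied from the `AccumulatorRegistry` diff. `LongReporter` is a hypothetical stand-in for `ReadWriteReporter`: it uses `AtomicLong` in place of Flink's `LongCounter` and assumes that `report` adds the value to the metric's counter. Since callers only see `report(Metric, long)`, another reporter could later be swapped in without touching them.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Same shape as the Metric enum and Reporter interface in the AccumulatorRegistry diff.
enum Metric { NUM_RECORDS_IN, NUM_RECORDS_OUT, NUM_BYTES_IN, NUM_BYTES_OUT }

interface Reporter {
    void report(Metric metric, long value);
}

// Hypothetical stand-in for ReadWriteReporter: one counter per metric.
final class LongReporter implements Reporter {
    private final Map<Metric, AtomicLong> counters = new EnumMap<>(Metric.class);

    LongReporter() {
        for (Metric m : Metric.values()) {
            counters.put(m, new AtomicLong());
        }
    }

    @Override
    public void report(Metric metric, long value) {
        // Assumption: reported values are increments added to the running total.
        counters.get(metric).addAndGet(value);
    }

    long get(Metric metric) {
        return counters.get(metric).get();
    }
}
```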




[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-14 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34547975
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---
@@ -408,14 +408,16 @@ public long count() throws Exception {
         JobExecutionResult res = getExecutionEnvironment().execute();
 
         ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
-        try {
-            return SerializedListAccumulator.deserializeList(accResult, serializer);
-        }
-        catch (ClassNotFoundException e) {
-            throw new RuntimeException("Cannot find type class of collected data type.", e);
-        }
-        catch (IOException e) {
-            throw new RuntimeException("Serialization error while deserializing collected data", e);
+        if (accResult != null) {
+            try {
+                return SerializedListAccumulator.deserializeList(accResult, serializer);
+            } catch (ClassNotFoundException e) {
+                throw new RuntimeException("Cannot find type class of collected data type.", e);
+            } catch (IOException e) {
+                throw new RuntimeException("Serialization error while deserializing collected data", e);
+            }
+        } else {
+            throw new RuntimeException("The job result did not contain any accumulator data.");
--- End diff --

Then let's change it to
> Collect returned no results.
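In `DataSet.collect()`, the null check would then end in that message. A reduced Java sketch, where `CollectResults.requireResult` is a hypothetical helper and `accResult` stands for the value of `res.getAccumulatorResult(id)`:

```java
import java.util.List;

final class CollectResults {
    // Hypothetical helper: fail with the agreed message when the job result
    // carried no accumulator data for the collect() call.
    static <T> List<T> requireResult(List<T> accResult) {
        if (accResult == null) {
            throw new RuntimeException("Collect returned no results.");
        }
        return accResult;
    }
}
```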




[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-14 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34549555
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -400,13 +398,22 @@ class JobManager(protected val flinkConfiguration: Configuration,
       import scala.collection.JavaConverters._
       sender ! RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala)
 
-    case Heartbeat(instanceID, metricsReport) =>
-      try {
-        log.debug(s"Received hearbeat message from $instanceID.")
-        instanceManager.reportHeartBeat(instanceID, metricsReport)
-      } catch {
-        case t: Throwable => log.error(s"Could not report heart beat from ${sender().path}.", t)
-      }
+    case Heartbeat(instanceID, metricsReport, accumulators) =>
+      log.debug(s"Received hearbeat message from $instanceID.")
+
+      Future {
+        accumulators foreach {
+          case accumulators =>
+              currentJobs.get(accumulators.getJobID) match {
--- End diff --

Yes, that is true. As you said, such a corner case wouldn't cause anything
bad, but it would log an error. So let's just do nothing in case of an unknown
job ID.
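Doing nothing for an unknown job ID amounts to a silent skip in the heartbeat handler. A simplified, synchronous Java sketch, assuming hypothetical `TaskSnapshot` and `HeartbeatAccumulatorHandler` classes and a plain map in place of the JobManager's `currentJobs` (the handler in the diff runs inside a Scala `Future`):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal view of a per-task accumulator snapshot.
final class TaskSnapshot {
    final String jobId;
    final String attemptId;
    final long numRecordsIn;

    TaskSnapshot(String jobId, String attemptId, long numRecordsIn) {
        this.jobId = jobId;
        this.attemptId = attemptId;
        this.numRecordsIn = numRecordsIn;
    }
}

final class HeartbeatAccumulatorHandler {
    // Stand-in for currentJobs: job ID -> (attempt ID -> latest reported count).
    final Map<String, Map<String, Long>> currentJobs = new HashMap<>();

    void registerJob(String jobId) {
        currentJobs.put(jobId, new HashMap<String, Long>());
    }

    void onHeartbeat(List<TaskSnapshot> snapshots) {
        for (TaskSnapshot s : snapshots) {
            Map<String, Long> perTask = currentJobs.get(s.jobId);
            if (perTask == null) {
                continue; // unknown job ID (e.g. already finished): skip, no error log
            }
            // Snapshots carry current totals, so the newest one replaces the old value.
            perTask.put(s.attemptId, s.numRecordsIn);
        }
    }
}
```

Replacing rather than adding assumes each snapshot reports a task's running total, so repeated heartbeats do not double-count.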




[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-14 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34552275
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---
@@ -101,9 +101,9 @@
 
     /**
      * For system internal usage only. Use getAccumulator(...) to obtain a
-     * accumulator. Use this as read-only.
+     * accumulator. The returned map must NOT be modified.
      */
-    HashMap<String, Accumulator<?, ?>> getAllAccumulators();
+    Map<String, Accumulator<?, ?>> getAllAccumulators();
--- End diff --

Changed the comment and deprecated the method.




[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-14 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34547621
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java ---
@@ -53,32 +53,34 @@
 
     private final ExecutionConfig executionConfig;
 
-    private final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
-    
+    private final Map<String, Accumulator<?, ?>> accumulators;
+
     private final DistributedCache distributedCache;
-    
-    
+
+
     public AbstractRuntimeUDFContext(String name,
--- End diff --

Thanks. Will make the cpTasks the last argument.




[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-14 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34547538
  

No, this is a user-facing method to get the Accumulator map in the RichFunction. I
changed the return type to Map and I'm returning a Collections.unmodifiableMap to
prevent modifications. I think it makes sense that the user can get a list of
registered accumulators and their current status (e.g. in chained operators).
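Returning a read-only view of the accumulator map can be done with `java.util.Collections.unmodifiableMap`. A minimal sketch, where `AccumulatorContext` is a hypothetical stand-in for the runtime context, `Long` values stand in for `Accumulator<?, ?>`, and the `@Deprecated` annotation reflects the deprecation mentioned in this thread:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a runtime context holding named accumulators.
class AccumulatorContext {
    private final Map<String, Long> accumulators = new HashMap<>();

    void addAccumulator(String name, long initial) {
        accumulators.put(name, initial);
    }

    /** Read-only view: callers can inspect but not modify the registered accumulators. */
    @Deprecated
    Map<String, Long> getAllAccumulators() {
        return Collections.unmodifiableMap(accumulators);
    }
}
```

The view is backed by the live map, so accumulators registered later (for example by a chained operator) still show up, while `put` or `remove` on the view throws `UnsupportedOperationException`.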




[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-14 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34548050
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
 ---
@@ -18,32 +18,60 @@
 
 package org.apache.flink.runtime.accumulators;
 
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.util.SerializedValue;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
 /**
- * This class encapsulates a map of accumulators for a single job. It is used
+ * This class encapsulates a map of accumulators for a single task. It is used
  * for the transfer from TaskManagers to the JobManager and from the JobManager
  * to the Client.
  */
-public class AccumulatorEvent extends SerializedValue<Map<String, Accumulator<?, ?>>> {
+public class AccumulatorEvent implements Serializable {
 
-   private static final long serialVersionUID = 8965894516006882735L;
+   private static final long serialVersionUID = 42L;
 
-   /** JobID for the target job */
private final JobID jobID;
+   private final ExecutionAttemptID executionAttemptID;
 
+   private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators;
+   private final SerializedValue<Map<String, Accumulator<?, ?>>> userAccumulators;
 
-   public AccumulatorEvent(JobID jobID, Map<String, Accumulator<?, ?>> accumulators) throws IOException {
-   super(accumulators);
+   public AccumulatorEvent(JobID jobID, ExecutionAttemptID executionAttemptID,
--- End diff --

The class is used to transfer the accumulator results from the task manager 
to the job manager. So it is used in both runtime components. I agree we could 
give it a better name. I like `AccumulatorSnapshot`.
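A rough sketch of what a class renamed to `AccumulatorSnapshot` could look like, assuming simplified types: `String` IDs instead of `JobID`/`ExecutionAttemptID`, `Long` values instead of `Accumulator` objects, and a plain `byte[]` in place of `SerializedValue`. The class name and its methods are hypothetical. Keeping the user-defined accumulators in serialized form presumably lets the receiving side defer deserialization until the user-code class loader is available.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-task snapshot: one immutable value object per report.
final class AccumulatorSnapshotSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String jobId;
    private final String executionAttemptId;
    // System ("flink") metrics only use JDK types, so they can be read directly.
    private final HashMap<String, Long> flinkAccumulators;
    // User-defined accumulators stay serialized; a real implementation would
    // deserialize them with the user-code class loader.
    private final byte[] userAccumulators;

    AccumulatorSnapshotSketch(String jobId, String executionAttemptId,
                              Map<String, Long> flinkAccumulators,
                              Map<String, Long> userAccumulators) {
        this.jobId = jobId;
        this.executionAttemptId = executionAttemptId;
        this.flinkAccumulators = new HashMap<>(flinkAccumulators);
        this.userAccumulators = serialize(new HashMap<>(userAccumulators));
    }

    String getJobId() { return jobId; }

    String getExecutionAttemptId() { return executionAttemptId; }

    Map<String, Long> getFlinkAccumulators() {
        return Collections.unmodifiableMap(flinkAccumulators);
    }

    @SuppressWarnings("unchecked")
    Map<String, Long> deserializeUserAccumulators() {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(userAccumulators))) {
            return (Map<String, Long>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("User accumulator class not found", e);
        }
    }

    private static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Closing the stream flushes it before the bytes are read below.
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```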


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-13 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34504410
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -400,13 +398,22 @@ class JobManager(protected val flinkConfiguration: Configuration,
   import scala.collection.JavaConverters._
   sender ! RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala)
 
-case Heartbeat(instanceID, metricsReport) =>
-  try {
-log.debug(s"Received hearbeat message from $instanceID.")
-instanceManager.reportHeartBeat(instanceID, metricsReport)
-  } catch {
-case t: Throwable => log.error(s"Could not report heart beat from ${sender().path}.", t)
-  }
+case Heartbeat(instanceID, metricsReport, accumulators) =>
+  log.debug(s"Received hearbeat message from $instanceID.")
+
+  Future {
+accumulators foreach {
+  case accumulators =>
+  currentJobs.get(accumulators.getJobID) match {
--- End diff --

The map of current jobs is not thread-safe, and here it is accessed from inside the
Future, i.e. outside the actor's thread. I think there could be corner cases where
an error is reported although nothing went wrong.
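One common way to avoid this kind of race is to touch the non-thread-safe map only on the actor's own message-processing thread and hand just the already-resolved per-job object to the asynchronous task. The sketch below assumes simplified types (`String` job IDs, a synchronized `List<Long>` per job in place of the real per-job state); `HeartbeatHandlerSketch` and its methods are hypothetical, and `CompletableFuture` stands in for Scala's `Future`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical handler; every method runs on one thread (like an actor),
// only the body passed to runAsync runs elsewhere.
class HeartbeatHandlerSketch {

    // Not thread-safe: must only be accessed from the message-processing thread.
    private final Map<String, List<Long>> currentJobs = new HashMap<>();

    void registerJob(String jobId) {
        currentJobs.put(jobId, Collections.synchronizedList(new ArrayList<Long>()));
    }

    CompletableFuture<Void> onHeartbeat(String jobId, long accumulatorValue) {
        // Resolve the job on the calling thread, never inside the async task.
        List<Long> updates = currentJobs.get(jobId);
        if (updates == null) {
            // Unknown or already finished job: ignore instead of reporting an error.
            return CompletableFuture.completedFuture(null);
        }
        // The async task only captures the resolved, synchronized list.
        return CompletableFuture.runAsync(() -> updates.add(accumulatorValue));
    }

    List<Long> updatesFor(String jobId) {
        return currentJobs.get(jobId);
    }
}
```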


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-13 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34503354
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 ---
@@ -101,9 +101,9 @@
 
/**
 * For system internal usage only. Use getAccumulator(...) to obtain a
-* accumulator. Use this as read-only.
+* accumulator. The returned map must NOT be modified.
 */
-   HashMap<String, Accumulator<?, ?>> getAllAccumulators();
+   Map<String, Accumulator<?, ?>> getAllAccumulators();
--- End diff --

The comment says system-internal usage only, but I couldn't find any usages 
with IntelliJ. Should we remove this then? I guess that this has been replaced 
by the getSnapshot method of the registry.


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-13 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34504087
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
 ---
@@ -18,32 +18,60 @@
 
 package org.apache.flink.runtime.accumulators;
 
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.util.SerializedValue;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
 /**
- * This class encapsulates a map of accumulators for a single job. It is used
+ * This class encapsulates a map of accumulators for a single task. It is used
  * for the transfer from TaskManagers to the JobManager and from the JobManager
  * to the Client.
  */
-public class AccumulatorEvent extends SerializedValue<Map<String, Accumulator<?, ?>>> {
+public class AccumulatorEvent implements Serializable {
 
-   private static final long serialVersionUID = 8965894516006882735L;
+   private static final long serialVersionUID = 42L;
 
-   /** JobID for the target job */
private final JobID jobID;
+   private final ExecutionAttemptID executionAttemptID;
 
+   private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators;
+   private final SerializedValue<Map<String, Accumulator<?, ?>>> userAccumulators;
 
-   public AccumulatorEvent(JobID jobID, Map<String, Accumulator<?, ?>> accumulators) throws IOException {
-   super(accumulators);
+   public AccumulatorEvent(JobID jobID, ExecutionAttemptID executionAttemptID,
--- End diff --

Regarding the name of this class: I think the name implies a single
entity and not a pair of maps etc. Maybe rename it to something along the
lines of AccumulatorSnapshot? getSnapshot is the only method where this is used
anyway, no?


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-13 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34503752
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
---
@@ -408,14 +408,16 @@ public long count() throws Exception {
JobExecutionResult res = getExecutionEnvironment().execute();
 
ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
-   try {
-   return SerializedListAccumulator.deserializeList(accResult, serializer);
-   }
-   catch (ClassNotFoundException e) {
-   throw new RuntimeException("Cannot find type class of collected data type.", e);
-   }
-   catch (IOException e) {
-   throw new RuntimeException("Serialization error while deserializing collected data", e);
+   if (accResult != null) {
+   try {
+   return SerializedListAccumulator.deserializeList(accResult, serializer);
+   } catch (ClassNotFoundException e) {
+   throw new RuntimeException("Cannot find type class of collected data type.", e);
+   } catch (IOException e) {
+   throw new RuntimeException("Serialization error while deserializing collected data", e);
+   }
+   } else {
+   throw new RuntimeException("The job result did not contain any accumulator data.");
--- End diff --

The error message might be confusing to the user, because a user should not 
care that the count is implemented with accumulators. Maybe let's make it 
explicit in the message that this case is a bug and should not happen.
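A small sketch of the suggested wording, assuming a hypothetical helper `requireInternalResult` that `count()`/`collect()` could call on the accumulator result; the message tells the user this is an internal error instead of mentioning accumulators. The exact wording is only a suggestion.

```java
// Hypothetical helper; the message wording is only a suggestion.
final class InternalResults {

    private InternalResults() {}

    static <T> T requireInternalResult(T result, String operation) {
        if (result == null) {
            throw new IllegalStateException("Internal error: the result of " + operation
                + "() could not be retrieved from the job result. This should not happen "
                + "and indicates a bug; please report it.");
        }
        return result;
    }
}
```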


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-13 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34503586
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
 ---
@@ -53,32 +53,34 @@
 
private final ExecutionConfig executionConfig;
 
-   private final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
-   
+   private final Map<String, Accumulator<?, ?>> accumulators;
+
private final DistributedCache distributedCache;
-   
-   
+
+
public AbstractRuntimeUDFContext(String name,
--- End diff --

I think a common practice is to have optional arguments last, e.g. the 
cpTasks.
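To illustrate the convention, here is a hypothetical constructor layout in which the optional `cpTasks` argument comes last and an overload supplies the default. The field types are simplified (`String` values instead of the real cached-file futures), so this only sketches the parameter ordering.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical context class; only the constructor ordering matters here.
class UdfContextSketch {

    private final String name;
    private final int numParallelSubtasks;
    private final Map<String, String> cpTasks;

    // Required arguments only; the optional argument defaults to "no cached files".
    UdfContextSketch(String name, int numParallelSubtasks) {
        this(name, numParallelSubtasks, Collections.<String, String>emptyMap());
    }

    // The optional argument is appended last, so both signatures share a common prefix.
    UdfContextSketch(String name, int numParallelSubtasks, Map<String, String> cpTasks) {
        this.name = name;
        this.numParallelSubtasks = numParallelSubtasks;
        this.cpTasks = new HashMap<>(cpTasks);
    }

    String getName() { return name; }

    int getNumParallelSubtasks() { return numParallelSubtasks; }

    Map<String, String> getCpTasks() { return Collections.unmodifiableMap(cpTasks); }
}
```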


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-13 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34503896
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.accumulators;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Main accumulator registry which encapsulates internal and user-defined accumulators.
+ */
+public class AccumulatorRegistry {
--- End diff --

Regarding the name of this class: I would either make it explicit in the 
class-level comment that this is PER task or rename it to something along the 
lines of TaskAccumulatorRegistry. 


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-13 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/896#issuecomment-121046784
  
Nice piece of work! I agree with Stephan's points. I think it was good to 
address them. :) I've added some minor comments inline.

Regarding the high-level comments:

- Wasn't the rationale to have a separate message for the heartbeat and the 
accumulators to keep the heartbeat messages small? What do you think? In any 
case, I agree that it makes sense to be able to configure this.

- I would add a unit test for the record reader/writer accumulator results.

- There seems to be a 1:1 correspondence between the reporter and the 
internal metrics. Is the idea to have multiple different reporters in the 
future (for different types of metrics etc.) or is a long reporter sufficient?


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-10 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34335824
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -131,6 +133,35 @@

private SerializedValue<StateHandle<?>> operatorState;
 
+   /* Lock for updating the accumulators atomically. */
--- End diff --

If you consider comment lines as empty (non-code) lines, then it follows
the class's style.


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-10 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34335830
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -172,13 +173,20 @@
 
/** The library cache, from which the task can request its required JAR files */
private final LibraryCacheManager libraryCache;
-   
+
/** The cache for user-defined files that the invokable requires */
private final FileCache fileCache;
-   
+
/** The gateway to the network stack, which handles inputs and produced results */
private final NetworkEnvironment network;
 
+   /** The registry of this task which enables live reporting of accumulators */
+   private final AccumulatorRegistry accumulatorRegistry;
--- End diff --

Yes, absolutely. This is an artifact of a former design.


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-10 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34335963
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.accumulators;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Main accumulator registry which encapsulates internal and user-defined accumulators.
+ */
+public class AccumulatorRegistry {
--- End diff --

True. Originally, those two were a bit different. The API can be changed to
be the same, but I find keeping the two classes useful because it helps to
differentiate accumulator contexts by type.


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-10 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34335420
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1017,6 +1040,9 @@ object TaskManager {
 
   val HEARTBEAT_INTERVAL: FiniteDuration = 5000 milliseconds
 
+  /* Interval to send accumulators to the job manager  */
+  val ACCUMULATOR_REPORT_INTERVAL: FiniteDuration = 10 seconds
--- End diff --

Removed. The update interval corresponds to the heartbeat interval.


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-10 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/896#issuecomment-120302065
  
> Looks like this change breaks the YARN integration. The YARN WordCount no longer works.

Should be working again now.

> It would be good if the accumulator update interval was configurable.
> Edit: Is that the same value as the heartbeats?

Yes, that was a design rationale to keep the message count low. We could
send the accumulators only with every Nth heartbeat and make N configurable.
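A minimal sketch of the "every Nth heartbeat" idea, assuming N would be read from the configuration; the class `AccumulatorReportPolicy` and its method are hypothetical.

```java
// Hypothetical policy object, consulted once per heartbeat on the TaskManager side.
final class AccumulatorReportPolicy {

    private final int everyNthHeartbeat;
    private long heartbeatCount;

    AccumulatorReportPolicy(int everyNthHeartbeat) {
        if (everyNthHeartbeat < 1) {
            throw new IllegalArgumentException("Interval must be at least 1");
        }
        this.everyNthHeartbeat = everyNthHeartbeat;
    }

    // Returns true if the current heartbeat should carry an accumulator snapshot.
    boolean shouldAttachAccumulators() {
        heartbeatCount++;
        return heartbeatCount % everyNthHeartbeat == 0;
    }
}
```

With N = 1 this reduces to attaching the accumulators to every heartbeat.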

> There is a potential modification conflict: Drawing a snapshot for
> serialization and registering a new accumulator can lead to a
> ConcurrentModificationException in the drawing of the snapshot.

I conducted tests with concurrent insertions and deletions and found that
only concurrent removals cause ConcurrentModificationExceptions. Removals are
not allowed for accumulators. Anyway, we could switch to a synchronized or
copy-on-write hash map. If we do, I would opt for the latter.
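For reference, `java.util.HashMap`'s iterators are fail-fast on any structural modification, which includes inserting a new key, not only removals (the JDK documents this as best-effort behavior, so multi-threaded tests may or may not observe it). The single-threaded sketch below makes the interleaving deterministic. The JDK has no copy-on-write hash map, so `ConcurrentHashMap` is used here as the swapped-in alternative; its iterators are weakly consistent and never throw `ConcurrentModificationException`. The class and method names are hypothetical.

```java
import java.util.ConcurrentModificationException;
import java.util.Map;

// Hypothetical demo: register a new accumulator while a snapshot iterates the map.
final class SnapshotConflictDemo {

    private SnapshotConflictDemo() {}

    // Returns true if registering a new key during iteration throws.
    static boolean registerDuringSnapshotThrows(Map<String, Long> accumulators) {
        boolean registered = false;
        try {
            for (String name : accumulators.keySet()) {
                if (!registered) {
                    // Simulates another component registering an accumulator mid-snapshot.
                    accumulators.put("newly-registered", 0L);
                    registered = true;
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }
}
```

With a `HashMap` holding at least two entries, the insertion during the first iteration step makes the next step throw; with a `ConcurrentHashMap` the loop simply completes.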

> The naming of the accumulators refers sometimes to flink vs. user-defined,
> and sometimes to internal vs. external. Can we make this consistent? I actually
> like the flink vs. user-defined naming better.

Then let's stick to the flink vs. user-defined naming scheme.

> I think the code would be simpler if the registry simply always had a
> created map for internal and external accumulators. Also, a reporter object
> would help.

I agree that would be a nicer way of dealing with the API.
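A sketch of a per-task registry in which both maps are always created, so callers never need null checks. It assumes `Long` values in place of `Accumulator` objects and a simplified `Metric` enum modeled on the `AccumulatorRegistry.Metric` type in the diff; the class name and the enum constants are hypothetical. `ConcurrentHashMap` is used so that registering an accumulator while a snapshot is being copied does not throw.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-task registry (one instance per task).
final class TaskAccumulatorRegistrySketch {

    enum Metric { NUM_RECORDS_IN, NUM_RECORDS_OUT, NUM_BYTES_IN, NUM_BYTES_OUT }

    // Both maps exist from construction on: no lazy creation, no null checks.
    private final Map<Metric, Long> flinkAccumulators = new ConcurrentHashMap<>();
    private final Map<String, Long> userAccumulators = new ConcurrentHashMap<>();

    void addToMetric(Metric metric, long delta) {
        flinkAccumulators.merge(metric, delta, Long::sum);
    }

    Map<String, Long> getUserMap() {
        return userAccumulators;
    }

    // Point-in-time copies, safe to serialize and send with a heartbeat.
    Map<Metric, Long> snapshotFlinkAccumulators() {
        return Collections.unmodifiableMap(new HashMap<>(flinkAccumulators));
    }

    Map<String, Long> snapshotUserAccumulators() {
        return Collections.unmodifiableMap(new HashMap<>(userAccumulators));
    }
}
```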





[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-10 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/896#issuecomment-120305327
  
The reporter object has the advantage that it is more easily extensible. At 
some point we will want to differentiate between locally received bytes, and 
remotely received bytes, for example. Or include wait/stall times to detect 
whether a task is throttled by a slower predecessor.
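To illustrate the extensibility argument, here is a hypothetical reporter interface handed to readers and writers instead of raw counters. The names (`ReporterSketch`, `CountingReporter`, `reportNumRecordsIn`, `reportNumBytesIn`, `reportNumBytesInRemote`) are assumptions, not Flink API. A finer-grained metric such as remotely received bytes can be added later as a Java 8 default method, so existing implementations keep compiling.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical reporter passed to record readers/writers.
interface ReporterSketch {

    void reportNumRecordsIn(long count);

    void reportNumBytesIn(long bytes);

    // Added later: by default, remote bytes are simply counted as received bytes,
    // so implementations written before this method existed still work.
    default void reportNumBytesInRemote(long bytes) {
        reportNumBytesIn(bytes);
    }
}

// Hypothetical implementation backing the reporter with atomic counters.
final class CountingReporter implements ReporterSketch {

    private final AtomicLong numRecordsIn = new AtomicLong();
    private final AtomicLong numBytesIn = new AtomicLong();

    @Override
    public void reportNumRecordsIn(long count) {
        numRecordsIn.addAndGet(count);
    }

    @Override
    public void reportNumBytesIn(long bytes) {
        numBytesIn.addAndGet(bytes);
    }

    long getNumRecordsIn() { return numRecordsIn.get(); }

    long getNumBytesIn() { return numBytesIn.get(); }
}
```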


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-10 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34335325
  
--- Diff: 
flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
 ---
@@ -60,6 +60,7 @@ public static void main(String[] args) throws Exception {

// set up the execution environment
final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
--- End diff --

Thanks for spotting it.


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-10 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34335345
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
 ---
@@ -57,6 +58,12 @@
 
private final BarrierBuffer barrierBuffer;
 
+   /**
+* Counters for the number of bytes read and records processed.
+*/
+   private LongCounter numRecordsRead = null;
--- End diff --

Thanks, I didn't know.


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-10 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34335337
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -172,13 +173,20 @@
 
/** The library cache, from which the task can request its required JAR files */
private final LibraryCacheManager libraryCache;
-   
+
/** The cache for user-defined files that the invokable requires */
private final FileCache fileCache;
-   
+
/** The gateway to the network stack, which handles inputs and produced results */
private final NetworkEnvironment network;
 
+   /** The registry of this task which enables live reporting of accumulators */
+   private final AccumulatorRegistry accumulatorRegistry;
+
+   public AccumulatorRegistry getAccumulatorRegistry() {
--- End diff --

I agree.


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-10 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34335380
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -306,99 +308,100 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
 if (!isConnected) {
   log.debug(s"Dropping message $message because the TaskManager is currently " +
 "not connected to a JobManager.")
-}
+} else {
 
-// we order the messages by frequency, to make sure the code paths for matching
-// are as short as possible
-message match {
+  // we order the messages by frequency, to make sure the code paths for matching
+  // are as short as possible
+  message match {
+
+// tell the task about the availability of a new input partition
+case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
+  updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
+
+// tell the task about the availability of some new input partitions
+case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
+  updateTaskInputPartitions(executionID, partitionInfos)
+
+// discards intermediate result partitions of a task execution on this TaskManager
+case FailIntermediateResultPartitions(executionID) =>
+  log.info("Discarding the results produced by task execution " + executionID)
+  if (network.isAssociated) {
+try {
+  network.getPartitionManager.releasePartitionsProducedBy(executionID)
+} catch {
+  case t: Throwable => killTaskManagerFatal(
+"Fatal leak: Unable to release intermediate result partition data", t)
+}
+  }
 
-  // tell the task about the availability of a new input partition
-  case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
-updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
+// notifies the TaskManager that the state of a task has changed.
+// the TaskManager informs the JobManager and cleans up in case the transition
+// was into a terminal state, or in case the JobManager cannot be informed of the
+// state transition
 
-  // tell the task about the availability of some new input partitions
-  case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
-updateTaskInputPartitions(executionID, partitionInfos)
+case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
 
-  // discards intermediate result partitions of a task execution on this TaskManager
-  case FailIntermediateResultPartitions(executionID) =>
-log.info("Discarding the results produced by task execution " + executionID)
-if (network.isAssociated) {
-  try {
-network.getPartitionManager.releasePartitionsProducedBy(executionID)
-  } catch {
-case t: Throwable => killTaskManagerFatal(
-"Fatal leak: Unable to release intermediate result partition data", t)
-  }
-}
+  // we receive these from our tasks and forward them to the JobManager
--- End diff --

This fixes a bug I discovered while reading through the code: messages were
still processed when the TaskManager was not connected to the JobManager. If
you look at line 307, it says it would skip the message, but it continues to
process it. If you want, I can open a separate pull request.
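The control-flow problem can be reduced to the following Java sketch (the actual code is Scala inside an actor; the class and method names here are hypothetical): without the `else`, or an early `return`, the "dropping" branch only logs, and the message is still dispatched afterwards.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reduction of the TaskManager's message handler.
class MessageHandlerSketch {

    private final List<String> handled = new ArrayList<>();
    private final List<String> dropped = new ArrayList<>();
    private boolean connected;

    void setConnected(boolean connected) {
        this.connected = connected;
    }

    void handleMessage(String message) {
        if (!connected) {
            // Without this return (or an else branch) the message would still be processed.
            dropped.add(message);
            return;
        }
        handled.add(message);
    }

    List<String> getHandled() { return handled; }

    List<String> getDropped() { return dropped; }
}
```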


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-10 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34335330
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
 ---
@@ -212,19 +218,19 @@ public InputGate getInputGate(int index) {
return inputGates;
}
 
-   @Override
-   public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) {
-   AccumulatorEvent evt;
-   try {
-   evt = new AccumulatorEvent(getJobID(), accumulators);
-   }
-   catch (IOException e) {
-   throw new RuntimeException("Cannot serialize accumulators to send them to JobManager", e);
-   }
-
-   ReportAccumulatorResult accResult = new ReportAccumulatorResult(jobId, executionId, evt);
-   jobManagerActor.tell(accResult, ActorRef.noSender());
-   }
+// @Override
--- End diff --

Yes.


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-10 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34335342
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -237,12 +246,13 @@ public Task(TaskDeploymentDescriptor tdd,
 
this.memoryManager = checkNotNull(memManager);
this.ioManager = checkNotNull(ioManager);
-   this.broadcastVariableManager =checkNotNull(bcVarManager);
+   this.broadcastVariableManager = checkNotNull(bcVarManager);
+   this.accumulatorRegistry = accumulatorRegistry;
 
this.jobManager = checkNotNull(jobManagerActor);
this.taskManager = checkNotNull(taskManagerActor);
this.actorAskTimeout = new Timeout(checkNotNull(actorAskTimeout));
-   
--- End diff --

No auto-formatting; it just removes redundant tabs in empty lines and adds a
missing space.


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-10 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/896#issuecomment-120460741
  
I've addressed your comments in a new commit.


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/896#issuecomment-120029960
  
Looks like this change breaks the YARN integration. The YARN WordCount no 
longer works.


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34268803
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
 ---
@@ -212,19 +218,19 @@ public InputGate getInputGate(int index) {
return inputGates;
}
 
-   @Override
-   public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) {
-   AccumulatorEvent evt;
-   try {
-   evt = new AccumulatorEvent(getJobID(), accumulators);
-   }
-   catch (IOException e) {
-   throw new RuntimeException("Cannot serialize accumulators to send them to JobManager", e);
-   }
-
-   ReportAccumulatorResult accResult = new ReportAccumulatorResult(jobId, executionId, evt);
-   jobManagerActor.tell(accResult, ActorRef.noSender());
-   }
+// @Override
--- End diff --

This can be removed properly instead of being commented out, no?


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34266800
  
--- Diff: 
flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
 ---
@@ -60,6 +60,7 @@ public static void main(String[] args) throws Exception {

// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
--- End diff --

This must have been committed accidentally.


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34269026
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -237,12 +246,13 @@ public Task(TaskDeploymentDescriptor tdd,
 
this.memoryManager = checkNotNull(memManager);
this.ioManager = checkNotNull(ioManager);
-   this.broadcastVariableManager =checkNotNull(bcVarManager);
+   this.broadcastVariableManager = checkNotNull(bcVarManager);
+   this.accumulatorRegistry = accumulatorRegistry;
 
this.jobManager = checkNotNull(jobManagerActor);
this.taskManager = checkNotNull(taskManagerActor);
this.actorAskTimeout = new Timeout(checkNotNull(actorAskTimeout));
-   
--- End diff --

Lots of auto-reformats...


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/896#issuecomment-120054333
  
There is a potential concurrent-modification conflict: drawing a snapshot
for serialization while a new accumulator is being registered can lead to a
ConcurrentModificationException while the snapshot is drawn.
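A minimal sketch of one way to avoid the exception, assuming a simplified registry that maps names to `long` counters. `SnapshottingRegistry` is hypothetical and is not the PR's `AccumulatorRegistry`: registration, updates and snapshotting all synchronize on the same monitor, and the snapshot is a copy that can be serialized outside the lock.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical registry sketch (not Flink's API): every structural change
 * and every snapshot synchronizes on "this", so a snapshot never iterates a
 * map that another thread is modifying at the same time.
 */
final class SnapshottingRegistry {

    // Guarded by "this".
    private final Map<String, Long> counters = new HashMap<>();

    /** Registers a counter; may be called by the task thread at any time. */
    synchronized void register(String name, long initialValue) {
        counters.put(name, initialValue);
    }

    /** Adds to a counter, registering it on first use. */
    synchronized void add(String name, long delta) {
        counters.merge(name, delta, Long::sum);
    }

    /** Copies the map under the lock; the copy can then be serialized freely. */
    synchronized Map<String, Long> snapshot() {
        return new HashMap<>(counters);
    }
}
```

This only protects the map's structure. If real accumulator objects are updated in place by the task thread, their values can still change while a shallow copy is being serialized. A `ConcurrentHashMap`, whose iterators never throw `ConcurrentModificationException`, would be an alternative with weakly consistent snapshots.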


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34273441
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -601,6 +636,11 @@ void markFinished() {
}
}
 
+   synchronized (accumulatorLock) {
--- End diff --

Ignore this comment. The lock is good :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34274318
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -172,13 +173,20 @@
 
/** The library cache, from which the task can request its required JAR files */
private final LibraryCacheManager libraryCache;
-   
+
/** The cache for user-defined files that the invokable requires */
private final FileCache fileCache;
-   
+
/** The gateway to the network stack, which handles inputs and produced results */
private final NetworkEnvironment network;
 
+   /** The registry of this task which enables live reporting of accumulators */
+   private final AccumulatorRegistry accumulatorRegistry;
--- End diff --

Since the AccumulatorRegistry is only used task-internally, and is always
retrieved from there, it should be initialized internally. That saves one more
constructor parameter and helps with separation of concerns.
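A rough sketch of the suggested structure, assuming the registry needs only the job and execution-attempt identifiers (its imports of `JobID` and `ExecutionAttemptID` suggest so). `TaskAccumulatorRegistry` and `SimplifiedTask` are hypothetical stand-ins, with strings in place of the real ID types.

```java
import java.util.Objects;

/** Hypothetical per-task registry; strings stand in for JobID / ExecutionAttemptID. */
final class TaskAccumulatorRegistry {
    private final String jobId;
    private final String executionId;

    TaskAccumulatorRegistry(String jobId, String executionId) {
        this.jobId = Objects.requireNonNull(jobId);
        this.executionId = Objects.requireNonNull(executionId);
    }

    String jobId() { return jobId; }

    String executionId() { return executionId; }
}

/** Hypothetical, heavily simplified task that owns its registry. */
final class SimplifiedTask {
    private final TaskAccumulatorRegistry accumulatorRegistry;

    SimplifiedTask(String jobId, String executionId) {
        // The registry is built from identifiers the task already receives,
        // instead of being passed in as one more constructor argument.
        this.accumulatorRegistry = new TaskAccumulatorRegistry(jobId, executionId);
    }

    /** Callers (e.g. the runtime environment) fetch the registry from the task. */
    TaskAccumulatorRegistry getAccumulatorRegistry() {
        return accumulatorRegistry;
    }
}
```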


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34274983
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.accumulators;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Main accumulator registry which encapsulates internal and user-defined accumulators.
+ */
+public class AccumulatorRegistry {
--- End diff --

I do not understand the differentiation between the implementations of the
`Internal` and `External` registry. From the usage pattern, both are accessed
and initialized with a hash map. In one case the hash map is created by the
caller, in the other by the registry. I have not found a place where it would
not work for the registry to always create the map immediately.
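A sketch of the simplification suggested above, using hypothetical names and `Long` values in place of `Accumulator<?, ?>`: a single registry class that eagerly creates both maps itself, so no caller ever has to supply one. The field names follow the Flink vs. user-defined split.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical unified registry: both maps are owned and created eagerly. */
final class EagerAccumulatorRegistry {

    // Built-in (Flink) metrics, e.g. records or bytes read.
    private final Map<String, Long> flinkAccumulators = new HashMap<>();

    // Accumulators registered by user code.
    private final Map<String, Long> userAccumulators = new HashMap<>();

    Map<String, Long> getFlinkAccumulators() {
        return flinkAccumulators;
    }

    Map<String, Long> getUserAccumulators() {
        return userAccumulators;
    }
}
```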


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34270017
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
 ---
@@ -57,6 +58,12 @@
 
private final BarrierBuffer barrierBuffer;
 
+   /**
+* Counters for the number of bytes read and records processed.
+*/
+   private LongCounter numRecordsRead = null;
--- End diff --

`null` initializations are actually redundant. They still get executed (with
both OpenJDK javac and Oracle javac), so they are overhead for no reason.
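A small illustration of the point, with `Object` standing in for `LongCounter`. Both fields read as `null`; the difference only shows up in the bytecode (for example via `javap -c`), where the explicit initializer compiles to an extra `aconst_null`/`putfield` pair in the constructor.

```java
/** Illustrative class; field types are placeholders for LongCounter. */
final class RecordReaderCounters {

    // Redundant: javac still emits "aload_0; aconst_null; putfield" for this.
    private Object explicitlyNull = null;

    // Same observable value, no extra instructions: the JVM zero-initializes fields.
    private Object implicitlyNull;

    boolean bothNull() {
        return explicitlyNull == null && implicitlyNull == null;
    }
}
```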


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/896#issuecomment-120058469
  
The naming of the accumulators sometimes refers to Flink vs.
user-defined, and sometimes to internal vs. external. Can we make this
consistent? I actually prefer the Flink vs. user-defined naming.


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34271710
  
--- Diff: 
flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
 ---
@@ -60,6 +60,7 @@ public static void main(String[] args) throws Exception {

// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
--- End diff --

Probably the reason that the YARN tests are broken...


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34272101
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -306,99 +308,100 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
 if (!isConnected) {
   log.debug(s"Dropping message $message because the TaskManager is currently " +
 "not connected to a JobManager.")
-}
+} else {
 
-// we order the messages by frequency, to make sure the code paths for matching
-// are as short as possible
-message match {
+  // we order the messages by frequency, to make sure the code paths for matching
+  // are as short as possible
+  message match {
+
+// tell the task about the availability of a new input partition
+case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
+  updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
+
+// tell the task about the availability of some new input partitions
+case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
+  updateTaskInputPartitions(executionID, partitionInfos)
+
+// discards intermediate result partitions of a task execution on this TaskManager
+case FailIntermediateResultPartitions(executionID) =>
+  log.info("Discarding the results produced by task execution " + executionID)
+  if (network.isAssociated) {
+try {
+  network.getPartitionManager.releasePartitionsProducedBy(executionID)
+} catch {
+  case t: Throwable => killTaskManagerFatal(
+"Fatal leak: Unable to release intermediate result partition data", t)
+}
+  }
 
-  // tell the task about the availability of a new input partition
-  case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
-updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
+// notifies the TaskManager that the state of a task has changed.
+// the TaskManager informs the JobManager and cleans up in case the transition
+// was into a terminal state, or in case the JobManager cannot be informed of the
+// state transition
 
-  // tell the task about the availability of some new input partitions
-  case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
-updateTaskInputPartitions(executionID, partitionInfos)
+case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
 
-  // discards intermediate result partitions of a task execution on this TaskManager
-  case FailIntermediateResultPartitions(executionID) =>
-log.info("Discarding the results produced by task execution " + executionID)
-if (network.isAssociated) {
-  try {
-network.getPartitionManager.releasePartitionsProducedBy(executionID)
-  } catch {
-case t: Throwable => killTaskManagerFatal(
-"Fatal leak: Unable to release intermediate result partition data", t)
-  }
-}
+  // we receive these from our tasks and forward them to the JobManager
--- End diff --

There is a lot of changed code here that seems to have been edited without
need (it has nothing to do with the accumulators). Since this is pretty
sensitive code, I am very hesitant to commit such massive edits. What was the
reason for these changes in the first place?


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34273000
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -131,6 +133,35 @@

private SerializedValue<StateHandle<?>> operatorState;
 
+   /* Lock for updating the accumulators atomically. */
--- End diff --

Why not follow the style of the rest of the class with respect to empty
lines between fields?


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/896

[FLINK-2292][FLINK-1573] add live per-task accumulators

This refactors the accumulators to accumulate per task execution. The
accumulators are reported from the task managers periodically to the job
manager via the Heartbeat message. If the execution contains chained
tasks, the accumulators are chained as well. The final accumulator
results are reported via the UpdateTaskExecutionState message.

The accumulators are now saved in the Execution within the
ExecutionGraph. This makes the AccumulatorManager obsolete. It has been
removed for now. In the future, we might introduce some caching for the
web frontend visualization.

Two types of accumulators are available:

- external (user-defined via the RuntimeContext)
- internal (Flink metrics defined in the invokables)

The internal (built-in) metrics are targeted at users who want to
monitor their programs, e.g. through the job manager's web frontend.
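The reporting model described above can be sketched as follows. This is a hypothetical simplification, not the PR's code: the class names are invented, plain `long` counters stand in for `Accumulator<?, ?>`, and a map keyed by execution ID stands in for storing the accumulators in each `Execution` of the `ExecutionGraph`.

```java
import java.util.HashMap;
import java.util.Map;

/** Immutable report of one task execution's accumulators (hypothetical). */
final class AccumulatorSnapshot {
    final String executionId;
    final Map<String, Long> flinkAccumulators; // built-in metrics
    final Map<String, Long> userAccumulators;  // registered via the RuntimeContext

    AccumulatorSnapshot(String executionId, Map<String, Long> flink, Map<String, Long> user) {
        this.executionId = executionId;
        this.flinkAccumulators = flink;
        this.userAccumulators = user;
    }
}

/** TaskManager side: all operators of a chain add to the same per-execution maps. */
final class ChainedTaskAccumulators {
    private final String executionId;
    private final Map<String, Long> flink = new HashMap<>();
    private final Map<String, Long> user = new HashMap<>();

    ChainedTaskAccumulators(String executionId) {
        this.executionId = executionId;
    }

    synchronized void addFlink(String name, long delta) {
        flink.merge(name, delta, Long::sum);
    }

    synchronized void addUser(String name, long delta) {
        user.merge(name, delta, Long::sum);
    }

    /**
     * Taken periodically (e.g. for a heartbeat) and once more for the final
     * state update; copies decouple the report from later updates.
     */
    synchronized AccumulatorSnapshot snapshot() {
        return new AccumulatorSnapshot(executionId, new HashMap<>(flink), new HashMap<>(user));
    }
}

/** JobManager side: each execution keeps only its most recent report. */
final class ExecutionAccumulatorStore {
    private final Map<String, AccumulatorSnapshot> latest = new HashMap<>();

    synchronized void update(AccumulatorSnapshot snapshot) {
        latest.put(snapshot.executionId, snapshot);
    }

    synchronized AccumulatorSnapshot get(String executionId) {
        return latest.get(executionId);
    }
}
```

Because each report replaces the previous one for that execution, the job manager always shows the latest values, and the final report simply becomes the last overwrite.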

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink live-accumulators

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/896.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #896


commit 7cec1236f087e72b40022bf02a6dbb12d74acbac
Author: Maximilian Michels m...@apache.org
Date:   2015-07-08T07:23:42Z

[FLINK-2292][FLINK-1573] add live per-task accumulators


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34272308
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1017,6 +1040,9 @@ object TaskManager {
 
   val HEARTBEAT_INTERVAL: FiniteDuration = 5000 milliseconds
 
+  /* Interval to send accumulators to the job manager  */
+  val ACCUMULATOR_REPORT_INTERVAL: FiniteDuration = 10 seconds
--- End diff --

This variable is never used anywhere.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...

2015-07-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/896#discussion_r34273136
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -601,6 +636,11 @@ void markFinished() {
}
}
 
+   synchronized (accumulatorLock) {
--- End diff --

This lock seems redundant. No place relies on those two being in
sync.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---