Repository: flink
Updated Branches:
  refs/heads/master f46ca3918 -> d95929e01


[FLINK-4829] snapshot accumulators on a best-effort basis

Heartbeats should not fail when accumulators could not be snapshotted. Instead,
we should simply skip the reporting of the failed accumulator. Eventually, the
accumulator will be reported; at the latest, when the job finishes.

This closes #2649


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d95929e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d95929e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d95929e0

Branch: refs/heads/master
Commit: d95929e0110b53f03452e1ad453de2522f79a6b8
Parents: 783dca5
Author: Maximilian Michels <m...@apache.org>
Authored: Mon Oct 17 14:19:00 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Tue Oct 18 15:55:33 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/taskmanager/TaskManager.scala | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d95929e0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index f8f333e..1017ea0 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -35,6 +35,7 @@ import com.codahale.metrics.jvm.{BufferPoolMetricSet, 
GarbageCollectorMetricSet,
 import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry}
 import com.fasterxml.jackson.databind.ObjectMapper
 import grizzled.slf4j.Logger
+import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.flink.configuration._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, 
MemorySegmentFactory, MemoryType}
@@ -1335,9 +1336,15 @@ class TaskManager(
 
       runningTasks.asScala foreach {
         case (execID, task) =>
-          val registry = task.getAccumulatorRegistry
-          val accumulators = registry.getSnapshot
-          accumulatorEvents.append(accumulators)
+          try {
+            val registry = task.getAccumulatorRegistry
+            val accumulators = registry.getSnapshot
+            accumulatorEvents.append(accumulators)
+          } catch {
+            case e: Exception =>
+              log.warn("Failed to take accumulator snapshot for task {}.",
+                execID, ExceptionUtils.getRootCause(e))
+          }
       }
 
        currentJobManager foreach {

Reply via email to