Repository: flink Updated Branches: refs/heads/master f46ca3918 -> d95929e01
[FLINK-4829] snapshot accumulators on a best-effort basis Heartbeats should not fail when accumulators cannot be snapshotted. Instead, we should simply skip reporting the accumulator whose snapshot failed. Eventually, the accumulator will be reported; at the latest, when the job finishes. This closes #2649 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d95929e0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d95929e0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d95929e0 Branch: refs/heads/master Commit: d95929e0110b53f03452e1ad453de2522f79a6b8 Parents: 783dca5 Author: Maximilian Michels <m...@apache.org> Authored: Mon Oct 17 14:19:00 2016 +0200 Committer: Maximilian Michels <m...@apache.org> Committed: Tue Oct 18 15:55:33 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/runtime/taskmanager/TaskManager.scala | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d95929e0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index f8f333e..1017ea0 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -35,6 +35,7 @@ import com.codahale.metrics.jvm.{BufferPoolMetricSet, GarbageCollectorMetricSet, import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry} import com.fasterxml.jackson.databind.ObjectMapper import grizzled.slf4j.Logger +import 
org.apache.commons.lang3.exception.ExceptionUtils import org.apache.flink.configuration._ import org.apache.flink.core.fs.FileSystem import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory, MemoryType} @@ -1335,9 +1336,15 @@ class TaskManager( runningTasks.asScala foreach { case (execID, task) => - val registry = task.getAccumulatorRegistry - val accumulators = registry.getSnapshot - accumulatorEvents.append(accumulators) + try { + val registry = task.getAccumulatorRegistry + val accumulators = registry.getSnapshot + accumulatorEvents.append(accumulators) + } catch { + case e: Exception => + log.warn("Failed to take accumulator snapshot for task {}.", + execID, ExceptionUtils.getRootCause(e)) + } } currentJobManager foreach {