This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8821ed8  [BEAM-4552] Use Spark AccumulatorsV2 for metrics
     new 36b839e  Merge pull request #8387: [BEAM-4552] Use Spark AccumulatorsV2 API
8821ed8 is described below

commit 8821ed8c3f6b5f4d16abf98d17910cc4a9ba8720
Author: Alexey Romanenko <aromanenko....@gmail.com>
AuthorDate: Tue Apr 23 18:27:33 2019 +0200

    [BEAM-4552] Use Spark AccumulatorsV2 for metrics
---
 .../runners/spark/aggregators/AggAccumParam.java   | 39 -------------
 .../spark/aggregators/AggregatorsAccumulator.java  | 19 ++++---
 .../aggregators/NamedAggregatorsAccumulator.java   | 63 +++++++++++++++++++++
 .../apache/beam/runners/spark/io/SourceRDD.java    |  7 +--
 .../runners/spark/io/SparkUnboundedSource.java     |  4 +-
 .../runners/spark/metrics/MetricsAccumulator.java  | 23 ++++----
 .../spark/metrics/MetricsAccumulatorParam.java     | 42 --------------
 .../MetricsContainerStepMapAccumulator.java        | 65 ++++++++++++++++++++++
 .../spark/translation/DoFnRunnerWithMetrics.java   |  9 ++-
 .../spark/translation/MultiDoFnFunction.java       | 10 ++--
 .../translation/SparkExecutableStageFunction.java  | 11 ++--
 .../SparkGroupAlsoByWindowViaOutputBufferFn.java   |  5 +-
 .../spark/translation/TransformTranslator.java     |  9 ++-
 .../streaming/StreamingTransformTranslator.java    |  5 +-
 .../SparkExecutableStageFunctionTest.java          |  6 +-
 15 files changed, 180 insertions(+), 137 deletions(-)

diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggAccumParam.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggAccumParam.java
deleted file mode 100644
index 6f80ce2..0000000
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggAccumParam.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.aggregators;
-
-import org.apache.spark.AccumulatorParam;
-
-/** Aggregator accumulator param. */
-class AggAccumParam implements AccumulatorParam<NamedAggregators> {
-
-  @Override
-  public NamedAggregators addAccumulator(NamedAggregators current, 
NamedAggregators added) {
-    return current.merge(added);
-  }
-
-  @Override
-  public NamedAggregators addInPlace(NamedAggregators current, 
NamedAggregators added) {
-    return addAccumulator(current, added);
-  }
-
-  @Override
-  public NamedAggregators zero(NamedAggregators initialValue) {
-    return new NamedAggregators();
-  }
-}
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
index e452079..4722b43 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
@@ -25,18 +25,18 @@ import 
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleF
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Optional;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.streaming.api.java.JavaStreamingListener;
 import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
+import org.apache.spark.util.AccumulatorV2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * For resilience, {@link Accumulator Accumulators} are required to be wrapped 
in a Singleton.
+ * For resilience, {@link AccumulatorV2 Accumulators} are required to be 
wrapped in a Singleton.
  *
  * @see <a
- *     href="https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#accumulators-and-broadcast-variables">accumulators</a>
+ *     href="https://spark.apache.org/docs/latest/api/java/org/apache/spark/util/AccumulatorV2.html">accumulatorsV2</a>
  */
 public class AggregatorsAccumulator {
   private static final Logger LOG = 
LoggerFactory.getLogger(AggregatorsAccumulator.class);
@@ -44,7 +44,7 @@ public class AggregatorsAccumulator {
   private static final String ACCUMULATOR_NAME = "Beam.Aggregators";
   private static final String ACCUMULATOR_CHECKPOINT_FILENAME = "aggregators";
 
-  private static volatile Accumulator<NamedAggregators> instance = null;
+  private static volatile NamedAggregatorsAccumulator instance = null;
   private static volatile FileSystem fileSystem;
   private static volatile Path checkpointFilePath;
 
@@ -57,13 +57,16 @@ public class AggregatorsAccumulator {
               opts.isStreaming()
                   ? Optional.of(new CheckpointDir(opts.getCheckpointDir()))
                   : Optional.absent();
-          Accumulator<NamedAggregators> accumulator =
-              jsc.sc().accumulator(new NamedAggregators(), ACCUMULATOR_NAME, 
new AggAccumParam());
+          NamedAggregators namedAggregators = new NamedAggregators();
+          NamedAggregatorsAccumulator accumulator =
+              new NamedAggregatorsAccumulator(namedAggregators);
+          jsc.sc().register(accumulator, ACCUMULATOR_NAME);
+
           if (maybeCheckpointDir.isPresent()) {
             Optional<NamedAggregators> maybeRecoveredValue =
                 recoverValueFromCheckpoint(jsc, maybeCheckpointDir.get());
             if (maybeRecoveredValue.isPresent()) {
-              accumulator.setValue(maybeRecoveredValue.get());
+              accumulator = new 
NamedAggregatorsAccumulator(maybeRecoveredValue.get());
             }
           }
           instance = accumulator;
@@ -73,7 +76,7 @@ public class AggregatorsAccumulator {
     }
   }
 
-  public static Accumulator<NamedAggregators> getInstance() {
+  public static NamedAggregatorsAccumulator getInstance() {
     if (instance == null) {
       throw new IllegalStateException("Aggregrators accumulator has not been 
instantiated");
     } else {
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregatorsAccumulator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregatorsAccumulator.java
new file mode 100644
index 0000000..5775814
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregatorsAccumulator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.aggregators;
+
+import org.apache.spark.util.AccumulatorV2;
+
+/** {@link AccumulatorV2} implementation for {@link NamedAggregators}. */
+public class NamedAggregatorsAccumulator extends 
AccumulatorV2<NamedAggregators, NamedAggregators> {
+  private static final NamedAggregators empty = new NamedAggregators();
+
+  private NamedAggregators value;
+
+  public NamedAggregatorsAccumulator(NamedAggregators value) {
+    this.value = value;
+  }
+
+  @Override
+  public boolean isZero() {
+    return value.equals(empty);
+  }
+
+  @Override
+  public NamedAggregatorsAccumulator copy() {
+    NamedAggregators newContainer = new NamedAggregators();
+    newContainer.merge(value);
+    return new NamedAggregatorsAccumulator(newContainer);
+  }
+
+  @Override
+  public void reset() {
+    this.value = new NamedAggregators();
+  }
+
+  @Override
+  public void add(NamedAggregators other) {
+    this.value.merge(other);
+  }
+
+  @Override
+  public void merge(AccumulatorV2<NamedAggregators, NamedAggregators> other) {
+    this.value.merge(other.value());
+  }
+
+  @Override
+  public NamedAggregators value() {
+    return this.value;
+  }
+}
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
index a264e49..007e80b 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
@@ -27,9 +27,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
-import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import 
org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.util.WindowedValue;
 import 
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
-import org.apache.spark.Accumulator;
 import org.apache.spark.Dependency;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.InterruptibleIterator;
@@ -67,7 +66,7 @@ public class SourceRDD {
     private final int numPartitions;
     private final long bundleSize;
     private final String stepName;
-    private final Accumulator<MetricsContainerStepMap> metricsAccum;
+    private final MetricsContainerStepMapAccumulator metricsAccum;
 
     // to satisfy Scala API.
     private static final scala.collection.immutable.Seq<Dependency<?>> NIL =
@@ -136,7 +135,7 @@ public class SourceRDD {
     @Override
     public scala.collection.Iterator<WindowedValue<T>> compute(
         final Partition split, final TaskContext context) {
-      final MetricsContainer metricsContainer = 
metricsAccum.localValue().getContainer(stepName);
+      final MetricsContainer metricsContainer = 
metricsAccum.value().getContainer(stepName);
 
       @SuppressWarnings("unchecked")
       final BoundedSource.BoundedReader<T> reader = 
createReader((SourcePartition<T>) split);
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
index 75e4114..8235ee6 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -27,6 +27,7 @@ import 
org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import 
org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator;
 import org.apache.beam.runners.spark.stateful.StateSpecFunctions;
 import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
 import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
@@ -42,7 +43,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Splitter;
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext$;
 import org.apache.spark.api.java.function.FlatMapFunction;
@@ -191,7 +191,7 @@ public class SparkUnboundedSource {
     public scala.Option<RDD<BoxedUnit>> compute(Time validTime) {
       // compute parent.
       scala.Option<RDD<Metadata>> parentRDDOpt = 
parent.getOrCompute(validTime);
-      final Accumulator<MetricsContainerStepMap> metricsAccum = 
MetricsAccumulator.getInstance();
+      final MetricsContainerStepMapAccumulator metricsAccum = 
MetricsAccumulator.getInstance();
       long count = 0;
       SparkWatermarks sparkWatermark = null;
       Instant globalLowWatermarkForBatch = BoundedWindow.TIMESTAMP_MIN_VALUE;
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
index f511ffa..f21b1f6 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
@@ -26,18 +26,18 @@ import 
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleF
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Optional;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.streaming.api.java.JavaStreamingListener;
 import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
+import org.apache.spark.util.AccumulatorV2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * For resilience, {@link Accumulator Accumulators} are required to be wrapped 
in a Singleton.
+ * For resilience, {@link AccumulatorV2 Accumulators} are required to be 
wrapped in a Singleton.
  *
  * @see <a
- *     href="https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#accumulators-and-broadcast-variables">accumulators</a>
+ *     href="https://spark.apache.org/docs/latest/api/java/org/apache/spark/util/AccumulatorV2.html">accumulatorsV2</a>
  */
 public class MetricsAccumulator {
   private static final Logger LOG = 
LoggerFactory.getLogger(MetricsAccumulator.class);
@@ -45,7 +45,7 @@ public class MetricsAccumulator {
   private static final String ACCUMULATOR_NAME = "Beam.Metrics";
   private static final String ACCUMULATOR_CHECKPOINT_FILENAME = "metrics";
 
-  private static volatile Accumulator<MetricsContainerStepMap> instance = null;
+  private static volatile MetricsContainerStepMapAccumulator instance = null;
   private static volatile FileSystem fileSystem;
   private static volatile Path checkpointFilePath;
 
@@ -58,17 +58,16 @@ public class MetricsAccumulator {
               opts.isStreaming()
                   ? Optional.of(new CheckpointDir(opts.getCheckpointDir()))
                   : Optional.absent();
-          Accumulator<MetricsContainerStepMap> accumulator =
-              jsc.sc()
-                  .accumulator(
-                      new SparkMetricsContainerStepMap(),
-                      ACCUMULATOR_NAME,
-                      new MetricsAccumulatorParam());
+          MetricsContainerStepMap metricsContainerStepMap = new 
MetricsContainerStepMap();
+          MetricsContainerStepMapAccumulator accumulator =
+              new MetricsContainerStepMapAccumulator(metricsContainerStepMap);
+          jsc.sc().register(accumulator, ACCUMULATOR_NAME);
+
           if (maybeCheckpointDir.isPresent()) {
             Optional<MetricsContainerStepMap> maybeRecoveredValue =
                 recoverValueFromCheckpoint(jsc, maybeCheckpointDir.get());
             if (maybeRecoveredValue.isPresent()) {
-              accumulator.setValue(maybeRecoveredValue.get());
+              accumulator = new 
MetricsContainerStepMapAccumulator(maybeRecoveredValue.get());
             }
           }
           instance = accumulator;
@@ -78,7 +77,7 @@ public class MetricsAccumulator {
     }
   }
 
-  public static Accumulator<MetricsContainerStepMap> getInstance() {
+  public static MetricsContainerStepMapAccumulator getInstance() {
     if (instance == null) {
       throw new IllegalStateException("Metrics accumulator has not been 
instantiated");
     } else {
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
deleted file mode 100644
index 9303935..0000000
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.metrics;
-
-import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
-import org.apache.spark.AccumulatorParam;
-
-/** Metrics accumulator param. */
-class MetricsAccumulatorParam implements 
AccumulatorParam<MetricsContainerStepMap> {
-  @Override
-  public MetricsContainerStepMap addAccumulator(
-      MetricsContainerStepMap c1, MetricsContainerStepMap c2) {
-    return addInPlace(c1, c2);
-  }
-
-  @Override
-  public MetricsContainerStepMap addInPlace(
-      MetricsContainerStepMap c1, MetricsContainerStepMap c2) {
-    c1.updateAll(c2);
-    return c1;
-  }
-
-  @Override
-  public MetricsContainerStepMap zero(MetricsContainerStepMap initialValue) {
-    return new MetricsContainerStepMap();
-  }
-}
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsContainerStepMapAccumulator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsContainerStepMapAccumulator.java
new file mode 100644
index 0000000..61de79f
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsContainerStepMapAccumulator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.metrics;
+
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.spark.util.AccumulatorV2;
+
+/** {@link AccumulatorV2} implementation for {@link MetricsContainerStepMap}. 
*/
+public class MetricsContainerStepMapAccumulator
+    extends AccumulatorV2<MetricsContainerStepMap, MetricsContainerStepMap> {
+  private static final MetricsContainerStepMap empty = new 
MetricsContainerStepMap();
+
+  private MetricsContainerStepMap value;
+
+  public MetricsContainerStepMapAccumulator(MetricsContainerStepMap value) {
+    this.value = value;
+  }
+
+  @Override
+  public boolean isZero() {
+    return value.equals(empty);
+  }
+
+  @Override
+  public MetricsContainerStepMapAccumulator copy() {
+    MetricsContainerStepMap newContainer = new MetricsContainerStepMap();
+    newContainer.updateAll(value);
+    return new MetricsContainerStepMapAccumulator(newContainer);
+  }
+
+  @Override
+  public void reset() {
+    this.value = new MetricsContainerStepMap();
+  }
+
+  @Override
+  public void add(MetricsContainerStepMap other) {
+    this.value.updateAll(other);
+  }
+
+  @Override
+  public void merge(AccumulatorV2<MetricsContainerStepMap, 
MetricsContainerStepMap> other) {
+    this.value.updateAll(other.value());
+  }
+
+  @Override
+  public MetricsContainerStepMap value() {
+    return this.value;
+  }
+}
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
index 25f88aa..845dc63 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
@@ -21,26 +21,25 @@ import java.io.Closeable;
 import java.io.IOException;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
-import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import 
org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator;
 import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.spark.Accumulator;
 import org.joda.time.Instant;
 
 /** DoFnRunner decorator which registers {@link MetricsContainerImpl}. */
 class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, 
OutputT> {
   private final DoFnRunner<InputT, OutputT> delegate;
   private final String stepName;
-  private final Accumulator<MetricsContainerStepMap> metricsAccum;
+  private final MetricsContainerStepMapAccumulator metricsAccum;
 
   DoFnRunnerWithMetrics(
       String stepName,
       DoFnRunner<InputT, OutputT> delegate,
-      Accumulator<MetricsContainerStepMap> metricsAccum) {
+      MetricsContainerStepMapAccumulator metricsAccum) {
     this.delegate = delegate;
     this.stepName = stepName;
     this.metricsAccum = metricsAccum;
@@ -92,6 +91,6 @@ class DoFnRunnerWithMetrics<InputT, OutputT> implements 
DoFnRunner<InputT, Outpu
   }
 
   private MetricsContainer metricsContainer() {
-    return metricsAccum.localValue().getContainer(stepName);
+    return metricsAccum.value().getContainer(stepName);
   }
 }
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 566edd1..badca38 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -30,7 +30,7 @@ import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
-import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import 
org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator;
 import org.apache.beam.runners.spark.util.CachedSideInputReader;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.runners.spark.util.SparkSideInputReader;
@@ -48,8 +48,8 @@ import 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Function;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
 import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.LinkedListMultimap;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap;
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.util.AccumulatorV2;
 import scala.Tuple2;
 
 /**
@@ -62,7 +62,7 @@ import scala.Tuple2;
 public class MultiDoFnFunction<InputT, OutputT>
     implements PairFlatMapFunction<Iterator<WindowedValue<InputT>>, 
TupleTag<?>, WindowedValue<?>> {
 
-  private final Accumulator<MetricsContainerStepMap> metricsAccum;
+  private final MetricsContainerStepMapAccumulator metricsAccum;
   private final String stepName;
   private final DoFn<InputT, OutputT> doFn;
   private transient boolean wasSetupCalled;
@@ -77,7 +77,7 @@ public class MultiDoFnFunction<InputT, OutputT>
   private final DoFnSchemaInformation doFnSchemaInformation;
 
   /**
-   * @param metricsAccum The Spark {@link Accumulator} that backs the Beam 
metrics.
+   * @param metricsAccum The Spark {@link AccumulatorV2} that backs the Beam 
metrics.
    * @param doFn The {@link DoFn} to be wrapped.
    * @param options The {@link SerializablePipelineOptions}.
    * @param mainOutputTag The main output {@link TupleTag}.
@@ -89,7 +89,7 @@ public class MultiDoFnFunction<InputT, OutputT>
    * @param stateful Stateful {@link DoFn}.
    */
   public MultiDoFnFunction(
-      Accumulator<MetricsContainerStepMap> metricsAccum,
+      MetricsContainerStepMapAccumulator metricsAccum,
       String stepName,
       DoFn<InputT, OutputT> doFn,
       SerializablePipelineOptions options,
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
index 231714b..d49300e 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
@@ -33,7 +33,6 @@ import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
-import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
 import org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory;
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
@@ -46,11 +45,11 @@ import 
org.apache.beam.runners.fnexecution.state.StateRequestHandler;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
 import 
org.apache.beam.runners.fnexecution.translation.BatchSideInputHandlerFactory;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
+import 
org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.broadcast.Broadcast;
 import org.slf4j.Logger;
@@ -77,14 +76,14 @@ class SparkExecutableStageFunction<InputT, SideInputT>
   // map from pCollection id to tuple of serialized bytes and coder to decode 
the bytes
   private final Map<String, Tuple2<Broadcast<List<byte[]>>, 
WindowedValueCoder<SideInputT>>>
       sideInputs;
-  private final Accumulator<MetricsContainerStepMap> metricsAccumulator;
+  private final MetricsContainerStepMapAccumulator metricsAccumulator;
 
   SparkExecutableStageFunction(
       RunnerApi.ExecutableStagePayload stagePayload,
       JobInfo jobInfo,
       Map<String, Integer> outputMap,
       Map<String, Tuple2<Broadcast<List<byte[]>>, 
WindowedValueCoder<SideInputT>>> sideInputs,
-      Accumulator<MetricsContainerStepMap> metricsAccumulator) {
+      MetricsContainerStepMapAccumulator metricsAccumulator) {
     this(
         stagePayload,
         outputMap,
@@ -98,7 +97,7 @@ class SparkExecutableStageFunction<InputT, SideInputT>
       Map<String, Integer> outputMap,
       JobBundleFactoryCreator jobBundleFactoryCreator,
       Map<String, Tuple2<Broadcast<List<byte[]>>, 
WindowedValueCoder<SideInputT>>> sideInputs,
-      Accumulator<MetricsContainerStepMap> metricsAccumulator) {
+      MetricsContainerStepMapAccumulator metricsAccumulator) {
     this.stagePayload = stagePayload;
     this.outputMap = outputMap;
     this.jobBundleFactoryCreator = jobBundleFactoryCreator;
@@ -116,7 +115,7 @@ class SparkExecutableStageFunction<InputT, SideInputT>
       StateRequestHandler stateRequestHandler =
           getStateRequestHandler(executableStage, 
stageBundleFactory.getProcessBundleDescriptor());
       String stageName = stagePayload.getInput();
-      MetricsContainerImpl container = 
metricsAccumulator.localValue().getContainer(stageName);
+      MetricsContainerImpl container = 
metricsAccumulator.value().getContainer(stageName);
       BundleProgressHandler bundleProgressHandler =
           new BundleProgressHandler() {
             @Override
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index 158f303..f3b2600 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -34,14 +34,13 @@ import 
org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.construction.TriggerTranslation;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
-import org.apache.beam.runners.spark.aggregators.NamedAggregators;
+import org.apache.beam.runners.spark.aggregators.NamedAggregatorsAccumulator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.joda.time.Instant;
 
@@ -61,7 +60,7 @@ class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends BoundedWindow
       StateInternalsFactory<K> stateInternalsFactory,
       SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn,
       SerializablePipelineOptions options,
-      Accumulator<NamedAggregators> accumulator) {
+      NamedAggregatorsAccumulator accumulator) {
     this.windowingStrategy = windowingStrategy;
     this.stateInternalsFactory = stateInternalsFactory;
     this.reduceFn = reduceFn;
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 159d5a3..8cb79ad 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -28,13 +28,13 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.ParDoTranslation;
-import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
-import org.apache.beam.runners.spark.aggregators.NamedAggregators;
+import org.apache.beam.runners.spark.aggregators.NamedAggregatorsAccumulator;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.SourceRDD;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -68,7 +68,6 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Optional;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.FluentIterable;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
-import org.apache.spark.Accumulator;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -122,7 +121,7 @@ public final class TransformTranslator {
         JavaRDD<WindowedValue<KV<K, V>>> inRDD =
             ((BoundedDataset<KV<K, V>>) context.borrowDataset(transform)).getRDD();
         final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
-        final Accumulator<NamedAggregators> accum = AggregatorsAccumulator.getInstance();
+        final NamedAggregatorsAccumulator accum = AggregatorsAccumulator.getInstance();
         @SuppressWarnings("unchecked")
         final WindowingStrategy<?, W> windowingStrategy =
             (WindowingStrategy<?, W>) context.getInput(transform).getWindowingStrategy();
@@ -351,7 +350,7 @@ public final class TransformTranslator {
             ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
         WindowingStrategy<?, ?> windowingStrategy =
             context.getInput(transform).getWindowingStrategy();
-        Accumulator<MetricsContainerStepMap> metricsAccum = MetricsAccumulator.getInstance();
+        MetricsContainerStepMapAccumulator metricsAccum = MetricsAccumulator.getInstance();
         Coder<InputT> inputCoder = (Coder<InputT>) context.getInput(transform).getCoder();
         Map<TupleTag<?>, Coder<?>> outputCoders = context.getOutputCoders();
         JavaPairRDD<TupleTag<?>, WindowedValue<?>> all;
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 8efc477..1e6c50c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -36,12 +36,12 @@ import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
-import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.ConsoleIO;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.io.SparkUnboundedSource;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator;
 import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
 import org.apache.beam.runners.spark.translation.BoundedDataset;
 import org.apache.beam.runners.spark.translation.Dataset;
@@ -83,7 +83,6 @@ import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
-import org.apache.spark.Accumulator;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -412,7 +411,7 @@ public final class StreamingTransformTranslator {
         JavaPairDStream<TupleTag<?>, WindowedValue<?>> all =
             dStream.transformToPair(
                 rdd -> {
-                  final Accumulator<MetricsContainerStepMap> metricsAccum =
+                  final MetricsContainerStepMapAccumulator metricsAccum =
                       MetricsAccumulator.getInstance();
                   final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>>
                       sideInputs =
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
index 56ca69a..64657ae 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
@@ -43,12 +43,12 @@ import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
 import org.apache.beam.runners.fnexecution.control.RemoteBundle;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator;
 import org.apache.beam.runners.spark.translation.SparkExecutableStageFunction.JobBundleFactoryCreator;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
-import org.apache.spark.Accumulator;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -61,7 +61,7 @@ public class SparkExecutableStageFunctionTest {
   @Mock private JobBundleFactory jobBundleFactory;
   @Mock private StageBundleFactory stageBundleFactory;
   @Mock private RemoteBundle remoteBundle;
-  @Mock private Accumulator<MetricsContainerStepMap> metricsAccumulator;
+  @Mock private MetricsContainerStepMapAccumulator metricsAccumulator;
   @Mock private MetricsContainerStepMap stepMap;
   @Mock private MetricsContainerImpl container;
 
@@ -91,7 +91,7 @@ public class SparkExecutableStageFunctionTest {
     ImmutableMap<String, FnDataReceiver<WindowedValue<?>>> inputReceiver =
         ImmutableMap.of("input", Mockito.mock(FnDataReceiver.class));
     when(remoteBundle.getInputReceivers()).thenReturn(inputReceiver);
-    when(metricsAccumulator.localValue()).thenReturn(stepMap);
+    when(metricsAccumulator.value()).thenReturn(stepMap);
     when(stepMap.getContainer(any())).thenReturn(container);
   }
 

Reply via email to