This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 8821ed8 [BEAM-4552] Use Spark AccumulatorsV2 for metrics new 36b839e Merge pull request #8387: [BEAM-4552] Use Spark AccumulatorsV2 API 8821ed8 is described below commit 8821ed8c3f6b5f4d16abf98d17910cc4a9ba8720 Author: Alexey Romanenko <aromanenko....@gmail.com> AuthorDate: Tue Apr 23 18:27:33 2019 +0200 [BEAM-4552] Use Spark AccumulatorsV2 for metrics --- .../runners/spark/aggregators/AggAccumParam.java | 39 ------------- .../spark/aggregators/AggregatorsAccumulator.java | 19 ++++--- .../aggregators/NamedAggregatorsAccumulator.java | 63 +++++++++++++++++++++ .../apache/beam/runners/spark/io/SourceRDD.java | 7 +-- .../runners/spark/io/SparkUnboundedSource.java | 4 +- .../runners/spark/metrics/MetricsAccumulator.java | 23 ++++---- .../spark/metrics/MetricsAccumulatorParam.java | 42 -------------- .../MetricsContainerStepMapAccumulator.java | 65 ++++++++++++++++++++++ .../spark/translation/DoFnRunnerWithMetrics.java | 9 ++- .../spark/translation/MultiDoFnFunction.java | 10 ++-- .../translation/SparkExecutableStageFunction.java | 11 ++-- .../SparkGroupAlsoByWindowViaOutputBufferFn.java | 5 +- .../spark/translation/TransformTranslator.java | 9 ++- .../streaming/StreamingTransformTranslator.java | 5 +- .../SparkExecutableStageFunctionTest.java | 6 +- 15 files changed, 180 insertions(+), 137 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggAccumParam.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggAccumParam.java deleted file mode 100644 index 6f80ce2..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggAccumParam.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.spark.aggregators; - -import org.apache.spark.AccumulatorParam; - -/** Aggregator accumulator param. */ -class AggAccumParam implements AccumulatorParam<NamedAggregators> { - - @Override - public NamedAggregators addAccumulator(NamedAggregators current, NamedAggregators added) { - return current.merge(added); - } - - @Override - public NamedAggregators addInPlace(NamedAggregators current, NamedAggregators added) { - return addAccumulator(current, added); - } - - @Override - public NamedAggregators zero(NamedAggregators initialValue) { - return new NamedAggregators(); - } -} diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java index e452079..4722b43 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java @@ -25,18 +25,18 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleF import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Optional; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.spark.Accumulator; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.streaming.api.java.JavaStreamingListener; import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted; +import org.apache.spark.util.AccumulatorV2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * For resilience, {@link Accumulator Accumulators} are required to be wrapped in a Singleton. + * For resilience, {@link AccumulatorV2 Accumulators} are required to be wrapped in a Singleton. * * @see <a - * href="https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#accumulators-and-broadcast-variables">accumulators</a> + * href="https://spark.apache.org/docs/latest/api/java/org/apache/spark/util/AccumulatorV2.html">accumulatorsV2</a> */ public class AggregatorsAccumulator { private static final Logger LOG = LoggerFactory.getLogger(AggregatorsAccumulator.class); @@ -44,7 +44,7 @@ public class AggregatorsAccumulator { private static final String ACCUMULATOR_NAME = "Beam.Aggregators"; private static final String ACCUMULATOR_CHECKPOINT_FILENAME = "aggregators"; - private static volatile Accumulator<NamedAggregators> instance = null; + private static volatile NamedAggregatorsAccumulator instance = null; private static volatile FileSystem fileSystem; private static volatile Path checkpointFilePath; @@ -57,13 +57,16 @@ public class AggregatorsAccumulator { opts.isStreaming() ? Optional.of(new CheckpointDir(opts.getCheckpointDir())) : Optional.absent(); - Accumulator<NamedAggregators> accumulator = - jsc.sc().accumulator(new NamedAggregators(), ACCUMULATOR_NAME, new AggAccumParam()); + NamedAggregators namedAggregators = new NamedAggregators(); + NamedAggregatorsAccumulator accumulator = + new NamedAggregatorsAccumulator(namedAggregators); + jsc.sc().register(accumulator, ACCUMULATOR_NAME); + if (maybeCheckpointDir.isPresent()) { Optional<NamedAggregators> maybeRecoveredValue = recoverValueFromCheckpoint(jsc, maybeCheckpointDir.get()); if (maybeRecoveredValue.isPresent()) { - accumulator.setValue(maybeRecoveredValue.get()); + accumulator = new NamedAggregatorsAccumulator(maybeRecoveredValue.get()); } } instance = accumulator; @@ -73,7 +76,7 @@ public class AggregatorsAccumulator { } } - public static Accumulator<NamedAggregators> getInstance() { + public static NamedAggregatorsAccumulator getInstance() { if (instance == null) { throw new IllegalStateException("Aggregrators accumulator has not been instantiated"); } else { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregatorsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregatorsAccumulator.java new file mode 100644 index 0000000..5775814 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregatorsAccumulator.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark.aggregators; + +import org.apache.spark.util.AccumulatorV2; + +/** {@link AccumulatorV2} implementation for {@link NamedAggregators}. */ +public class NamedAggregatorsAccumulator extends AccumulatorV2<NamedAggregators, NamedAggregators> { + private static final NamedAggregators empty = new NamedAggregators(); + + private NamedAggregators value; + + public NamedAggregatorsAccumulator(NamedAggregators value) { + this.value = value; + } + + @Override + public boolean isZero() { + return value.equals(empty); + } + + @Override + public NamedAggregatorsAccumulator copy() { + NamedAggregators newContainer = new NamedAggregators(); + newContainer.merge(value); + return new NamedAggregatorsAccumulator(newContainer); + } + + @Override + public void reset() { + this.value = new NamedAggregators(); + } + + @Override + public void add(NamedAggregators other) { + this.value.merge(other); + } + + @Override + public void merge(AccumulatorV2<NamedAggregators, NamedAggregators> other) { + this.value.merge(other.value()); + } + + @Override + public NamedAggregators value() { + return this.value; + } +} diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java index a264e49..007e80b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java @@ -27,9 +27,9 @@ import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; import org.apache.beam.runners.core.construction.SerializablePipelineOptions; -import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; +import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; @@ -37,7 +37,6 @@ import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; -import org.apache.spark.Accumulator; import org.apache.spark.Dependency; import org.apache.spark.HashPartitioner; import org.apache.spark.InterruptibleIterator; @@ -67,7 +66,7 @@ public class SourceRDD { private final int numPartitions; private final long bundleSize; private final String stepName; - private final Accumulator<MetricsContainerStepMap> metricsAccum; + private final MetricsContainerStepMapAccumulator metricsAccum; // to satisfy Scala API. private static final scala.collection.immutable.Seq<Dependency<?>> NIL = @@ -136,7 +135,7 @@ public class SourceRDD { @Override public scala.collection.Iterator<WindowedValue<T>> compute( final Partition split, final TaskContext context) { - final MetricsContainer metricsContainer = metricsAccum.localValue().getContainer(stepName); + final MetricsContainer metricsContainer = metricsAccum.value().getContainer(stepName); @SuppressWarnings("unchecked") final BoundedSource.BoundedReader<T> reader = createReader((SourcePartition<T>) split); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java index 75e4114..8235ee6 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java @@ -27,6 +27,7 @@ import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; +import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator; import org.apache.beam.runners.spark.stateful.StateSpecFunctions; import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset; import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; @@ -42,7 +43,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Splitter; -import org.apache.spark.Accumulator; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext$; import org.apache.spark.api.java.function.FlatMapFunction; @@ -191,7 +191,7 @@ public class SparkUnboundedSource { public scala.Option<RDD<BoxedUnit>> compute(Time validTime) { // compute parent. scala.Option<RDD<Metadata>> parentRDDOpt = parent.getOrCompute(validTime); - final Accumulator<MetricsContainerStepMap> metricsAccum = MetricsAccumulator.getInstance(); + final MetricsContainerStepMapAccumulator metricsAccum = MetricsAccumulator.getInstance(); long count = 0; SparkWatermarks sparkWatermark = null; Instant globalLowWatermarkForBatch = BoundedWindow.TIMESTAMP_MIN_VALUE; diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java index f511ffa..f21b1f6 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java @@ -26,18 +26,18 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleF import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Optional; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.spark.Accumulator; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.streaming.api.java.JavaStreamingListener; import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted; +import org.apache.spark.util.AccumulatorV2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * For resilience, {@link Accumulator Accumulators} are required to be wrapped in a Singleton. + * For resilience, {@link AccumulatorV2 Accumulators} are required to be wrapped in a Singleton. * * @see <a - * href="https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#accumulators-and-broadcast-variables">accumulators</a> + * href="https://spark.apache.org/docs/latest/api/java/org/apache/spark/util/AccumulatorV2.html">accumulatorsV2</a> */ public class MetricsAccumulator { private static final Logger LOG = LoggerFactory.getLogger(MetricsAccumulator.class); @@ -45,7 +45,7 @@ public class MetricsAccumulator { private static final String ACCUMULATOR_NAME = "Beam.Metrics"; private static final String ACCUMULATOR_CHECKPOINT_FILENAME = "metrics"; - private static volatile Accumulator<MetricsContainerStepMap> instance = null; + private static volatile MetricsContainerStepMapAccumulator instance = null; private static volatile FileSystem fileSystem; private static volatile Path checkpointFilePath; @@ -58,17 +58,16 @@ public class MetricsAccumulator { opts.isStreaming() ? Optional.of(new CheckpointDir(opts.getCheckpointDir())) : Optional.absent(); - Accumulator<MetricsContainerStepMap> accumulator = - jsc.sc() - .accumulator( - new SparkMetricsContainerStepMap(), - ACCUMULATOR_NAME, - new MetricsAccumulatorParam()); + MetricsContainerStepMap metricsContainerStepMap = new MetricsContainerStepMap(); + MetricsContainerStepMapAccumulator accumulator = + new MetricsContainerStepMapAccumulator(metricsContainerStepMap); + jsc.sc().register(accumulator, ACCUMULATOR_NAME); + if (maybeCheckpointDir.isPresent()) { Optional<MetricsContainerStepMap> maybeRecoveredValue = recoverValueFromCheckpoint(jsc, maybeCheckpointDir.get()); if (maybeRecoveredValue.isPresent()) { - accumulator.setValue(maybeRecoveredValue.get()); + accumulator = new MetricsContainerStepMapAccumulator(maybeRecoveredValue.get()); } } instance = accumulator; @@ -78,7 +77,7 @@ public class MetricsAccumulator { } } - public static Accumulator<MetricsContainerStepMap> getInstance() { + public static MetricsContainerStepMapAccumulator getInstance() { if (instance == null) { throw new IllegalStateException("Metrics accumulator has not been instantiated"); } else { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java deleted file mode 100644 index 9303935..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulatorParam.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.spark.metrics; - -import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; -import org.apache.spark.AccumulatorParam; - -/** Metrics accumulator param. */ -class MetricsAccumulatorParam implements AccumulatorParam<MetricsContainerStepMap> { - @Override - public MetricsContainerStepMap addAccumulator( - MetricsContainerStepMap c1, MetricsContainerStepMap c2) { - return addInPlace(c1, c2); - } - - @Override - public MetricsContainerStepMap addInPlace( - MetricsContainerStepMap c1, MetricsContainerStepMap c2) { - c1.updateAll(c2); - return c1; - } - - @Override - public MetricsContainerStepMap zero(MetricsContainerStepMap initialValue) { - return new MetricsContainerStepMap(); - } -} diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsContainerStepMapAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsContainerStepMapAccumulator.java new file mode 100644 index 0000000..61de79f --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsContainerStepMapAccumulator.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark.metrics; + +import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; +import org.apache.spark.util.AccumulatorV2; + +/** {@link AccumulatorV2} implementation for {@link MetricsContainerStepMap}. */ +public class MetricsContainerStepMapAccumulator + extends AccumulatorV2<MetricsContainerStepMap, MetricsContainerStepMap> { + private static final MetricsContainerStepMap empty = new MetricsContainerStepMap(); + + private MetricsContainerStepMap value; + + public MetricsContainerStepMapAccumulator(MetricsContainerStepMap value) { + this.value = value; + } + + @Override + public boolean isZero() { + return value.equals(empty); + } + + @Override + public MetricsContainerStepMapAccumulator copy() { + MetricsContainerStepMap newContainer = new MetricsContainerStepMap(); + newContainer.updateAll(value); + return new MetricsContainerStepMapAccumulator(newContainer); + } + + @Override + public void reset() { + this.value = new MetricsContainerStepMap(); + } + + @Override + public void add(MetricsContainerStepMap other) { + this.value.updateAll(other); + } + + @Override + public void merge(AccumulatorV2<MetricsContainerStepMap, MetricsContainerStepMap> other) { + this.value.updateAll(other.value()); + } + + @Override + public MetricsContainerStepMap value() { + return this.value; + } +} diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java index 25f88aa..845dc63 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java @@ -21,26 +21,25 @@ import java.io.Closeable; import java.io.IOException; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; -import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; +import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator; import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.spark.Accumulator; import org.joda.time.Instant; /** DoFnRunner decorator which registers {@link MetricsContainerImpl}. */ class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { private final DoFnRunner<InputT, OutputT> delegate; private final String stepName; - private final Accumulator<MetricsContainerStepMap> metricsAccum; + private final MetricsContainerStepMapAccumulator metricsAccum; DoFnRunnerWithMetrics( String stepName, DoFnRunner<InputT, OutputT> delegate, - Accumulator<MetricsContainerStepMap> metricsAccum) { + MetricsContainerStepMapAccumulator metricsAccum) { this.delegate = delegate; this.stepName = stepName; this.metricsAccum = metricsAccum; @@ -92,6 +91,6 @@ class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, Outpu } private MetricsContainer metricsContainer() { - return metricsAccum.localValue().getContainer(stepName); + return metricsAccum.value().getContainer(stepName); } } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java index 566edd1..badca38 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java @@ -30,7 +30,7 @@ import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.construction.SerializablePipelineOptions; -import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; +import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator; import org.apache.beam.runners.spark.util.CachedSideInputReader; import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.runners.spark.util.SparkSideInputReader; @@ -48,8 +48,8 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Function; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.LinkedListMultimap; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap; -import org.apache.spark.Accumulator; import org.apache.spark.api.java.function.PairFlatMapFunction; +import org.apache.spark.util.AccumulatorV2; import scala.Tuple2; /** @@ -62,7 +62,7 @@ import scala.Tuple2; public class MultiDoFnFunction<InputT, OutputT> implements PairFlatMapFunction<Iterator<WindowedValue<InputT>>, TupleTag<?>, WindowedValue<?>> { - private final Accumulator<MetricsContainerStepMap> metricsAccum; + private final MetricsContainerStepMapAccumulator metricsAccum; private final String stepName; private final DoFn<InputT, OutputT> doFn; private transient boolean wasSetupCalled; @@ -77,7 +77,7 @@ public class MultiDoFnFunction<InputT, OutputT> private final DoFnSchemaInformation doFnSchemaInformation; /** - * @param metricsAccum The Spark {@link Accumulator} that backs the Beam metrics. + * @param metricsAccum The Spark {@link AccumulatorV2} that backs the Beam metrics. * @param doFn The {@link DoFn} to be wrapped. * @param options The {@link SerializablePipelineOptions}. * @param mainOutputTag The main output {@link TupleTag}. @@ -89,7 +89,7 @@ public class MultiDoFnFunction<InputT, OutputT> * @param stateful Stateful {@link DoFn}. */ public MultiDoFnFunction( - Accumulator<MetricsContainerStepMap> metricsAccum, + MetricsContainerStepMapAccumulator metricsAccum, String stepName, DoFn<InputT, OutputT> doFn, SerializablePipelineOptions options, diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java index 231714b..d49300e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java @@ -33,7 +33,6 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.graph.ExecutableStage; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; -import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.runners.fnexecution.control.BundleProgressHandler; import org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory; import org.apache.beam.runners.fnexecution.control.JobBundleFactory; @@ -46,11 +45,11 @@ import org.apache.beam.runners.fnexecution.state.StateRequestHandler; import org.apache.beam.runners.fnexecution.state.StateRequestHandlers; import org.apache.beam.runners.fnexecution.translation.BatchSideInputHandlerFactory; import org.apache.beam.runners.spark.coders.CoderHelpers; +import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator; import org.apache.beam.sdk.fn.data.FnDataReceiver; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; -import org.apache.spark.Accumulator; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.broadcast.Broadcast; import org.slf4j.Logger; @@ -77,14 +76,14 @@ class SparkExecutableStageFunction<InputT, SideInputT> // map from pCollection id to tuple of serialized bytes and coder to decode the bytes private final Map<String, Tuple2<Broadcast<List<byte[]>>, WindowedValueCoder<SideInputT>>> sideInputs; - private final Accumulator<MetricsContainerStepMap> metricsAccumulator; + private final MetricsContainerStepMapAccumulator metricsAccumulator; SparkExecutableStageFunction( RunnerApi.ExecutableStagePayload stagePayload, JobInfo jobInfo, Map<String, Integer> outputMap, Map<String, Tuple2<Broadcast<List<byte[]>>, WindowedValueCoder<SideInputT>>> sideInputs, - Accumulator<MetricsContainerStepMap> metricsAccumulator) { + MetricsContainerStepMapAccumulator metricsAccumulator) { this( stagePayload, outputMap, @@ -98,7 +97,7 @@ class SparkExecutableStageFunction<InputT, SideInputT> Map<String, Integer> outputMap, JobBundleFactoryCreator jobBundleFactoryCreator, Map<String, Tuple2<Broadcast<List<byte[]>>, WindowedValueCoder<SideInputT>>> sideInputs, - Accumulator<MetricsContainerStepMap> metricsAccumulator) { + MetricsContainerStepMapAccumulator metricsAccumulator) { this.stagePayload = stagePayload; this.outputMap = outputMap; this.jobBundleFactoryCreator = jobBundleFactoryCreator; @@ -116,7 +115,7 @@ class SparkExecutableStageFunction<InputT, SideInputT> StateRequestHandler stateRequestHandler = getStateRequestHandler(executableStage, stageBundleFactory.getProcessBundleDescriptor()); String stageName = stagePayload.getInput(); - MetricsContainerImpl container = metricsAccumulator.localValue().getContainer(stageName); + MetricsContainerImpl container = metricsAccumulator.value().getContainer(stageName); BundleProgressHandler bundleProgressHandler = new BundleProgressHandler() { @Override diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java index 158f303..f3b2600 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java @@ -34,14 +34,13 @@ import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.construction.TriggerTranslation; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; -import org.apache.beam.runners.spark.aggregators.NamedAggregators; +import org.apache.beam.runners.spark.aggregators.NamedAggregatorsAccumulator; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; -import org.apache.spark.Accumulator; import org.apache.spark.api.java.function.FlatMapFunction; import org.joda.time.Instant; @@ -61,7 +60,7 @@ class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends BoundedWindow StateInternalsFactory<K> stateInternalsFactory, SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn, SerializablePipelineOptions options, - Accumulator<NamedAggregators> accumulator) { + NamedAggregatorsAccumulator accumulator) { this.windowingStrategy = windowingStrategy; this.stateInternalsFactory = stateInternalsFactory; this.reduceFn = reduceFn; diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index 159d5a3..8cb79ad 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -28,13 +28,13 @@ import javax.annotation.Nullable; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.ParDoTranslation; -import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; -import org.apache.beam.runners.spark.aggregators.NamedAggregators; +import org.apache.beam.runners.spark.aggregators.NamedAggregatorsAccumulator; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.io.SourceRDD; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; +import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator; import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; @@ -68,7 +68,6 @@ import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Optional; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.FluentIterable; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; -import org.apache.spark.Accumulator; import org.apache.spark.HashPartitioner; import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaPairRDD; @@ -122,7 +121,7 @@ public final class TransformTranslator { JavaRDD<WindowedValue<KV<K, V>>> inRDD = ((BoundedDataset<KV<K, V>>) context.borrowDataset(transform)).getRDD(); final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder(); - final Accumulator<NamedAggregators> accum = AggregatorsAccumulator.getInstance(); + final NamedAggregatorsAccumulator accum = AggregatorsAccumulator.getInstance(); @SuppressWarnings("unchecked") final WindowingStrategy<?, W> windowingStrategy = (WindowingStrategy<?, W>) context.getInput(transform).getWindowingStrategy(); @@ -351,7 +350,7 @@ public final class TransformTranslator { ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD(); WindowingStrategy<?, ?> windowingStrategy = context.getInput(transform).getWindowingStrategy(); - Accumulator<MetricsContainerStepMap> metricsAccum = MetricsAccumulator.getInstance(); + MetricsContainerStepMapAccumulator metricsAccum = MetricsAccumulator.getInstance(); Coder<InputT> inputCoder = (Coder<InputT>) context.getInput(transform).getCoder(); Map<TupleTag<?>, Coder<?>> outputCoders = context.getOutputCoders(); JavaPairRDD<TupleTag<?>, WindowedValue<?>> all; diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 8efc477..1e6c50c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -36,12 +36,12 @@ import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar; -import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.io.ConsoleIO; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.io.SparkUnboundedSource; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; +import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator; import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet; import org.apache.beam.runners.spark.translation.BoundedDataset; import org.apache.beam.runners.spark.translation.Dataset; @@ -83,7 +83,6 @@ import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; -import org.apache.spark.Accumulator; import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -412,7 +411,7 @@ public final class StreamingTransformTranslator { JavaPairDStream<TupleTag<?>, WindowedValue<?>> all = dStream.transformToPair( rdd -> { - final Accumulator<MetricsContainerStepMap> metricsAccum = + final MetricsContainerStepMapAccumulator metricsAccum = MetricsAccumulator.getInstance(); final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs = diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java index 56ca69a..64657ae 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java @@ -43,12 +43,12 @@ import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors; import org.apache.beam.runners.fnexecution.control.RemoteBundle; import org.apache.beam.runners.fnexecution.control.StageBundleFactory; import org.apache.beam.runners.fnexecution.state.StateRequestHandler; +import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator; import org.apache.beam.runners.spark.translation.SparkExecutableStageFunction.JobBundleFactoryCreator; import org.apache.beam.sdk.fn.data.FnDataReceiver; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; -import org.apache.spark.Accumulator; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -61,7 +61,7 @@ public class SparkExecutableStageFunctionTest { @Mock private JobBundleFactory jobBundleFactory; @Mock private StageBundleFactory stageBundleFactory; @Mock private RemoteBundle remoteBundle; - @Mock private Accumulator<MetricsContainerStepMap> metricsAccumulator; + @Mock private MetricsContainerStepMapAccumulator metricsAccumulator; @Mock private MetricsContainerStepMap stepMap; @Mock private MetricsContainerImpl container; @@ -91,7 +91,7 @@ public class SparkExecutableStageFunctionTest { ImmutableMap<String, FnDataReceiver<WindowedValue<?>>> inputReceiver = ImmutableMap.of("input", Mockito.mock(FnDataReceiver.class)); when(remoteBundle.getInputReceivers()).thenReturn(inputReceiver); - when(metricsAccumulator.localValue()).thenReturn(stepMap); + when(metricsAccumulator.value()).thenReturn(stepMap); when(stepMap.getContainer(any())).thenReturn(container); }