Repository: incubator-beam Updated Branches: refs/heads/master 711c68092 -> 0c875ba70
[BEAM-918] Allow users to define the storage level via pipeline options Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d99829dd Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d99829dd Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d99829dd Branch: refs/heads/master Commit: d99829dd99db4090ceb7e5eefce50ee513c5458e Parents: 711c680 Author: Jean-Baptiste Onofré <jbono...@apache.org> Authored: Thu Nov 17 12:38:00 2016 +0100 Committer: Jean-Baptiste Onofré <jbono...@apache.org> Committed: Thu Dec 1 11:38:25 2016 +0100 ---------------------------------------------------------------------- .../runners/spark/SparkPipelineOptions.java | 5 ++ .../spark/translation/BoundedDataset.java | 5 +- .../beam/runners/spark/translation/Dataset.java | 2 +- .../spark/translation/EvaluationContext.java | 10 +++- .../translation/StorageLevelPTransform.java | 43 +++++++++++++++ .../spark/translation/TransformTranslator.java | 27 ++++++++++ .../translation/streaming/UnboundedDataset.java | 13 ++++- .../spark/translation/StorageLevelTest.java | 56 ++++++++++++++++++++ 8 files changed, 155 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java index 0fd790e..3f8b379 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java @@ -44,6 +44,11 @@ public interface SparkPipelineOptions Long 
getBatchIntervalMillis(); void setBatchIntervalMillis(Long batchInterval); + @Description("Batch default storage level") + @Default.String("MEMORY_ONLY") + String getStorageLevel(); + void setStorageLevel(String storageLevel); + @Description("Minimum time to spend on read, for each micro-batch.") @Default.Long(200) Long getMinReadTimeMillis(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java index 774efb9..1cfb0e0 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDDLike; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.storage.StorageLevel; /** * Holds an RDD or values for deferred conversion to an RDD if needed. 
PCollections are sometimes @@ -97,8 +98,8 @@ public class BoundedDataset<T> implements Dataset { } @Override - public void cache() { - rdd.cache(); + public void cache(String storageLevel) { + rdd.persist(StorageLevel.fromString(storageLevel)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java index 36b03fe..b5d550e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java @@ -26,7 +26,7 @@ import java.io.Serializable; */ public interface Dataset extends Serializable { - void cache(); + void cache(String storageLevel); void action(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java index 1183fbb..ae45609 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java @@ -27,6 +27,7 @@ import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; import org.apache.beam.runners.spark.EvaluationResult; +import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton; import 
org.apache.beam.runners.spark.translation.streaming.UnboundedDataset; import org.apache.beam.sdk.AggregatorRetrievalException; @@ -155,7 +156,7 @@ public class EvaluationContext implements EvaluationResult { leaves.remove(dataset); if (multiReads.contains(pvalue)) { // Ensure the RDD is marked as cached - dataset.cache(); + dataset.cache(storageLevel()); } else { multiReads.add(pvalue); } @@ -172,7 +173,8 @@ public class EvaluationContext implements EvaluationResult { */ public void computeOutputs() { for (Dataset dataset : leaves) { - dataset.cache(); // cache so that any subsequent get() is cheap. + // cache so that any subsequent get() is cheap. + dataset.cache(storageLevel()); dataset.action(); // force computation. } } @@ -295,4 +297,8 @@ public class EvaluationContext implements EvaluationResult { private boolean isStreamingPipeline() { return jssc != null; } + + private String storageLevel() { + return runtime.getPipelineOptions().as(SparkPipelineOptions.class).getStorageLevel(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java new file mode 100644 index 0000000..6944dbf --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark.translation; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollection; + +/** + * Outputs the RDD storage level of the input PCollection (mostly used for testing purposes). + */ +public final class StorageLevelPTransform extends PTransform<PCollection<?>, PCollection<String>> { + + @Override + public PCollection<String> apply(PCollection<?> input) { + return PCollection.createPrimitiveOutputInternal(input.getPipeline(), + WindowingStrategy.globalDefault(), + PCollection.IsBounded.BOUNDED); + } + + @Override + public Coder getDefaultOutputCoder() { + return StringUtf8Coder.of(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index 60d668e..66da181 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -26,6 +26,7 @@ import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceSh import com.google.common.collect.Maps; import java.io.IOException; +import java.util.Collections; import java.util.Map; import org.apache.avro.mapred.AvroKey; import org.apache.avro.mapreduce.AvroJob; @@ -34,6 +35,7 @@ import org.apache.beam.runners.core.AssignWindowsDoFn; import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton; import org.apache.beam.runners.spark.aggregators.NamedAggregators; +import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.io.SourceRDD; import org.apache.beam.runners.spark.io.hadoop.HadoopIO; import org.apache.beam.runners.spark.io.hadoop.ShardNameTemplateHelper; @@ -42,6 +44,7 @@ import org.apache.beam.runners.spark.io.hadoop.TemplatedTextOutputFormat; import org.apache.beam.runners.spark.util.BroadcastHelper; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.AvroIO; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.TextIO; @@ -78,6 +81,7 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; + import scala.Tuple2; @@ -583,6 +587,27 @@ public final class TransformTranslator { }; } + private static TransformEvaluator<StorageLevelPTransform> storageLevel() { + return new TransformEvaluator<StorageLevelPTransform>() { + @Override + public void evaluate(StorageLevelPTransform transform, EvaluationContext context) { + JavaRDD rdd = ((BoundedDataset) (context).borrowDataset(transform)).getRDD(); + JavaSparkContext javaSparkContext = context.getSparkContext(); + + 
WindowedValue.ValueOnlyWindowedValueCoder<String> windowCoder = + WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()); + JavaRDD output = + javaSparkContext.parallelize( + CoderHelpers.toByteArrays( + Collections.singletonList(rdd.getStorageLevel().description()), + StringUtf8Coder.of())) + .map(CoderHelpers.fromByteFunction(windowCoder)); + + context.putDataset(transform, new BoundedDataset<String>(output)); + } + }; + } + private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps .newHashMap(); @@ -602,6 +627,8 @@ public final class TransformTranslator { EVALUATORS.put(View.AsIterable.class, viewAsIter()); EVALUATORS.put(View.CreatePCollectionView.class, createPCollView()); EVALUATORS.put(Window.Bound.class, window()); + // mostly test evaluators + EVALUATORS.put(StorageLevelPTransform.class, storageLevel()); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java index 67adee2..d059c7e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java @@ -31,12 +31,17 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * DStream holder Can also crate a DStream from a supplied queue of values, but mainly for testing. 
*/ public class UnboundedDataset<T> implements Dataset { + + private static final Logger LOG = LoggerFactory.getLogger(UnboundedDataset.class); + // only set if creating a DStream from a static collection @Nullable private transient JavaStreamingContext jssc; @@ -81,12 +86,18 @@ public class UnboundedDataset<T> implements Dataset { return dStream; } - @Override public void cache() { dStream.cache(); } @Override + public void cache(String storageLevel) { + // we "force" MEMORY storage level in streaming + LOG.warn("Provided StorageLevel ignored for stream, using default level"); + cache(); + } + + @Override public void action() { dStream.foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java new file mode 100644 index 0000000..48105e1 --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark.translation; + +import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; + +/** + * Tests that the RDD storage level defined by the user is applied. + */ +public class StorageLevelTest { + + @Rule + public final transient SparkTestPipelineOptions pipelineOptions = new SparkTestPipelineOptions(); + + @Test + public void test() throws Exception { + pipelineOptions.getOptions().setStorageLevel("DISK_ONLY"); + Pipeline p = Pipeline.create(pipelineOptions.getOptions()); + + PCollection<String> pCollection = p.apply(Create.of("foo")); + + // By default, the Spark runner doesn't cache an RDD that is accessed only once. + // So, to "force" caching of the RDD, it has to be accessed at least twice. + // That's why Count.globally() is applied to the PCollection as a second consumer. + pCollection.apply(Count.<String>globally()); + + PCollection<String> output = pCollection.apply(new StorageLevelPTransform()); + + PAssert.thatSingleton(output).isEqualTo("Disk Serialized 1x Replicated"); + + p.run(); + } + +}