Repository: incubator-beam Updated Branches: refs/heads/master 9c300cde8 -> 2bc66f903
[BEAM-762] Unify spark-runner EvaluationContext and StreamingEvaluationContext PR 1291 review changes. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1bef01fe Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1bef01fe Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1bef01fe Branch: refs/heads/master Commit: 1bef01fef5ff5ff9a960c85b00c2cc4aa504ce4d Parents: 9c300cd Author: Aviem Zur <aviem...@gmail.com> Authored: Sun Nov 13 13:57:07 2016 +0200 Committer: Sela <ans...@paypal.com> Committed: Tue Nov 15 13:35:49 2016 +0200 ---------------------------------------------------------------------- .../apache/beam/runners/spark/SparkRunner.java | 4 +- .../spark/translation/BoundedDataset.java | 114 ++++++++ .../beam/runners/spark/translation/Dataset.java | 34 +++ .../spark/translation/EvaluationContext.java | 230 +++++++--------- .../spark/translation/TransformTranslator.java | 99 +++---- .../SparkRunnerStreamingContextFactory.java | 7 +- .../streaming/StreamingEvaluationContext.java | 272 ------------------- .../streaming/StreamingTransformTranslator.java | 135 +++++---- .../translation/streaming/UnboundedDataset.java | 103 +++++++ 9 files changed, 464 insertions(+), 534 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 45c7f55..6bbef39 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -26,7 +26,6 @@ import 
org.apache.beam.runners.spark.translation.SparkPipelineTranslator; import org.apache.beam.runners.spark.translation.TransformEvaluator; import org.apache.beam.runners.spark.translation.TransformTranslator; import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory; -import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -49,6 +48,7 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * The SparkRunner translate operations defined on a pipeline to a representation * executable by Spark, and then submitting the job to Spark to be executed. If we wanted to run @@ -136,7 +136,7 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> { jssc.start(); // if recovering from checkpoint, we have to reconstruct the EvaluationResult instance. - return contextFactory.getCtxt() == null ? new StreamingEvaluationContext(jssc.sc(), + return contextFactory.getCtxt() == null ? 
new EvaluationContext(jssc.sc(), pipeline, jssc, mOptions.getTimeout()) : contextFactory.getCtxt(); } else { if (mOptions.getTimeout() > 0) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java new file mode 100644 index 0000000..774efb9 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.spark.translation; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.runners.spark.coders.CoderHelpers; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaRDDLike; +import org.apache.spark.api.java.JavaSparkContext; + +/** + * Holds an RDD or values for deferred conversion to an RDD if needed. PCollections are sometimes + * created from a collection of objects (using RDD parallelize) and then only used to create View + * objects; in which case they do not need to be converted to bytes since they are not transferred + * across the network until they are broadcast. 
+ */ +public class BoundedDataset<T> implements Dataset { + // only set if creating an RDD from a static collection + @Nullable private transient JavaSparkContext jsc; + + private Iterable<WindowedValue<T>> windowedValues; + private Coder<T> coder; + private JavaRDD<WindowedValue<T>> rdd; + + BoundedDataset(JavaRDD<WindowedValue<T>> rdd) { + this.rdd = rdd; + } + + BoundedDataset(Iterable<T> values, JavaSparkContext jsc, Coder<T> coder) { + this.windowedValues = + Iterables.transform(values, WindowingHelpers.<T>windowValueFunction()); + this.jsc = jsc; + this.coder = coder; + } + + @SuppressWarnings("ConstantConditions") + public JavaRDD<WindowedValue<T>> getRDD() { + if (rdd == null) { + WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder = + WindowedValue.getValueOnlyCoder(coder); + rdd = jsc.parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder)) + .map(CoderHelpers.fromByteFunction(windowCoder)); + } + return rdd; + } + + Iterable<WindowedValue<T>> getValues(PCollection<T> pcollection) { + if (windowedValues == null) { + WindowFn<?, ?> windowFn = + pcollection.getWindowingStrategy().getWindowFn(); + Coder<? 
extends BoundedWindow> windowCoder = windowFn.windowCoder(); + final WindowedValue.WindowedValueCoder<T> windowedValueCoder; + if (windowFn instanceof GlobalWindows) { + windowedValueCoder = + WindowedValue.ValueOnlyWindowedValueCoder.of(pcollection.getCoder()); + } else { + windowedValueCoder = + WindowedValue.FullWindowedValueCoder.of(pcollection.getCoder(), windowCoder); + } + JavaRDDLike<byte[], ?> bytesRDD = + rdd.map(CoderHelpers.toByteFunction(windowedValueCoder)); + List<byte[]> clientBytes = bytesRDD.collect(); + windowedValues = Iterables.transform(clientBytes, + new Function<byte[], WindowedValue<T>>() { + @Override + public WindowedValue<T> apply(byte[] bytes) { + return CoderHelpers.fromByteArray(bytes, windowedValueCoder); + } + }); + } + return windowedValues; + } + + @Override + public void cache() { + rdd.cache(); + } + + @Override + public void action() { + rdd.count(); + } + + @Override + public void setName(String name) { + rdd.setName(name); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java new file mode 100644 index 0000000..36b03fe --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.translation; + +import java.io.Serializable; + + +/** + * Holder for Spark RDD/DStream. + */ +public interface Dataset extends Serializable { + + void cache(); + + void action(); + + void setName(String name); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java index 6ccec85..aaf7573 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java @@ -20,17 +20,15 @@ package org.apache.beam.runners.spark.translation; import static com.google.common.base.Preconditions.checkArgument; -import com.google.common.base.Function; import com.google.common.collect.Iterables; import java.io.IOException; import java.util.LinkedHashMap; import java.util.LinkedHashSet; -import java.util.List; import java.util.Map; import java.util.Set; import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton; -import org.apache.beam.runners.spark.coders.CoderHelpers; +import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset; import 
org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; @@ -39,17 +37,15 @@ import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; -import org.apache.spark.api.java.JavaRDDLike; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.joda.time.Duration; @@ -58,91 +54,48 @@ import org.joda.time.Duration; */ public class EvaluationContext implements EvaluationResult { private final JavaSparkContext jsc; - private final Pipeline pipeline; + private JavaStreamingContext jssc; private final SparkRuntimeContext runtime; - private final Map<PValue, RDDHolder<?>> pcollections = new LinkedHashMap<>(); - private final Set<RDDHolder<?>> leafRdds = new LinkedHashSet<>(); - private final Set<PValue> multireads = new LinkedHashSet<>(); + private final Pipeline pipeline; + private long timeout; + private final Map<PValue, Dataset> datasets = new LinkedHashMap<>(); + private final Map<PValue, Dataset> pcollections = new LinkedHashMap<>(); + private final Set<Dataset> leaves = new LinkedHashSet<>(); + private final Set<PValue> multiReads = new LinkedHashSet<>(); private final Map<PValue, Object> pobjects = new LinkedHashMap<>(); private final Map<PValue, Iterable<? 
extends WindowedValue<?>>> pview = new LinkedHashMap<>(); - protected AppliedPTransform<?, ?, ?> currentTransform; + private AppliedPTransform<?, ?, ?> currentTransform; + private State state; public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) { this.jsc = jsc; this.pipeline = pipeline; this.runtime = new SparkRuntimeContext(pipeline, jsc); + // A batch pipeline is blocking by nature + this.state = State.DONE; } - /** - * Holds an RDD or values for deferred conversion to an RDD if needed. PCollections are - * sometimes created from a collection of objects (using RDD parallelize) and then - * only used to create View objects; in which case they do not need to be - * converted to bytes since they are not transferred across the network until they are - * broadcast. - */ - private class RDDHolder<T> { - - private Iterable<WindowedValue<T>> windowedValues; - private Coder<T> coder; - private JavaRDDLike<WindowedValue<T>, ?> rdd; - - RDDHolder(Iterable<T> values, Coder<T> coder) { - this.windowedValues = - Iterables.transform(values, WindowingHelpers.<T>windowValueFunction()); - this.coder = coder; - } - - RDDHolder(JavaRDDLike<WindowedValue<T>, ?> rdd) { - this.rdd = rdd; - } - - JavaRDDLike<WindowedValue<T>, ?> getRDD() { - if (rdd == null) { - WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder = - WindowedValue.getValueOnlyCoder(coder); - rdd = jsc.parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder)) - .map(CoderHelpers.fromByteFunction(windowCoder)); - } - return rdd; - } - - Iterable<WindowedValue<T>> getValues(PCollection<T> pcollection) { - if (windowedValues == null) { - WindowFn<?, ?> windowFn = - pcollection.getWindowingStrategy().getWindowFn(); - Coder<? 
extends BoundedWindow> windowCoder = windowFn.windowCoder(); - final WindowedValue.WindowedValueCoder<T> windowedValueCoder; - if (windowFn instanceof GlobalWindows) { - windowedValueCoder = - WindowedValue.ValueOnlyWindowedValueCoder.of(pcollection.getCoder()); - } else { - windowedValueCoder = - WindowedValue.FullWindowedValueCoder.of(pcollection.getCoder(), windowCoder); - } - JavaRDDLike<byte[], ?> bytesRDD = - rdd.map(CoderHelpers.toByteFunction(windowedValueCoder)); - List<byte[]> clientBytes = bytesRDD.collect(); - windowedValues = Iterables.transform(clientBytes, - new Function<byte[], WindowedValue<T>>() { - @Override - public WindowedValue<T> apply(byte[] bytes) { - return CoderHelpers.fromByteArray(bytes, windowedValueCoder); - } - }); - } - return windowedValues; - } + public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline, + JavaStreamingContext jssc, long timeout) { + this(jsc, pipeline); + this.jssc = jssc; + this.timeout = timeout; + this.state = State.RUNNING; } - protected JavaSparkContext getSparkContext() { + JavaSparkContext getSparkContext() { return jsc; } + public JavaStreamingContext getStreamingContext() { + return jssc; + } + public Pipeline getPipeline() { return pipeline; } - protected SparkRuntimeContext getRuntimeContext() { + public SparkRuntimeContext getRuntimeContext() { return runtime; } @@ -150,11 +103,7 @@ public class EvaluationContext implements EvaluationResult { this.currentTransform = transform; } - protected AppliedPTransform<?, ?, ?> getCurrentTransform() { - return currentTransform; - } - - protected <T extends PInput> T getInput(PTransform<T, ?> transform) { + public <T extends PInput> T getInput(PTransform<T, ?> transform) { checkArgument(currentTransform != null && currentTransform.getTransform() == transform, "can only be called with current transform"); @SuppressWarnings("unchecked") @@ -162,7 +111,7 @@ public class EvaluationContext implements EvaluationResult { return input; } - protected <T extends 
POutput> T getOutput(PTransform<?, T> transform) { + public <T extends POutput> T getOutput(PTransform<?, T> transform) { checkArgument(currentTransform != null && currentTransform.getTransform() == transform, "can only be called with current transform"); @SuppressWarnings("unchecked") @@ -170,81 +119,74 @@ public class EvaluationContext implements EvaluationResult { return output; } - protected <T> void setOutputRDD(PTransform<?, ?> transform, - JavaRDDLike<WindowedValue<T>, ?> rdd) { - setRDD((PValue) getOutput(transform), rdd); + public void putDataset(PTransform<?, ?> transform, Dataset dataset) { + putDataset((PValue) getOutput(transform), dataset); + } + + public void putDataset(PValue pvalue, Dataset dataset) { + try { + dataset.setName(pvalue.getName()); + } catch (IllegalStateException e) { + // name not set, ignore + } + datasets.put(pvalue, dataset); + leaves.add(dataset); + } + + <T> void putBoundedDatasetFromValues(PTransform<?, ?> transform, Iterable<T> values, + Coder<T> coder) { + datasets.put((PValue) getOutput(transform), new BoundedDataset<>(values, jsc, coder)); } - protected <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values, - Coder<T> coder) { - pcollections.put((PValue) getOutput(transform), new RDDHolder<>(values, coder)); + public <T> void putUnboundedDatasetFromQueue( + PTransform<?, ?> transform, Iterable<Iterable<T>> values, Coder<T> coder) { + datasets.put((PValue) getOutput(transform), new UnboundedDataset<>(values, jssc, coder)); } - public void setPView(PValue view, Iterable<? extends WindowedValue<?>> value) { + void putPView(PValue view, Iterable<? extends WindowedValue<?>> value) { pview.put(view, value); } - protected boolean hasOutputRDD(PTransform<? 
extends PInput, ?> transform) { - PValue pvalue = (PValue) getOutput(transform); - return pcollections.containsKey(pvalue); + public Dataset borrowDataset(PTransform<?, ?> transform) { + return borrowDataset((PValue) getInput(transform)); } - public JavaRDDLike<?, ?> getRDD(PValue pvalue) { - RDDHolder<?> rddHolder = pcollections.get(pvalue); - JavaRDDLike<?, ?> rdd = rddHolder.getRDD(); - leafRdds.remove(rddHolder); - if (multireads.contains(pvalue)) { + public Dataset borrowDataset(PValue pvalue) { + Dataset dataset = datasets.get(pvalue); + leaves.remove(dataset); + if (multiReads.contains(pvalue)) { // Ensure the RDD is marked as cached - rdd.rdd().cache(); + dataset.cache(); } else { - multireads.add(pvalue); + multiReads.add(pvalue); } - return rdd; + return dataset; } - protected <T> void setRDD(PValue pvalue, JavaRDDLike<WindowedValue<T>, ?> rdd) { - try { - rdd.rdd().setName(pvalue.getName()); - } catch (IllegalStateException e) { - // name not set, ignore - } - RDDHolder<T> rddHolder = new RDDHolder<>(rdd); - pcollections.put(pvalue, rddHolder); - leafRdds.add(rddHolder); - } - - protected JavaRDDLike<?, ?> getInputRDD(PTransform<? extends PInput, ?> transform) { - return getRDD((PValue) getInput(transform)); - } - - <T> Iterable<? extends WindowedValue<?>> getPCollectionView(PCollectionView<T> view) { return pview.get(view); } /** - * Computes the outputs for all RDDs that are leaves in the DAG and do not have any - * actions (like saving to a file) registered on them (i.e. they are performed for side - * effects). + * Computes the outputs for all RDDs that are leaves in the DAG and do not have any actions (like + * saving to a file) registered on them (i.e. they are performed for side effects). 
*/ public void computeOutputs() { - for (RDDHolder<?> rddHolder : leafRdds) { - JavaRDDLike<?, ?> rdd = rddHolder.getRDD(); - rdd.rdd().cache(); // cache so that any subsequent get() is cheap - rdd.count(); // force the RDD to be computed + for (Dataset dataset : leaves) { + dataset.cache(); // cache so that any subsequent get() is cheap. + dataset.action(); // force computation. } } + @SuppressWarnings("unchecked") @Override public <T> T get(PValue value) { if (pobjects.containsKey(value)) { - @SuppressWarnings("unchecked") T result = (T) pobjects.get(value); return result; } if (pcollections.containsKey(value)) { - JavaRDDLike<?, ?> rdd = pcollections.get(value).getRDD(); - @SuppressWarnings("unchecked") + JavaRDD<?> rdd = ((BoundedDataset) pcollections.get(value)).getRDD(); T res = (T) Iterables.getOnlyElement(rdd.collect()); pobjects.put(value, res); return res; @@ -271,27 +213,37 @@ public class EvaluationContext implements EvaluationResult { @Override public <T> Iterable<T> get(PCollection<T> pcollection) { @SuppressWarnings("unchecked") - RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection); - Iterable<WindowedValue<T>> windowedValues = rddHolder.getValues(pcollection); + BoundedDataset<T> boundedDataset = (BoundedDataset<T>) datasets.get(pcollection); + Iterable<WindowedValue<T>> windowedValues = boundedDataset.getValues(pcollection); return Iterables.transform(windowedValues, WindowingHelpers.<T>unwindowValueFunction()); } <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) { @SuppressWarnings("unchecked") - RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection); - return rddHolder.getValues(pcollection); + BoundedDataset<T> boundedDataset = (BoundedDataset<T>) datasets.get(pcollection); + return boundedDataset.getValues(pcollection); } @Override public void close(boolean gracefully) { - // graceful stop is used for streaming. 
+ if (isStreamingPipeline()) { + // stop streaming context + if (timeout > 0) { + jssc.awaitTerminationOrTimeout(timeout); + } else { + jssc.awaitTermination(); + } + // stop streaming context gracefully, so checkpointing (and other computations) get to + // finish before shutdown. + jssc.stop(false, gracefully); + } + state = State.DONE; SparkContextFactory.stopSparkContext(jsc); } - /** The runner is blocking. */ @Override public State getState() { - return State.DONE; + return state; } @Override @@ -307,9 +259,19 @@ public class EvaluationContext implements EvaluationResult { @Override public State waitUntilFinish(Duration duration) { - // This is no-op, since Spark runner in batch is blocking. - // It needs to be updated once SparkRunner supports non-blocking execution: - // https://issues.apache.org/jira/browse/BEAM-595 - return State.DONE; + if (isStreamingPipeline()) { + throw new UnsupportedOperationException( + "Spark runner EvaluationContext does not support waitUntilFinish for streaming " + + "pipelines."); + } else { + // This is no-op, since Spark runner in batch is blocking. 
+ // It needs to be updated once SparkRunner supports non-blocking execution: + // https://issues.apache.org/jira/browse/BEAM-595 + return State.DONE; + } + } + + private boolean isStreamingPipeline() { + return jssc != null; } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index 2e682c4..c902ee3 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.beam.runners.spark.translation; import static com.google.common.base.Preconditions.checkState; @@ -73,11 +72,9 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.spark.Accumulator; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaRDDLike; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; - import scala.Tuple2; @@ -101,11 +98,11 @@ public final class TransformTranslator { } else { JavaRDD<WindowedValue<T>>[] rdds = new JavaRDD[pcs.size()]; for (int i = 0; i < rdds.length; i++) { - rdds[i] = (JavaRDD<WindowedValue<T>>) context.getRDD(pcs.get(i)); + rdds[i] = ((BoundedDataset<T>) context.borrowDataset(pcs.get(i))).getRDD(); } unionRDD = context.getSparkContext().union(rdds); } - context.setOutputRDD(transform, unionRDD); + context.putDataset(transform, new BoundedDataset<>(unionRDD)); } }; } @@ 
-116,7 +113,7 @@ public final class TransformTranslator { public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) { @SuppressWarnings("unchecked") JavaRDD<WindowedValue<KV<K, V>>> inRDD = - (JavaRDD<WindowedValue<KV<K, V>>>) context.getInputRDD(transform); + ((BoundedDataset<KV<K, V>>) context.borrowDataset(transform)).getRDD(); @SuppressWarnings("unchecked") final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder(); @@ -124,8 +121,9 @@ public final class TransformTranslator { final Accumulator<NamedAggregators> accum = AccumulatorSingleton.getInstance(context.getSparkContext()); - context.setOutputRDD(transform, GroupCombineFunctions.groupByKey(inRDD, accum, coder, - context.getRuntimeContext(), context.getInput(transform).getWindowingStrategy())); + context.putDataset(transform, + new BoundedDataset<>(GroupCombineFunctions.groupByKey(inRDD, accum, coder, + context.getRuntimeContext(), context.getInput(transform).getWindowingStrategy()))); } }; } @@ -146,16 +144,17 @@ public final class TransformTranslator { CombineFnUtil.toFnWithContext(transform.getFn()); @SuppressWarnings("unchecked") - JavaRDDLike<WindowedValue<KV<K, Iterable<InputT>>>, ?> inRDD = - (JavaRDDLike<WindowedValue<KV<K, Iterable<InputT>>>, ?>) - context.getInputRDD(transform); + JavaRDD<WindowedValue<KV<K, Iterable<InputT>>>> inRDD = + ((BoundedDataset<KV<K, Iterable<InputT>>>) + context.borrowDataset(transform)).getRDD(); - SparkKeyedCombineFn<K, InputT, ?, OutputT> combineFnWithContext = + SparkKeyedCombineFn<K, InputT, ?, OutputT> combineFnWithContext = new SparkKeyedCombineFn<>(fn, context.getRuntimeContext(), TranslationUtils.getSideInputs(transform.getSideInputs(), context), - windowingStrategy); - context.setOutputRDD(transform, inRDD.map(new TranslationUtils.CombineGroupedValues<>( - combineFnWithContext))); + windowingStrategy); + context.putDataset(transform, new BoundedDataset<>(inRDD.map(new TranslationUtils + .CombineGroupedValues<>( + 
combineFnWithContext)))); } }; } @@ -182,10 +181,11 @@ public final class TransformTranslator { @SuppressWarnings("unchecked") JavaRDD<WindowedValue<InputT>> inRdd = - (JavaRDD<WindowedValue<InputT>>) context.getInputRDD(transform); + ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD(); - context.setOutputRDD(transform, GroupCombineFunctions.combineGlobally(inRdd, combineFn, - iCoder, oCoder, runtimeContext, windowingStrategy, sideInputs, hasDefault)); + context.putDataset(transform, new BoundedDataset<>(GroupCombineFunctions + .combineGlobally(inRdd, combineFn, + iCoder, oCoder, runtimeContext, windowingStrategy, sideInputs, hasDefault))); } }; } @@ -212,10 +212,11 @@ public final class TransformTranslator { @SuppressWarnings("unchecked") JavaRDD<WindowedValue<KV<K, InputT>>> inRdd = - (JavaRDD<WindowedValue<KV<K, InputT>>>) context.getInputRDD(transform); + ((BoundedDataset<KV<K, InputT>>) context.borrowDataset(transform)).getRDD(); - context.setOutputRDD(transform, GroupCombineFunctions.combinePerKey(inRdd, combineFn, - inputCoder, runtimeContext, windowingStrategy, sideInputs)); + context.putDataset(transform, new BoundedDataset<>(GroupCombineFunctions + .combinePerKey(inRdd, combineFn, + inputCoder, runtimeContext, windowingStrategy, sideInputs))); } }; } @@ -225,8 +226,8 @@ public final class TransformTranslator { @Override public void evaluate(ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) { @SuppressWarnings("unchecked") - JavaRDDLike<WindowedValue<InputT>, ?> inRDD = - (JavaRDDLike<WindowedValue<InputT>, ?>) context.getInputRDD(transform); + JavaRDD<WindowedValue<InputT>> inRDD = + ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD(); @SuppressWarnings("unchecked") final WindowFn<Object, ?> windowFn = (WindowFn<Object, ?>) context.getInput(transform).getWindowingStrategy().getWindowFn(); @@ -234,9 +235,9 @@ public final class TransformTranslator { 
AccumulatorSingleton.getInstance(context.getSparkContext()); Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), context); - context.setOutputRDD(transform, - inRDD.mapPartitions(new DoFnFunction<>(accum, transform.getFn(), - context.getRuntimeContext(), sideInputs, windowFn))); + context.putDataset(transform, + new BoundedDataset<>(inRDD.mapPartitions(new DoFnFunction<>(accum, transform.getFn(), + context.getRuntimeContext(), sideInputs, windowFn)))); } }; } @@ -247,8 +248,8 @@ public final class TransformTranslator { @Override public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) { @SuppressWarnings("unchecked") - JavaRDDLike<WindowedValue<InputT>, ?> inRDD = - (JavaRDDLike<WindowedValue<InputT>, ?>) context.getInputRDD(transform); + JavaRDD<WindowedValue<InputT>> inRDD = + ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD(); @SuppressWarnings("unchecked") final WindowFn<Object, ?> windowFn = (WindowFn<Object, ?>) context.getInput(transform).getWindowingStrategy().getWindowFn(); @@ -268,7 +269,7 @@ public final class TransformTranslator { // Object is the best we can do since different outputs can have different tags JavaRDD<WindowedValue<Object>> values = (JavaRDD<WindowedValue<Object>>) (JavaRDD<?>) filtered.values(); - context.setRDD(e.getValue(), values); + context.putDataset(e.getValue(), new BoundedDataset<>(values)); } } }; @@ -281,8 +282,8 @@ public final class TransformTranslator { public void evaluate(TextIO.Read.Bound<T> transform, EvaluationContext context) { String pattern = transform.getFilepattern(); JavaRDD<WindowedValue<String>> rdd = context.getSparkContext().textFile(pattern) - .map(WindowingHelpers.<String>windowFunction()); - context.setOutputRDD(transform, rdd); + .map(WindowingHelpers.<String>windowFunction()); + context.putDataset(transform, new BoundedDataset<>(rdd)); } }; } @@ -293,7 +294,7 @@ 
public final class TransformTranslator { public void evaluate(TextIO.Write.Bound<T> transform, EvaluationContext context) { @SuppressWarnings("unchecked") JavaPairRDD<T, Void> last = - ((JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform)) + ((BoundedDataset<T>) context.borrowDataset(transform)).getRDD() .map(WindowingHelpers.<T>unwindowFunction()) .mapToPair(new PairFunction<T, T, Void>() { @@ -331,7 +332,7 @@ public final class TransformTranslator { return key.datum(); } }).map(WindowingHelpers.<T>windowFunction()); - context.setOutputRDD(transform, rdd); + context.putDataset(transform, new BoundedDataset<>(rdd)); } }; } @@ -349,7 +350,7 @@ public final class TransformTranslator { AvroJob.setOutputKeySchema(job, transform.getSchema()); @SuppressWarnings("unchecked") JavaPairRDD<AvroKey<T>, NullWritable> last = - ((JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform)) + ((BoundedDataset<T>) context.borrowDataset(transform)).getRDD() .map(WindowingHelpers.<T>unwindowFunction()) .mapToPair(new PairFunction<T, AvroKey<T>, NullWritable>() { @Override @@ -377,7 +378,7 @@ public final class TransformTranslator { JavaRDD<WindowedValue<T>> input = new SourceRDD.Bounded<>( jsc.sc(), transform.getSource(), runtimeContext).toJavaRDD(); // cache to avoid re-evaluation of the source by Spark's lazy DAG evaluation. 
- context.setOutputRDD(transform, input.cache()); + context.putDataset(transform, new BoundedDataset<>(input.cache())); } }; } @@ -388,7 +389,7 @@ public final class TransformTranslator { public void evaluate(HadoopIO.Read.Bound<K, V> transform, EvaluationContext context) { String pattern = transform.getFilepattern(); JavaSparkContext jsc = context.getSparkContext(); - @SuppressWarnings ("unchecked") + @SuppressWarnings("unchecked") JavaPairRDD<K, V> file = jsc.newAPIHadoopFile(pattern, transform.getFormatClass(), transform.getKeyClass(), transform.getValueClass(), @@ -400,7 +401,7 @@ public final class TransformTranslator { return KV.of(t2._1(), t2._2()); } }).map(WindowingHelpers.<KV<K, V>>windowFunction()); - context.setOutputRDD(transform, rdd); + context.putDataset(transform, new BoundedDataset<>(rdd)); } }; } @@ -410,8 +411,8 @@ public final class TransformTranslator { @Override public void evaluate(HadoopIO.Write.Bound<K, V> transform, EvaluationContext context) { @SuppressWarnings("unchecked") - JavaPairRDD<K, V> last = ((JavaRDDLike<WindowedValue<KV<K, V>>, ?>) context - .getInputRDD(transform)) + JavaPairRDD<K, V> last = ((BoundedDataset<KV<K, V>>) context.borrowDataset(transform)) + .getRDD() .map(WindowingHelpers.<KV<K, V>>unwindowFunction()) .mapToPair(new PairFunction<KV<K, V>, K, V>() { @Override @@ -492,20 +493,20 @@ public final class TransformTranslator { @Override public void evaluate(Window.Bound<T> transform, EvaluationContext context) { @SuppressWarnings("unchecked") - JavaRDDLike<WindowedValue<T>, ?> inRDD = - (JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform); + JavaRDD<WindowedValue<T>> inRDD = + ((BoundedDataset<T>) context.borrowDataset(transform)).getRDD(); if (TranslationUtils.skipAssignWindows(transform, context)) { - context.setOutputRDD(transform, inRDD); + context.putDataset(transform, new BoundedDataset<>(inRDD)); } else { @SuppressWarnings("unchecked") WindowFn<? super T, W> windowFn = (WindowFn<? 
super T, W>) transform.getWindowFn(); OldDoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn); Accumulator<NamedAggregators> accum = AccumulatorSingleton.getInstance(context.getSparkContext()); - context.setOutputRDD(transform, - inRDD.mapPartitions(new DoFnFunction<>(accum, addWindowsDoFn, - context.getRuntimeContext(), null, null))); + context.putDataset(transform, + new BoundedDataset<>(inRDD.mapPartitions(new DoFnFunction<>(accum, addWindowsDoFn, + context.getRuntimeContext(), null, null)))); } } }; @@ -519,7 +520,7 @@ public final class TransformTranslator { // Use a coder to convert the objects in the PCollection to byte arrays, so they // can be transferred over the network. Coder<T> coder = context.getOutput(transform).getCoder(); - context.setOutputRDDFromValues(transform, elems, coder); + context.putBoundedDatasetFromValues(transform, elems, coder); } }; } @@ -530,7 +531,7 @@ public final class TransformTranslator { public void evaluate(View.AsSingleton<T> transform, EvaluationContext context) { Iterable<? extends WindowedValue<?>> iter = context.getWindowedValues(context.getInput(transform)); - context.setPView(context.getOutput(transform), iter); + context.putPView(context.getOutput(transform), iter); } }; } @@ -541,7 +542,7 @@ public final class TransformTranslator { public void evaluate(View.AsIterable<T> transform, EvaluationContext context) { Iterable<? extends WindowedValue<?>> iter = context.getWindowedValues(context.getInput(transform)); - context.setPView(context.getOutput(transform), iter); + context.putPView(context.getOutput(transform), iter); } }; } @@ -554,7 +555,7 @@ public final class TransformTranslator { EvaluationContext context) { Iterable<? 
extends WindowedValue<?>> iter = context.getWindowedValues(context.getInput(transform)); - context.setPView(context.getOutput(transform), iter); + context.putPView(context.getOutput(transform), iter); } }; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java index f8ee8ad..01398e4 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java @@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkArgument; import org.apache.beam.runners.spark.SparkContextOptions; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.SparkRunner; +import org.apache.beam.runners.spark.translation.EvaluationContext; import org.apache.beam.runners.spark.translation.SparkContextFactory; import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; import org.apache.beam.runners.spark.translation.TransformTranslator; @@ -54,7 +55,7 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF this.options = options; } - private StreamingEvaluationContext ctxt; + private EvaluationContext ctxt; @Override public JavaStreamingContext create() { @@ -72,7 +73,7 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF JavaSparkContext jsc = SparkContextFactory.getSparkContext(options); JavaStreamingContext jssc = new 
JavaStreamingContext(jsc, batchDuration); - ctxt = new StreamingEvaluationContext(jsc, pipeline, jssc, + ctxt = new EvaluationContext(jsc, pipeline, jssc, options.getTimeout()); pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt)); ctxt.computeOutputs(); @@ -95,7 +96,7 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF return jssc; } - public StreamingEvaluationContext getCtxt() { + public EvaluationContext getCtxt() { return ctxt; } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java deleted file mode 100644 index bfba316..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java +++ /dev/null @@ -1,272 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.spark.translation.streaming; - - -import com.google.common.collect.Iterables; - -import java.io.IOException; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; -import org.apache.beam.runners.spark.coders.CoderHelpers; -import org.apache.beam.runners.spark.translation.EvaluationContext; -import org.apache.beam.runners.spark.translation.SparkRuntimeContext; -import org.apache.beam.runners.spark.translation.WindowingHelpers; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; -import org.apache.beam.sdk.values.PValue; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaRDDLike; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.VoidFunction; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaDStreamLike; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.joda.time.Duration; - - -/** - * Streaming evaluation context helps to handle streaming. 
- */ -public class StreamingEvaluationContext extends EvaluationContext { - - private final JavaStreamingContext jssc; - private final long timeout; - private final Map<PValue, DStreamHolder<?>> pstreams = new LinkedHashMap<>(); - private final Set<DStreamHolder<?>> leafStreams = new LinkedHashSet<>(); - - public StreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline, - JavaStreamingContext jssc, long timeout) { - super(jsc, pipeline); - this.jssc = jssc; - this.timeout = timeout; - } - - /** - * DStream holder Can also crate a DStream from a supplied queue of values, but mainly for - * testing. - */ - private class DStreamHolder<T> { - - private Iterable<Iterable<T>> values; - private Coder<T> coder; - private JavaDStream<WindowedValue<T>> dStream; - - DStreamHolder(Iterable<Iterable<T>> values, Coder<T> coder) { - this.values = values; - this.coder = coder; - } - - DStreamHolder(JavaDStream<WindowedValue<T>> dStream) { - this.dStream = dStream; - } - - @SuppressWarnings("unchecked") - JavaDStream<WindowedValue<T>> getDStream() { - if (dStream == null) { - WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder = - WindowedValue.getValueOnlyCoder(coder); - // create the DStream from queue - Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>(); - JavaRDD<WindowedValue<T>> lastRDD = null; - for (Iterable<T> v : values) { - Iterable<WindowedValue<T>> windowedValues = - Iterables.transform(v, WindowingHelpers.<T>windowValueFunction()); - JavaRDD<WindowedValue<T>> rdd = getSparkContext().parallelize( - CoderHelpers.toByteArrays(windowedValues, windowCoder)).map( - CoderHelpers.fromByteFunction(windowCoder)); - rddQueue.offer(rdd); - lastRDD = rdd; - } - // create dstream from queue, one at a time, - // with last as default in case batches repeat (graceful stops for example). - // if the stream is empty, avoid creating a default empty RDD. - // mainly for unit test so no reason to have this configurable. - dStream = lastRDD != null ? 
jssc.queueStream(rddQueue, true, lastRDD) - : jssc.queueStream(rddQueue, true); - } - return dStream; - } - } - - <T> void setDStreamFromQueue( - PTransform<?, ?> transform, Iterable<Iterable<T>> values, Coder<T> coder) { - pstreams.put((PValue) getOutput(transform), new DStreamHolder<>(values, coder)); - } - - <T> void setStream(PTransform<?, ?> transform, JavaDStream<WindowedValue<T>> dStream) { - setStream((PValue) getOutput(transform), dStream); - } - - <T> void setStream(PValue pvalue, JavaDStream<WindowedValue<T>> dStream) { - DStreamHolder<T> dStreamHolder = new DStreamHolder<>(dStream); - pstreams.put(pvalue, dStreamHolder); - leafStreams.add(dStreamHolder); - } - - boolean hasStream(PTransform<?, ?> transform) { - PValue pvalue = (PValue) getInput(transform); - return hasStream(pvalue); - } - - boolean hasStream(PValue pvalue) { - return pstreams.containsKey(pvalue); - } - - JavaDStreamLike<?, ?, ?> getStream(PTransform<?, ?> transform) { - return getStream((PValue) getInput(transform)); - } - - JavaDStreamLike<?, ?, ?> getStream(PValue pvalue) { - DStreamHolder<?> dStreamHolder = pstreams.get(pvalue); - JavaDStreamLike<?, ?, ?> dStream = dStreamHolder.getDStream(); - leafStreams.remove(dStreamHolder); - return dStream; - } - - // used to set the RDD from the DStream in the RDDHolder for transformation - <T> void setInputRDD( - PTransform<? extends PInput, ?> transform, JavaRDDLike<WindowedValue<T>, ?> rdd) { - setRDD((PValue) getInput(transform), rdd); - } - - // used to get the RDD transformation output and use it as the DStream transformation output - JavaRDDLike<?, ?> getOutputRDD(PTransform<?, ?> transform) { - return getRDD((PValue) getOutput(transform)); - } - - public JavaStreamingContext getStreamingContext() { - return jssc; - } - - @Override - public void computeOutputs() { - super.computeOutputs(); // in case the pipeline contains bounded branches as well. 
- for (DStreamHolder<?> streamHolder : leafStreams) { - computeOutput(streamHolder); - } // force a DStream action - } - - private static <T> void computeOutput(DStreamHolder<T> streamHolder) { - JavaDStream<WindowedValue<T>> dStream = streamHolder.getDStream(); - // cache in DStream level not RDD - // because there could be a difference in StorageLevel if the DStream is windowed. - dStream.dstream().cache(); - dStream.foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() { - @Override - public void call(JavaRDD<WindowedValue<T>> rdd) throws Exception { - rdd.count(); - } - }); - } - - @Override - public void close(boolean gracefully) { - if (timeout > 0) { - jssc.awaitTerminationOrTimeout(timeout); - } else { - jssc.awaitTermination(); - } - // stop streaming context gracefully, so checkpointing (and other computations) get to - // finish before shutdown. - jssc.stop(false, gracefully); - state = State.DONE; - super.close(false); - } - - private State state = State.RUNNING; - - @Override - public State getState() { - return state; - } - - @Override - public State cancel() throws IOException { - throw new UnsupportedOperationException( - "Spark runner StreamingEvaluationContext does not support cancel."); - } - - @Override - public State waitUntilFinish() { - throw new UnsupportedOperationException( - "Spark runner StreamingEvaluationContext does not support waitUntilFinish."); - } - - @Override - public State waitUntilFinish(Duration duration) { - throw new UnsupportedOperationException( - "Spark runner StreamingEvaluationContext does not support waitUntilFinish."); - } - - //---------------- override in order to expose in package - @Override - protected <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) { - return super.getInput(transform); - } - @Override - protected <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) { - return super.getOutput(transform); - } - - @Override - protected JavaSparkContext 
getSparkContext() { - return super.getSparkContext(); - } - - @Override - protected SparkRuntimeContext getRuntimeContext() { - return super.getRuntimeContext(); - } - - @Override - public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) { - super.setCurrentTransform(transform); - } - - @Override - protected AppliedPTransform<?, ?, ?> getCurrentTransform() { - return super.getCurrentTransform(); - } - - @Override - protected <T> void setOutputRDD(PTransform<?, ?> transform, - JavaRDDLike<WindowedValue<T>, ?> rdd) { - super.setOutputRDD(transform, rdd); - } - - @Override - protected <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values, - Coder<T> coder) { - super.setOutputRDDFromValues(transform, values, coder); - } - - @Override - protected boolean hasOutputRDD(PTransform<? extends PInput, ?> transform) { - return super.hasOutputRDD(transform); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 71c27df..b30f079 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -29,6 +29,8 @@ import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.io.ConsoleIO; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.io.SparkUnboundedSource; +import org.apache.beam.runners.spark.translation.BoundedDataset; +import 
org.apache.beam.runners.spark.translation.Dataset; import org.apache.beam.runners.spark.translation.DoFnFunction; import org.apache.beam.runners.spark.translation.EvaluationContext; import org.apache.beam.runners.spark.translation.GroupCombineFunctions; @@ -71,15 +73,13 @@ import org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaDStreamLike; import org.apache.spark.streaming.api.java.JavaPairDStream; - /** * Supports translation between a Beam transform, and Spark's operations on DStreams. */ -public final class StreamingTransformTranslator { +final class StreamingTransformTranslator { private StreamingTransformTranslator() { } @@ -89,9 +89,8 @@ public final class StreamingTransformTranslator { @Override public void evaluate(ConsoleIO.Write.Unbound<T> transform, EvaluationContext context) { @SuppressWarnings("unchecked") - JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream = - (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>) - ((StreamingEvaluationContext) context).getStream(transform); + JavaDStream<WindowedValue<T>> dstream = + ((UnboundedDataset<T>) (context).borrowDataset(transform)).getDStream(); dstream.map(WindowingHelpers.<T>unwindowFunction()).print(transform.getNum()); } }; @@ -101,9 +100,9 @@ public final class StreamingTransformTranslator { return new TransformEvaluator<Read.Unbounded<T>>() { @Override public void evaluate(Read.Unbounded<T> transform, EvaluationContext context) { - StreamingEvaluationContext sec = (StreamingEvaluationContext) context; - sec.setStream(transform, SparkUnboundedSource.read(sec.getStreamingContext(), - sec.getRuntimeContext(), transform.getSource())); + context.putDataset(transform, + new UnboundedDataset<>(SparkUnboundedSource.read(context.getStreamingContext(), + context.getRuntimeContext(), 
transform.getSource()))); } }; } @@ -112,10 +111,9 @@ public final class StreamingTransformTranslator { return new TransformEvaluator<CreateStream.QueuedValues<T>>() { @Override public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext context) { - StreamingEvaluationContext sec = (StreamingEvaluationContext) context; Iterable<Iterable<T>> values = transform.getQueuedValues(); - Coder<T> coder = sec.getOutput(transform).getCoder(); - sec.setDStreamFromQueue(transform, values, coder); + Coder<T> coder = context.getOutput(transform).getCoder(); + context.putUnboundedDatasetFromQueue(transform, values, coder); } }; } @@ -125,23 +123,23 @@ public final class StreamingTransformTranslator { @SuppressWarnings("unchecked") @Override public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) { - StreamingEvaluationContext sec = (StreamingEvaluationContext) context; - PCollectionList<T> pcs = sec.getInput(transform); + PCollectionList<T> pcs = context.getInput(transform); // since this is a streaming pipeline, at least one of the PCollections to "flatten" are // unbounded, meaning it represents a DStream. // So we could end up with an unbounded unified DStream. final List<JavaRDD<WindowedValue<T>>> rdds = new ArrayList<>(); final List<JavaDStream<WindowedValue<T>>> dStreams = new ArrayList<>(); - for (PCollection<T> pcol: pcs.getAll()) { - if (sec.hasStream(pcol)) { - dStreams.add((JavaDStream<WindowedValue<T>>) sec.getStream(pcol)); + for (PCollection<T> pcol : pcs.getAll()) { + Dataset dataset = context.borrowDataset(pcol); + if (dataset instanceof UnboundedDataset) { + dStreams.add(((UnboundedDataset<T>) dataset).getDStream()); } else { - rdds.add((JavaRDD<WindowedValue<T>>) context.getRDD(pcol)); + rdds.add(((BoundedDataset<T>) dataset).getRDD()); } } // start by unifying streams into a single stream. 
JavaDStream<WindowedValue<T>> unifiedStreams = - sec.getStreamingContext().union(dStreams.remove(0), dStreams); + context.getStreamingContext().union(dStreams.remove(0), dStreams); // now unify in RDDs. if (rdds.size() > 0) { JavaDStream<WindowedValue<T>> joined = unifiedStreams.transform( @@ -152,9 +150,9 @@ public final class StreamingTransformTranslator { return new JavaSparkContext(streamRdd.context()).union(streamRdd, rdds); } }); - sec.setStream(transform, joined); + context.putDataset(transform, new UnboundedDataset<>(joined)); } else { - sec.setStream(transform, unifiedStreams); + context.putDataset(transform, new UnboundedDataset<>(unifiedStreams)); } } }; @@ -164,12 +162,11 @@ public final class StreamingTransformTranslator { return new TransformEvaluator<Window.Bound<T>>() { @Override public void evaluate(Window.Bound<T> transform, EvaluationContext context) { - StreamingEvaluationContext sec = (StreamingEvaluationContext) context; @SuppressWarnings("unchecked") WindowFn<? super T, W> windowFn = (WindowFn<? super T, W>) transform.getWindowFn(); @SuppressWarnings("unchecked") JavaDStream<WindowedValue<T>> dStream = - (JavaDStream<WindowedValue<T>>) sec.getStream(transform); + ((UnboundedDataset<T>) context.borrowDataset(transform)).getDStream(); // get the right window durations. 
Duration windowDuration; Duration slideDuration; @@ -188,10 +185,10 @@ public final class StreamingTransformTranslator { dStream.window(windowDuration, slideDuration); //--- then we apply windowing to the elements if (TranslationUtils.skipAssignWindows(transform, context)) { - sec.setStream(transform, windowedDStream); + context.putDataset(transform, new UnboundedDataset<>(windowedDStream)); } else { final OldDoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn); - final SparkRuntimeContext runtimeContext = sec.getRuntimeContext(); + final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); JavaDStream<WindowedValue<T>> outStream = windowedDStream.transform( new Function<JavaRDD<WindowedValue<T>>, JavaRDD<WindowedValue<T>>>() { @Override @@ -202,7 +199,7 @@ public final class StreamingTransformTranslator { new DoFnFunction<>(accum, addWindowsDoFn, runtimeContext, null, null)); } }); - sec.setStream(transform, outStream); + context.putDataset(transform, new UnboundedDataset<>(outStream)); } } }; @@ -212,18 +209,16 @@ public final class StreamingTransformTranslator { return new TransformEvaluator<GroupByKey<K, V>>() { @Override public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) { - StreamingEvaluationContext sec = (StreamingEvaluationContext) context; - @SuppressWarnings("unchecked") JavaDStream<WindowedValue<KV<K, V>>> dStream = - (JavaDStream<WindowedValue<KV<K, V>>>) sec.getStream(transform); + ((UnboundedDataset<KV<K, V>>) context.borrowDataset(transform)).getDStream(); @SuppressWarnings("unchecked") - final KvCoder<K, V> coder = (KvCoder<K, V>) sec.getInput(transform).getCoder(); + final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder(); - final SparkRuntimeContext runtimeContext = sec.getRuntimeContext(); + final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); final WindowingStrategy<?, ?> windowingStrategy = - sec.getInput(transform).getWindowingStrategy(); + 
context.getInput(transform).getWindowingStrategy(); JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream = dStream.transform(new Function<JavaRDD<WindowedValue<KV<K, V>>>, @@ -237,7 +232,7 @@ public final class StreamingTransformTranslator { windowingStrategy); } }); - sec.setStream(transform, outStream); + context.putDataset(transform, new UnboundedDataset<>(outStream)); } }; } @@ -245,29 +240,29 @@ public final class StreamingTransformTranslator { private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>> combineGrouped() { return new TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>() { + @SuppressWarnings("unchecked") @Override public void evaluate(Combine.GroupedValues<K, InputT, OutputT> transform, EvaluationContext context) { - StreamingEvaluationContext sec = (StreamingEvaluationContext) context; // get the applied combine function. PCollection<? extends KV<K, ? extends Iterable<InputT>>> input = - sec.getInput(transform); + context.getInput(transform); WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); - @SuppressWarnings("unchecked") final CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT> fn = (CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT>) CombineFnUtil.toFnWithContext(transform.getFn()); - @SuppressWarnings("unchecked") JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> dStream = - (JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>>) sec.getStream(transform); + ((UnboundedDataset<KV<K, Iterable<InputT>>>) context.borrowDataset(transform)) + .getDStream(); - SparkKeyedCombineFn<K, InputT, ?, OutputT> combineFnWithContext = - new SparkKeyedCombineFn<>(fn, sec.getRuntimeContext(), + SparkKeyedCombineFn<K, InputT, ?, OutputT> combineFnWithContext = + new SparkKeyedCombineFn<>(fn, context.getRuntimeContext(), TranslationUtils.getSideInputs(transform.getSideInputs(), context), - windowingStrategy); - sec.setStream(transform, dStream.map(new 
TranslationUtils.CombineGroupedValues<>( - combineFnWithContext))); + windowingStrategy); + context.putDataset(transform, new UnboundedDataset<>(dStream.map(new TranslationUtils + .CombineGroupedValues<>( + combineFnWithContext)))); } }; } @@ -276,26 +271,24 @@ public final class StreamingTransformTranslator { combineGlobally() { return new TransformEvaluator<Combine.Globally<InputT, OutputT>>() { + @SuppressWarnings("unchecked") @Override public void evaluate(Combine.Globally<InputT, OutputT> transform, EvaluationContext context) { - StreamingEvaluationContext sec = (StreamingEvaluationContext) context; - final PCollection<InputT> input = sec.getInput(transform); + final PCollection<InputT> input = context.getInput(transform); // serializable arguments to pass. - final Coder<InputT> iCoder = sec.getInput(transform).getCoder(); - final Coder<OutputT> oCoder = sec.getOutput(transform).getCoder(); - @SuppressWarnings("unchecked") + final Coder<InputT> iCoder = context.getInput(transform).getCoder(); + final Coder<OutputT> oCoder = context.getOutput(transform).getCoder(); final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn = (CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT>) CombineFnUtil.toFnWithContext(transform.getFn()); final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); - final SparkRuntimeContext runtimeContext = sec.getRuntimeContext(); + final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), context); final boolean hasDefault = transform.isInsertDefault(); - @SuppressWarnings("unchecked") JavaDStream<WindowedValue<InputT>> dStream = - (JavaDStream<WindowedValue<InputT>>) sec.getStream(transform); + ((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream(); JavaDStream<WindowedValue<OutputT>> outStream = 
dStream.transform( new Function<JavaRDD<WindowedValue<InputT>>, JavaRDD<WindowedValue<OutputT>>>() { @@ -307,7 +300,7 @@ public final class StreamingTransformTranslator { } }); - sec.setStream(transform, outStream); + context.putDataset(transform, new UnboundedDataset<>(outStream)); } }; } @@ -315,27 +308,24 @@ public final class StreamingTransformTranslator { private static <K, InputT, AccumT, OutputT> TransformEvaluator<Combine.PerKey<K, InputT, OutputT>> combinePerKey() { return new TransformEvaluator<Combine.PerKey<K, InputT, OutputT>>() { + @SuppressWarnings("unchecked") @Override public void evaluate(final Combine.PerKey<K, InputT, OutputT> transform, final EvaluationContext context) { - StreamingEvaluationContext sec = (StreamingEvaluationContext) context; - final PCollection<KV<K, InputT>> input = sec.getInput(transform); + final PCollection<KV<K, InputT>> input = context.getInput(transform); // serializable arguments to pass. - @SuppressWarnings("unchecked") final KvCoder<K, InputT> inputCoder = - (KvCoder<K, InputT>) sec.getInput(transform).getCoder(); - @SuppressWarnings("unchecked") + (KvCoder<K, InputT>) context.getInput(transform).getCoder(); final CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn = (CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) CombineFnUtil.toFnWithContext(transform.getFn()); final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); - final SparkRuntimeContext runtimeContext = sec.getRuntimeContext(); + final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), context); - @SuppressWarnings("unchecked") JavaDStream<WindowedValue<KV<K, InputT>>> dStream = - (JavaDStream<WindowedValue<KV<K, InputT>>>) sec.getStream(transform); + ((UnboundedDataset<KV<K, InputT>>) 
context.borrowDataset(transform)).getDStream(); JavaDStream<WindowedValue<KV<K, OutputT>>> outStream = dStream.transform(new Function<JavaRDD<WindowedValue<KV<K, InputT>>>, @@ -347,26 +337,24 @@ public final class StreamingTransformTranslator { windowingStrategy, sideInputs); } }); - sec.setStream(transform, outStream); + context.putDataset(transform, new UnboundedDataset<>(outStream)); } }; } private static <InputT, OutputT> TransformEvaluator<ParDo.Bound<InputT, OutputT>> parDo() { return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() { + @SuppressWarnings("unchecked") @Override public void evaluate(final ParDo.Bound<InputT, OutputT> transform, final EvaluationContext context) { - final StreamingEvaluationContext sec = (StreamingEvaluationContext) context; - final SparkRuntimeContext runtimeContext = sec.getRuntimeContext(); + final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), context); - @SuppressWarnings("unchecked") final WindowFn<Object, ?> windowFn = - (WindowFn<Object, ?>) sec.getInput(transform).getWindowingStrategy().getWindowFn(); - @SuppressWarnings("unchecked") + (WindowFn<Object, ?>) context.getInput(transform).getWindowingStrategy().getWindowFn(); JavaDStream<WindowedValue<InputT>> dStream = - (JavaDStream<WindowedValue<InputT>>) sec.getStream(transform); + ((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream(); JavaDStream<WindowedValue<OutputT>> outStream = dStream.transform(new Function<JavaRDD<WindowedValue<InputT>>, @@ -381,7 +369,7 @@ public final class StreamingTransformTranslator { } }); - sec.setStream(transform, outStream); + context.putDataset(transform, new UnboundedDataset<>(outStream)); } }; } @@ -392,16 +380,15 @@ public final class StreamingTransformTranslator { @Override public void evaluate(final ParDo.BoundMulti<InputT, OutputT> transform, 
final EvaluationContext context) { - final StreamingEvaluationContext sec = (StreamingEvaluationContext) context; - final SparkRuntimeContext runtimeContext = sec.getRuntimeContext(); + final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), context); @SuppressWarnings("unchecked") final WindowFn<Object, ?> windowFn = - (WindowFn<Object, ?>) sec.getInput(transform).getWindowingStrategy().getWindowFn(); + (WindowFn<Object, ?>) context.getInput(transform).getWindowingStrategy().getWindowFn(); @SuppressWarnings("unchecked") JavaDStream<WindowedValue<InputT>> dStream = - (JavaDStream<WindowedValue<InputT>>) sec.getStream(transform); + ((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream(); JavaPairDStream<TupleTag<?>, WindowedValue<?>> all = dStream.transformToPair( new Function<JavaRDD<WindowedValue<InputT>>, JavaPairRDD<TupleTag<?>, WindowedValue<?>>>() { @@ -414,7 +401,7 @@ public final class StreamingTransformTranslator { runtimeContext, transform.getMainOutputTag(), sideInputs, windowFn)); } }).cache(); - PCollectionTuple pct = sec.getOutput(transform); + PCollectionTuple pct = context.getOutput(transform); for (Map.Entry<TupleTag<?>, PCollection<?>> e : pct.getAll().entrySet()) { @SuppressWarnings("unchecked") JavaPairDStream<TupleTag<?>, WindowedValue<?>> filtered = @@ -424,7 +411,7 @@ public final class StreamingTransformTranslator { JavaDStream<WindowedValue<Object>> values = (JavaDStream<WindowedValue<Object>>) (JavaDStream<?>) TranslationUtils.dStreamValues(filtered); - sec.setStream(e.getValue(), values); + context.putDataset(e.getValue(), new UnboundedDataset<>(values)); } } }; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java 
---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java new file mode 100644 index 0000000..67adee2 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.beam.runners.spark.translation.streaming;

import com.google.common.collect.Iterables;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nullable;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.translation.Dataset;
import org.apache.beam.runners.spark.translation.WindowingHelpers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;


/**
 * DStream holder. Can also create a DStream from a supplied queue of values, mainly for testing.
 */
public class UnboundedDataset<T> implements Dataset {
  // only set if creating a DStream from a static collection
  @Nullable private transient JavaStreamingContext jssc;

  private Iterable<Iterable<T>> values;
  private Coder<T> coder;
  // null until getDStream() is first called when constructed from a static collection,
  // so all internal uses must go through getDStream(), never read the field directly.
  private JavaDStream<WindowedValue<T>> dStream;

  UnboundedDataset(JavaDStream<WindowedValue<T>> dStream) {
    this.dStream = dStream;
  }

  public UnboundedDataset(Iterable<Iterable<T>> values, JavaStreamingContext jssc, Coder<T> coder) {
    this.values = values;
    this.jssc = jssc;
    this.coder = coder;
  }

  /**
   * Returns the underlying DStream, lazily building it from the supplied static
   * collection on first access when the queue-based constructor was used.
   */
  @SuppressWarnings("ConstantConditions")
  JavaDStream<WindowedValue<T>> getDStream() {
    if (dStream == null) {
      WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
          WindowedValue.getValueOnlyCoder(coder);
      // create the DStream from queue
      Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
      JavaRDD<WindowedValue<T>> lastRDD = null;
      for (Iterable<T> v : values) {
        Iterable<WindowedValue<T>> windowedValues =
            Iterables.transform(v, WindowingHelpers.<T>windowValueFunction());
        // round-trip through the coder so the values are serialized the same way
        // they would be in a real pipeline
        JavaRDD<WindowedValue<T>> rdd = jssc.sc().parallelize(
            CoderHelpers.toByteArrays(windowedValues, windowCoder)).map(
                CoderHelpers.fromByteFunction(windowCoder));
        rddQueue.offer(rdd);
        lastRDD = rdd;
      }
      // create DStream from queue, one at a time,
      // with last as default in case batches repeat (graceful stops for example).
      // if the stream is empty, avoid creating a default empty RDD.
      // mainly for unit test so no reason to have this configurable.
      dStream = lastRDD != null ? jssc.queueStream(rddQueue, true, lastRDD)
          : jssc.queueStream(rddQueue, true);
    }
    return dStream;
  }

  @Override
  public void cache() {
    // go through getDStream() so a lazily-created (queue-based) DStream is
    // materialized first; reading the dStream field directly would NPE if
    // cache() ran before getDStream() was ever invoked.
    getDStream().cache();
  }

  @Override
  public void action() {
    // force evaluation of each batch; count() is an action with no other effect.
    // use getDStream() for the same lazy-initialization reason as cache().
    getDStream().foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() {
      @Override
      public void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
        rdd.count();
      }
    });
  }

  @Override
  public void setName(String name) {
    // intentionally a no-op: DStreams have no user-visible name to set
  }
}