[BEAM-799] Support GroupByKey directly. Remove runner override for GroupByKey.
Avoid NPE if no sideInputs are available in reader. Handle CombineFn with or without context. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a54ded37 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a54ded37 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a54ded37 Branch: refs/heads/python-sdk Commit: a54ded373fa7f6508fb46eea1a1d6f9bc405114b Parents: f2fe1ae Author: Sela <ans...@paypal.com> Authored: Sat Oct 22 14:51:50 2016 +0300 Committer: Sela <ans...@paypal.com> Committed: Wed Oct 26 10:00:45 2016 +0300 ---------------------------------------------------------------------- .../apache/beam/runners/spark/SparkRunner.java | 19 ------ .../translation/GroupCombineFunctions.java | 66 +++++++++----------- .../spark/translation/TransformTranslator.java | 43 +++---------- .../streaming/StreamingTransformTranslator.java | 65 +++++-------------- .../spark/util/SparkSideInputReader.java | 2 +- 5 files changed, 55 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a54ded37/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index b17c38c..45c7f55 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.spark; import java.util.Collection; import java.util.List; -import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly; import org.apache.beam.runners.spark.translation.EvaluationContext; import org.apache.beam.runners.spark.translation.SparkContextFactory; import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; @@ -36,7 +35,6 @@ import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.runners.TransformTreeNode; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.values.PBegin; @@ -115,23 +113,6 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> { } /** - * Overrides for this runner. - */ - @SuppressWarnings("rawtypes") - @Override - public <OutputT extends POutput, InputT extends PInput> OutputT apply( - PTransform<InputT, OutputT> transform, InputT input) { - - if (transform instanceof GroupByKey) { - return (OutputT) ((PCollection) input).apply( - new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform)); - } else { - return super.apply(transform, input); - } - } - - - /** * No parameter constructor defaults to running this pipeline in Spark's local mode, in a single * thread. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a54ded37/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java index de02b26..e2a0f87 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java @@ -20,11 +20,9 @@ package org.apache.beam.runners.spark.translation; import com.google.common.collect.Lists; - import java.util.Collections; import java.util.Map; import org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn; -import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.coders.CoderHelpers; @@ -38,6 +36,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.ReifyTimestampAndWindowsDoFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; @@ -60,54 +59,45 @@ import scala.Tuple2; public class GroupCombineFunctions { /** - * Apply {@link GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly} to a Spark RDD. + * Apply {@link org.apache.beam.sdk.transforms.GroupByKey} to a Spark RDD. */ - public static <K, V> JavaRDD<WindowedValue<KV<K, Iterable<V>>>> groupByKeyOnly( - JavaRDD<WindowedValue<KV<K, V>>> rdd, KvCoder<K, V> coder) { + public static <K, V, W extends BoundedWindow> JavaRDD<WindowedValue<KV<K, + Iterable<V>>>> groupByKey(JavaRDD<WindowedValue<KV<K, V>>> rdd, + Accumulator<NamedAggregators> accum, + KvCoder<K, V> coder, + SparkRuntimeContext runtimeContext, + WindowingStrategy<?, W> windowingStrategy) { + //--- coders. final Coder<K> keyCoder = coder.getKeyCoder(); final Coder<V> valueCoder = coder.getValueCoder(); + final WindowedValue.WindowedValueCoder<V> wvCoder = WindowedValue.FullWindowedValueCoder.of( + valueCoder, windowingStrategy.getWindowFn().windowCoder()); + + //--- groupByKey. // Use coders to convert objects in the PCollection to byte arrays, so they // can be transferred over the network for the shuffle. - return rdd.map(WindowingHelpers.<KV<K, V>>unwindowFunction()) - .mapToPair(TranslationUtils.<K, V>toPairFunction()) - .mapToPair(CoderHelpers.toByteFunction(keyCoder, valueCoder)) - .groupByKey() - .mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, valueCoder)) - // empty windows are OK here, see GroupByKey#evaluateHelper in the SDK - .map(TranslationUtils.<K, Iterable<V>>fromPairFunction()) - .map(WindowingHelpers.<KV<K, Iterable<V>>>windowFunction()); - } - - /** - * Apply {@link GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow} to a Spark RDD. - */ - public static <K, V, W extends BoundedWindow> JavaRDD<WindowedValue<KV<K, Iterable<V>>>> - groupAlsoByWindow(JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> rdd, - GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow<K, V> transform, - SparkRuntimeContext runtimeContext, - Accumulator<NamedAggregators> accum, - KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder) { - //--- coders. - Coder<Iterable<WindowedValue<V>>> inputValueCoder = inputKvCoder.getValueCoder(); - IterableCoder<WindowedValue<V>> inputIterableValueCoder = - (IterableCoder<WindowedValue<V>>) inputValueCoder; - Coder<WindowedValue<V>> inputIterableElementCoder = inputIterableValueCoder.getElemCoder(); - WindowedValue.WindowedValueCoder<V> inputIterableWindowedValueCoder = - (WindowedValue.WindowedValueCoder<V>) inputIterableElementCoder; - Coder<V> inputIterableElementValueCoder = inputIterableWindowedValueCoder.getValueCoder(); + JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupedByKey = + rdd.mapPartitions(new DoFnFunction<KV<K, V>, KV<K, WindowedValue<V>>>(null, + new ReifyTimestampAndWindowsDoFn<K, V>(), runtimeContext, null, null)) + .map(WindowingHelpers.<KV<K, WindowedValue<V>>>unwindowFunction()) + .mapToPair(TranslationUtils.<K, WindowedValue<V>>toPairFunction()) + .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder)) + .groupByKey() + .mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, wvCoder)) + // empty windows are OK here, see GroupByKey#evaluateHelper in the SDK + .map(TranslationUtils.<K, Iterable<WindowedValue<V>>>fromPairFunction()) + .map(WindowingHelpers.<KV<K, Iterable<WindowedValue<V>>>>windowFunction()); - @SuppressWarnings("unchecked") - WindowingStrategy<?, W> windowingStrategy = - (WindowingStrategy<?, W>) transform.getWindowingStrategy(); + //--- now group also by window. @SuppressWarnings("unchecked") WindowFn<Object, W> windowFn = (WindowFn<Object, W>) windowingStrategy.getWindowFn(); - // GroupAlsoByWindow current uses a dummy in-memory StateInternals OldDoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<V>>> gabwDoFn = new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>( windowingStrategy, new TranslationUtils.InMemoryStateInternalsFactory<K>(), - SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder)); - return rdd.mapPartitions(new DoFnFunction<>(accum, gabwDoFn, runtimeContext, null, windowFn)); + SystemReduceFn.<K, V, W>buffering(valueCoder)); + return groupedByKey.mapPartitions(new DoFnFunction<>(accum, gabwDoFn, runtimeContext, null, + windowFn)); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a54ded37/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index b55e3b2..2e682c4 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -32,8 +32,6 @@ import org.apache.avro.mapred.AvroKey; import org.apache.avro.mapreduce.AvroJob; import org.apache.avro.mapreduce.AvroKeyInputFormat; import org.apache.beam.runners.core.AssignWindowsDoFn; -import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; -import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton; import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.io.SourceRDD; @@ -51,6 +49,7 @@ import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -58,7 +57,6 @@ import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.AppliedCombineFn; import org.apache.beam.sdk.util.CombineFnUtil; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; @@ -112,10 +110,10 @@ public final class TransformTranslator { }; } - private static <K, V> TransformEvaluator<GroupByKeyOnly<K, V>> gbko() { - return new TransformEvaluator<GroupByKeyOnly<K, V>>() { + private static <K, V> TransformEvaluator<GroupByKey<K, V>> groupByKey() { + return new TransformEvaluator<GroupByKey<K, V>>() { @Override - public void evaluate(GroupByKeyOnly<K, V> transform, EvaluationContext context) { + public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) { @SuppressWarnings("unchecked") JavaRDD<WindowedValue<KV<K, V>>> inRDD = (JavaRDD<WindowedValue<KV<K, V>>>) context.getInputRDD(transform); @@ -123,30 +121,11 @@ public final class TransformTranslator { @SuppressWarnings("unchecked") final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder(); - context.setOutputRDD(transform, GroupCombineFunctions.groupByKeyOnly(inRDD, coder)); - } - }; - } - - private static <K, V, W extends BoundedWindow> - TransformEvaluator<GroupAlsoByWindow<K, V>> gabw() { - return new TransformEvaluator<GroupAlsoByWindow<K, V>>() { - @Override - public void evaluate(GroupAlsoByWindow<K, V> transform, EvaluationContext context) { - @SuppressWarnings("unchecked") - JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> inRDD = - (JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>>) - context.getInputRDD(transform); - - @SuppressWarnings("unchecked") - final KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder = - (KvCoder<K, Iterable<WindowedValue<V>>>) context.getInput(transform).getCoder(); - final Accumulator<NamedAggregators> accum = - AccumulatorSingleton.getInstance(context.getSparkContext()); + AccumulatorSingleton.getInstance(context.getSparkContext()); - context.setOutputRDD(transform, GroupCombineFunctions.groupAlsoByWindow(inRDD, transform, - context.getRuntimeContext(), accum, inputKvCoder)); + context.setOutputRDD(transform, GroupCombineFunctions.groupByKey(inRDD, accum, coder, + context.getRuntimeContext(), context.getInput(transform).getWindowingStrategy())); } }; } @@ -161,13 +140,10 @@ public final class TransformTranslator { PCollection<? extends KV<K, ? extends Iterable<InputT>>> input = context.getInput(transform); WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); - AppliedCombineFn<? super K, ? super InputT, ?, OutputT> appliedFn = transform - .getAppliedFn(context.getPipeline().getCoderRegistry(), input.getCoder(), - windowingStrategy); @SuppressWarnings("unchecked") CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT> fn = (CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT>) - CombineFnUtil.toFnWithContext(appliedFn.getFn()); + CombineFnUtil.toFnWithContext(transform.getFn()); @SuppressWarnings("unchecked") JavaRDDLike<WindowedValue<KV<K, Iterable<InputT>>>, ?> inRDD = @@ -592,8 +568,7 @@ public final class TransformTranslator { EVALUATORS.put(HadoopIO.Write.Bound.class, writeHadoop()); EVALUATORS.put(ParDo.Bound.class, parDo()); EVALUATORS.put(ParDo.BoundMulti.class, multiDo()); - EVALUATORS.put(GroupByKeyOnly.class, gbko()); - EVALUATORS.put(GroupAlsoByWindow.class, gabw()); + EVALUATORS.put(GroupByKey.class, groupByKey()); EVALUATORS.put(Combine.GroupedValues.class, combineGrouped()); EVALUATORS.put(Combine.Globally.class, combineGlobally()); EVALUATORS.put(Combine.PerKey.class, combinePerKey()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a54ded37/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 9f2d764..1af5e07 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -26,9 +26,6 @@ import java.util.Map; import java.util.Set; import kafka.serializer.Decoder; import org.apache.beam.runners.core.AssignWindowsDoFn; -import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly; -import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; -import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton; import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.io.ConsoleIO; @@ -50,6 +47,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -58,7 +56,6 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.AppliedCombineFn; import org.apache.beam.sdk.util.CombineFnUtil; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; @@ -84,6 +81,7 @@ import org.apache.spark.streaming.kafka.KafkaUtils; import scala.Tuple2; + /** * Supports translation between a Beam transform, and Spark's operations on DStreams. */ @@ -231,10 +229,10 @@ public final class StreamingTransformTranslator { }; } - private static <K, V> TransformEvaluator<GroupByKeyOnly<K, V>> gbko() { - return new TransformEvaluator<GroupByKeyOnly<K, V>>() { + private static <K, V> TransformEvaluator<GroupByKey<K, V>> groupByKey() { + return new TransformEvaluator<GroupByKey<K, V>>() { @Override - public void evaluate(GroupByKeyOnly<K, V> transform, EvaluationContext context) { + public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) { StreamingEvaluationContext sec = (StreamingEvaluationContext) context; @SuppressWarnings("unchecked") @@ -244,13 +242,20 @@ public final class StreamingTransformTranslator { @SuppressWarnings("unchecked") final KvCoder<K, V> coder = (KvCoder<K, V>) sec.getInput(transform).getCoder(); + final SparkRuntimeContext runtimeContext = sec.getRuntimeContext(); + final WindowingStrategy<?, ?> windowingStrategy = + sec.getInput(transform).getWindowingStrategy(); + JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream = dStream.transform(new Function<JavaRDD<WindowedValue<KV<K, V>>>, JavaRDD<WindowedValue<KV<K, Iterable<V>>>>>() { @Override public JavaRDD<WindowedValue<KV<K, Iterable<V>>>> call( JavaRDD<WindowedValue<KV<K, V>>> rdd) throws Exception { - return GroupCombineFunctions.groupByKeyOnly(rdd, coder); + final Accumulator<NamedAggregators> accum = + AccumulatorSingleton.getInstance(new JavaSparkContext(rdd.context())); + return GroupCombineFunctions.groupByKey(rdd, accum, coder, runtimeContext, + windowingStrategy); } }); sec.setStream(transform, outStream); @@ -258,39 +263,6 @@ public final class StreamingTransformTranslator { }; } - private static <K, V, W extends BoundedWindow> - TransformEvaluator<GroupAlsoByWindow<K, V>> gabw() { - return new TransformEvaluator<GroupAlsoByWindow<K, V>>() { - @Override - public void evaluate(final GroupAlsoByWindow<K, V> transform, EvaluationContext context) { - final StreamingEvaluationContext sec = (StreamingEvaluationContext) context; - final SparkRuntimeContext runtimeContext = sec.getRuntimeContext(); - @SuppressWarnings("unchecked") - JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> dStream = - (JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>>) - sec.getStream(transform); - - @SuppressWarnings("unchecked") - final KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder = - (KvCoder<K, Iterable<WindowedValue<V>>>) sec.getInput(transform).getCoder(); - - JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream = - dStream.transform(new Function<JavaRDD<WindowedValue<KV<K, - Iterable<WindowedValue<V>>>>>, JavaRDD<WindowedValue<KV<K, Iterable<V>>>>>() { - @Override - public JavaRDD<WindowedValue<KV<K, Iterable<V>>>> call(JavaRDD<WindowedValue<KV<K, - Iterable<WindowedValue<V>>>>> rdd) throws Exception { - final Accumulator<NamedAggregators> accum = - AccumulatorSingleton.getInstance(new JavaSparkContext(rdd.context())); - return GroupCombineFunctions.groupAlsoByWindow(rdd, transform, runtimeContext, - accum, inputKvCoder); - } - }); - sec.setStream(transform, outStream); - } - }; - } - private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>> combineGrouped() { return new TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>() { @@ -302,12 +274,10 @@ public final class StreamingTransformTranslator { PCollection<? extends KV<K, ? extends Iterable<InputT>>> input = sec.getInput(transform); WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); - AppliedCombineFn<? super K, ? super InputT, ?, OutputT> appliedFn = transform - .getAppliedFn(context.getPipeline().getCoderRegistry(), input.getCoder(), - windowingStrategy); @SuppressWarnings("unchecked") - CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT> fn = - (CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT>) appliedFn.getFn(); + final CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT> fn = + (CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT>) + CombineFnUtil.toFnWithContext(transform.getFn()); @SuppressWarnings("unchecked") JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> dStream = @@ -485,8 +455,7 @@ public final class StreamingTransformTranslator { .newHashMap(); static { - EVALUATORS.put(GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly.class, gbko()); - EVALUATORS.put(GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow.class, gabw()); + EVALUATORS.put(GroupByKey.class, groupByKey()); EVALUATORS.put(Combine.GroupedValues.class, combineGrouped()); EVALUATORS.put(Combine.Globally.class, combineGlobally()); EVALUATORS.put(Combine.PerKey.class, combinePerKey()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a54ded37/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java index 96c286a..0a804ae 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java @@ -90,6 +90,6 @@ public class SparkSideInputReader implements SideInputReader { @Override public boolean isEmpty() { - return sideInputs.isEmpty(); + return sideInputs != null && sideInputs.isEmpty(); } }