This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new 16c57c3  First attempt for ParDo primitive implementation
16c57c3 is described below

commit 16c57c30f52f0d1b76423a68b4321ee602c1e7c0
Author: Alexey Romanenko <aromanenko....@gmail.com>
AuthorDate: Mon Jan 7 10:47:04 2019 +0100

    First attempt for ParDo primitive implementation
---
 .../translation/TranslationContext.java            |  12 ++
 .../translation/batch/DoFnFunction.java            | 137 ++++++++++++++++
 .../translation/batch/ParDoTranslatorBatch.java    | 174 ++++++++++++++++++++-
 .../translation/batch/SparkProcessContext.java     | 149 ++++++++++++++++++
 .../SparkNoOpStepContext.java}                     |  24 +--
 .../batch/functions/SparkSideInputReader.java      |  62 ++++++++
 6 files changed, 545 insertions(+), 13 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 9a3330a..33706bd 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -108,6 +108,13 @@ public class TranslationContext {
     return (Dataset<WindowedValue<T>>) dataset;
   }
 
+  public void putDatasetWildcard(PValue value, Dataset<WindowedValue<?>> dataset) {
+    if (!datasets.containsKey(value)) {
+      datasets.put(value, dataset);
+      leaves.add(dataset);
+    }
+  }
+
   public <T> void putDataset(PValue value, Dataset<WindowedValue<T>> dataset) {
     if (!datasets.containsKey(value)) {
       datasets.put(value, dataset);
@@ -131,6 +138,11 @@ public class TranslationContext {
   }
 
   @SuppressWarnings("unchecked")
+  public <T extends PValue> T getInput(PTransform<T, ?> transform) {
+    return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform));
+  }
+
+  @SuppressWarnings("unchecked")
   public Map<TupleTag<?>, PValue> getInputs() {
     return currentTransform.getInputs();
   }
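
The two helpers above fill gaps in TranslationContext: getInput(transform) resolves the transform's single non-additional input, and putDatasetWildcard registers a Dataset whose element type is known only as a wildcard. A minimal sketch of how a batch translator could combine them, using only methods visible in this patch (the translator class itself is hypothetical):

    import java.util.Map;

    import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PValue;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.spark.sql.Dataset;

    class ExampleTranslatorSketch<T> {
      void translate(PTransform<PCollection<T>, PCollection<T>> transform, TranslationContext context) {
        // getInput(transform) resolves the transform's single non-additional input.
        PCollection<T> input = context.getInput(transform);
        Dataset<WindowedValue<T>> inputDataset = context.getDataset(input);

        // Suppose translation yields a Dataset typed only with a wildcard, as the
        // multi-output ParDo translation below does after pruning by tag.
        @SuppressWarnings("unchecked")
        Dataset<WindowedValue<?>> outputDataset = (Dataset<WindowedValue<?>>) (Dataset<?>) inputDataset;

        // putDatasetWildcard registers it without requiring a concrete element type.
        for (Map.Entry<TupleTag<?>, PValue> output : context.getOutputs().entrySet()) {
          context.putDatasetWildcard(output.getValue(), outputDataset);
        }
      }
    }

The wildcard variant exists because the outputs of a multi-output ParDo are pruned out of a Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>>, so no concrete element type survives to satisfy putDataset's type parameter.
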
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
new file mode 100644
index 0000000..35204bc
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SparkNoOpStepContext;
+import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SparkSideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import scala.Tuple2;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class DoFnFunction<InputT, OutputT>
+    implements MapPartitionsFunction<WindowedValue<InputT>, Tuple2<TupleTag<?>, WindowedValue<?>>> {
+
+  private final SerializablePipelineOptions serializedOptions;
+
+  private final DoFn<InputT, OutputT> doFn;
+  private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
+
+  private final WindowingStrategy<?, ?> windowingStrategy;
+
+  private final Map<TupleTag<?>, Integer> outputMap;
+  private final TupleTag<OutputT> mainOutputTag;
+  private final Coder<InputT> inputCoder;
+  private final Map<TupleTag<?>, Coder<?>> outputCoderMap;
+
+  private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
+
+  public DoFnFunction(
+      DoFn<InputT, OutputT> doFn,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+      PipelineOptions options,
+      Map<TupleTag<?>, Integer> outputMap,
+      TupleTag<OutputT> mainOutputTag,
+      Coder<InputT> inputCoder,
+      Map<TupleTag<?>, Coder<?>> outputCoderMap) {
+
+    this.doFn = doFn;
+    this.sideInputs = sideInputs;
+    this.serializedOptions = new SerializablePipelineOptions(options);
+    this.windowingStrategy = windowingStrategy;
+    this.outputMap = outputMap;
+    this.mainOutputTag = mainOutputTag;
+    this.inputCoder = inputCoder;
+    this.outputCoderMap = outputCoderMap;
+  }
+
+  @Override
+  public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> call(Iterator<WindowedValue<InputT>> iter)
+      throws Exception {
+
+    DoFnOutputManager outputManager = new DoFnOutputManager();
+
+    List<TupleTag<?>> additionalOutputTags = Lists.newArrayList(outputMap.keySet());
+
+    DoFnRunner<InputT, OutputT> doFnRunner =
+        DoFnRunners.simpleRunner(
+            serializedOptions.get(),
+            doFn,
+            new SparkSideInputReader(sideInputs),
+            outputManager,
+            mainOutputTag,
+            additionalOutputTags,
+            new SparkNoOpStepContext(),
+            inputCoder,
+            outputCoderMap,
+            windowingStrategy);
+
+    return new SparkProcessContext<>(doFn, doFnRunner, outputManager, Collections.emptyIterator())
+        .processPartition(iter)
+        .iterator();
+  }
+
+  private class DoFnOutputManager
+      implements SparkProcessContext.SparkOutputManager<Tuple2<TupleTag<?>, WindowedValue<?>>> {
+
+    private final Multimap<TupleTag<?>, WindowedValue<?>> outputs = LinkedListMultimap.create();
+
+    @Override
+    public void clear() {
+      outputs.clear();
+    }
+
+    @Override
+    public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> iterator() {
+      Iterator<Map.Entry<TupleTag<?>, WindowedValue<?>>> entryIter = outputs.entries().iterator();
+      return Iterators.transform(entryIter, this.entryToTupleFn());
+    }
+
+    private <K, V> Function<Map.Entry<K, V>, Tuple2<K, V>> entryToTupleFn() {
+      return en -> new Tuple2<>(en.getKey(), en.getValue());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public synchronized <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      outputs.put(tag, output);
+    }
+  }
+}
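
DoFnFunction lets one Spark mapPartitions call carry every output of a multi-output ParDo: the DoFnOutputManager buffers whatever the DoFn emits in an insertion-ordered Multimap keyed by output tag, then flattens it into (tag, value) tuples. A standalone sketch of that buffering pattern, with tags and values simplified to String and Integer (illustrative, not runner code):

    import java.util.Iterator;
    import java.util.Map;

    import com.google.common.collect.Iterators;
    import com.google.common.collect.LinkedListMultimap;
    import com.google.common.collect.Multimap;
    import scala.Tuple2;

    public class OutputManagerSketch {
      public static void main(String[] args) {
        // Insertion-ordered multimap: duplicate values per tag are kept, in emission order.
        Multimap<String, Integer> outputs = LinkedListMultimap.create();
        outputs.put("main", 1);
        outputs.put("side", 2);
        outputs.put("main", 3);

        // Flatten (tag -> value) entries into (tag, value) tuples, as DoFnOutputManager does.
        Iterator<Tuple2<String, Integer>> it =
            Iterators.transform(
                outputs.entries().iterator(), en -> new Tuple2<>(en.getKey(), en.getValue()));
        while (it.hasNext()) {
          Tuple2<String, Integer> t = it.next();
          System.out.println(t._1 + " -> " + t._2); // main -> 1, side -> 2, main -> 3
        }
      }
    }
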
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index 1e57098..1ad1e3b 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -17,16 +17,184 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.beam.runners.core.construction.ParDoTranslation;
+import org.apache.beam.runners.spark.structuredstreaming.translation.EncoderHelpers;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import scala.Tuple2;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * TODO: Add support of state and timers.
+ * TODO: Add support of side inputs.
+ *
+ * @param <InputT>
+ * @param <OutputT>
+ */
 class ParDoTranslatorBatch<InputT, OutputT>
     implements TransformTranslator<PTransform<PCollection<InputT>, PCollectionTuple>> {
 
   @Override
   public void translateTransform(
-      PTransform<PCollection<InputT>, PCollectionTuple> transform, TranslationContext context) {}
+      PTransform<PCollection<InputT>, PCollectionTuple> transform, TranslationContext context) {
+
+    DoFn<InputT, OutputT> doFn = getDoFn(context);
+    checkState(
+        !DoFnSignatures.signatureForDoFn(doFn).processElement().isSplittable(),
+        "Not expected to directly translate splittable DoFn, should have been 
overridden: %s",
+        doFn);
+
+    Dataset<WindowedValue<InputT>> inputDataSet = context.getDataset(context.getInput());
+    Map<TupleTag<?>, PValue> outputs = context.getOutputs();
+    TupleTag<?> mainOutputTag = getTupleTag(context);
+
+    Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
+
+    outputMap.put(mainOutputTag, 0);
+    int count = 1;
+    for (TupleTag<?> tag : outputs.keySet()) {
+      if (!outputMap.containsKey(tag)) {
+        outputMap.put(tag, count++);
+      }
+    }
+
+    // Union coder elements must match the order of the output tags.
+    Map<Integer, TupleTag<?>> indexMap = Maps.newTreeMap();
+    for (Map.Entry<TupleTag<?>, Integer> entry : outputMap.entrySet()) {
+      indexMap.put(entry.getValue(), entry.getKey());
+    }
+
+    // assume that the windowing strategy is the same for all outputs
+    WindowingStrategy<?, ?> windowingStrategy = null;
+
+    // collect all output Coders and create a UnionCoder for our tagged outputs
+    List<Coder<?>> outputCoders = Lists.newArrayList();
+    for (TupleTag<?> tag : indexMap.values()) {
+      PValue taggedValue = outputs.get(tag);
+      checkState(
+          taggedValue instanceof PCollection,
+          "Within ParDo, got a non-PCollection output %s of type %s",
+          taggedValue,
+          taggedValue.getClass().getSimpleName());
+      PCollection<?> coll = (PCollection<?>) taggedValue;
+      outputCoders.add(coll.getCoder());
+      windowingStrategy = coll.getWindowingStrategy();
+    }
+
+    if (windowingStrategy == null) {
+      throw new IllegalStateException("No outputs defined.");
+    }
+
+    UnionCoder unionCoder = UnionCoder.of(outputCoders);
+
+    List<PCollectionView<?>> sideInputs = getSideInputs(context);
+
+    // construct a map from side input to WindowingStrategy so that
+    // the DoFn runner can map main-input windows to side input windows
+    Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
+    for (PCollectionView<?> sideInput : sideInputs) {
+      sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
+    }
+
+    Map<TupleTag<?>, Coder<?>> outputCoderMap = context.getOutputCoders();
+
+    @SuppressWarnings("unchecked")
+    DoFnFunction<InputT, OutputT> doFnWrapper =
+        new DoFnFunction(
+            doFn,
+            windowingStrategy,
+            sideInputStrategies,
+            context.getOptions(),
+            outputMap,
+            mainOutputTag,
+            context.getInput(transform).getCoder(),
+            outputCoderMap);
+
+    Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputsDataset =
+        inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.encoder());
+
+    for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
+      pruneOutput(context, allOutputsDataset, output);
+    }
+  }
+
+  private List<PCollectionView<?>> getSideInputs(TranslationContext context) {
+    List<PCollectionView<?>> sideInputs;
+    try {
+      sideInputs = ParDoTranslation.getSideInputs(context.getCurrentTransform());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return sideInputs;
+  }
+
+  private TupleTag<?> getTupleTag(TranslationContext context) {
+    TupleTag<?> mainOutputTag;
+    try {
+      mainOutputTag = ParDoTranslation.getMainOutputTag(context.getCurrentTransform());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return mainOutputTag;
+  }
+
+  @SuppressWarnings("unchecked")
+  private DoFn<InputT, OutputT> getDoFn(TranslationContext context) {
+    DoFn<InputT, OutputT> doFn;
+    try {
+      doFn = (DoFn<InputT, OutputT>) ParDoTranslation.getDoFn(context.getCurrentTransform());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return doFn;
+  }
+
+  private <T> void pruneOutput(
+      TranslationContext context,
+      Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> tmpDataset,
+      Map.Entry<TupleTag<?>, PValue> output) {
+    Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> filteredDataset =
+        tmpDataset.filter(new SparkDoFnFilterFunction(output.getKey()));
+    Dataset<WindowedValue<?>> outputDataset =
+        filteredDataset.map(
+            (MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, WindowedValue<?>>)
+                value -> value._2,
+            EncoderHelpers.encoder());
+    context.putDatasetWildcard(output.getValue(), outputDataset);
+  }
+
+  class SparkDoFnFilterFunction implements FilterFunction<Tuple2<TupleTag<?>, WindowedValue<?>>> {
+
+    private final TupleTag<?> key;
+
+    public SparkDoFnFilterFunction(TupleTag<?> key) {
+      this.key = key;
+    }
+
+    @Override
+    public boolean call(Tuple2<TupleTag<?>, WindowedValue<?>> value) {
+      return value._1.equals(key);
+    }
+  }
 }
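
For context, this is the user-facing shape the translation serves: a ParDo with one main and one additional output, which ParDoTranslatorBatch fans out by filtering the unioned (tag, value) Dataset once per tag in pruneOutput. A hedged sketch with illustrative tag names; not part of the commit:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;

    public class MultiOutputParDoSketch {
      // Anonymous subclasses capture the element type for coder inference.
      static final TupleTag<Integer> EVENS = new TupleTag<Integer>() {};
      static final TupleTag<Integer> ODDS = new TupleTag<Integer>() {};

      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        PCollectionTuple outputs =
            pipeline
                .apply(Create.of(1, 2, 3, 4))
                .apply(
                    ParDo.of(
                            new DoFn<Integer, Integer>() {
                              @ProcessElement
                              public void process(ProcessContext c) {
                                if (c.element() % 2 == 0) {
                                  c.output(c.element()); // main output -> EVENS
                                } else {
                                  c.output(ODDS, c.element()); // additional output
                                }
                              }
                            })
                        .withOutputTags(EVENS, TupleTagList.of(ODDS)));
        // Each tagged PCollection corresponds to one filtered Dataset in pruneOutput above.
        PCollection<Integer> evens = outputs.get(EVENS);
        PCollection<Integer> odds = outputs.get(ODDS);
        pipeline.run().waitUntilFinish();
      }
    }
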
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SparkProcessContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SparkProcessContext.java
new file mode 100644
index 0000000..720b7ab
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SparkProcessContext.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import com.google.common.collect.AbstractIterator;
+import org.apache.beam.runners.core.*;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/** Spark runner process context processes Spark partitions using Beam's {@link DoFnRunner}. */
+class SparkProcessContext<FnInputT, FnOutputT, OutputT> {
+
+  private final DoFn<FnInputT, FnOutputT> doFn;
+  private final DoFnRunner<FnInputT, FnOutputT> doFnRunner;
+  private final SparkOutputManager<OutputT> outputManager;
+  private Iterator<TimerInternals.TimerData> timerDataIterator;
+
+  SparkProcessContext(
+      DoFn<FnInputT, FnOutputT> doFn,
+      DoFnRunner<FnInputT, FnOutputT> doFnRunner,
+      SparkOutputManager<OutputT> outputManager,
+      Iterator<TimerInternals.TimerData> timerDataIterator) {
+
+    this.doFn = doFn;
+    this.doFnRunner = doFnRunner;
+    this.outputManager = outputManager;
+    this.timerDataIterator = timerDataIterator;
+  }
+
+  Iterable<OutputT> processPartition(Iterator<WindowedValue<FnInputT>> partition) throws Exception {
+
+    // skip if partition is empty.
+    if (!partition.hasNext()) {
+      return new ArrayList<>();
+    }
+
+    // process the partition; finishBundle() is called from within the output iterator.
+    return this.getOutputIterable(partition, doFnRunner);
+  }
+
+  private void clearOutput() {
+    outputManager.clear();
+  }
+
+  private Iterator<OutputT> getOutputIterator() {
+    return outputManager.iterator();
+  }
+
+  private Iterable<OutputT> getOutputIterable(
+      final Iterator<WindowedValue<FnInputT>> iter,
+      final DoFnRunner<FnInputT, FnOutputT> doFnRunner) {
+    return () -> new ProcCtxtIterator(iter, doFnRunner);
+  }
+
+  interface SparkOutputManager<T> extends OutputManager, Iterable<T> {
+    void clear();
+  }
+
+  private class ProcCtxtIterator extends AbstractIterator<OutputT> {
+
+    private final Iterator<WindowedValue<FnInputT>> inputIterator;
+    private final DoFnRunner<FnInputT, FnOutputT> doFnRunner;
+    private Iterator<OutputT> outputIterator;
+    private boolean isBundleStarted;
+    private boolean isBundleFinished;
+
+    ProcCtxtIterator(
+        Iterator<WindowedValue<FnInputT>> iterator, DoFnRunner<FnInputT, FnOutputT> doFnRunner) {
+      this.inputIterator = iterator;
+      this.doFnRunner = doFnRunner;
+      this.outputIterator = getOutputIterator();
+    }
+
+    @Override
+    protected OutputT computeNext() {
+      // Process each element from the (input) iterator, which produces zero, one or more
+      // output elements (of type OutputT) in the output iterator. Note that the output
+      // collection (and iterator) is reset between each call to processElement, so the
+      // collection only holds the output values for each call to processElement, rather
+      // than for the whole partition (which would use too much memory).
+      if (!isBundleStarted) {
+        isBundleStarted = true;
+        // call startBundle() before beginning to process the partition.
+        doFnRunner.startBundle();
+      }
+
+      try {
+        while (true) {
+          if (outputIterator.hasNext()) {
+            return outputIterator.next();
+          }
+
+          clearOutput();
+          if (inputIterator.hasNext()) {
+            // grab the next element and process it.
+            doFnRunner.processElement(inputIterator.next());
+            outputIterator = getOutputIterator();
+          } else if (timerDataIterator.hasNext()) {
+            fireTimer(timerDataIterator.next());
+            outputIterator = getOutputIterator();
+          } else {
+            // no more input to consume, but finishBundle can produce more output
+            if (!isBundleFinished) {
+              isBundleFinished = true;
+              doFnRunner.finishBundle();
+              outputIterator = getOutputIterator();
+              continue; // try to consume outputIterator from start of loop
+            }
+            DoFnInvokers.invokerFor(doFn).invokeTeardown();
+            return endOfData();
+          }
+        }
+      } catch (final RuntimeException re) {
+        DoFnInvokers.invokerFor(doFn).invokeTeardown();
+        throw re;
+      }
+    }
+
+    private void fireTimer(TimerInternals.TimerData timer) {
+      StateNamespace namespace = timer.getNamespace();
+      checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
+      BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
+      doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+    }
+  }
+}
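
The heart of SparkProcessContext is a pull-based bundle protocol: startBundle() on the first pull, one processElement() per input element with the output buffer cleared in between, and finishBundle() exactly once when the input runs dry. A simplified standalone sketch of that protocol, with a List of Strings standing in for the DoFnRunner and its outputs (illustrative only):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import com.google.common.collect.AbstractIterator;

    class BundleIteratorSketch extends AbstractIterator<String> {
      private final Iterator<String> input;
      private final List<String> buffer = new ArrayList<>();
      private Iterator<String> output = buffer.iterator();
      private boolean started;
      private boolean finished;

      BundleIteratorSketch(Iterator<String> input) {
        this.input = input;
      }

      @Override
      protected String computeNext() {
        if (!started) {
          started = true;
          buffer.add("startBundle"); // stand-in for doFnRunner.startBundle()
          output = buffer.iterator();
        }
        while (true) {
          if (output.hasNext()) {
            return output.next();
          }
          buffer.clear(); // keep memory bounded to one element's output
          if (input.hasNext()) {
            buffer.add("processed:" + input.next()); // stand-in for processElement()
            output = buffer.iterator();
          } else if (!finished) {
            finished = true;
            buffer.add("finishBundle"); // stand-in for doFnRunner.finishBundle()
            output = buffer.iterator();
          } else {
            return endOfData();
          }
        }
      }
    }

Because the consumer drives the loop, a partition streams through mapPartitions while holding only one element's output in memory at a time.
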
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkNoOpStepContext.java
similarity index 59%
copy from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
copy to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkNoOpStepContext.java
index 1e57098..889cdf5 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkNoOpStepContext.java
@@ -15,18 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions;
 
-import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StepContext;
+import org.apache.beam.runners.core.TimerInternals;
 
-class ParDoTranslatorBatch<InputT, OutputT>
-    implements TransformTranslator<PTransform<PCollection<InputT>, PCollectionTuple>> {
+/** A {@link StepContext} for Spark Batch Runner execution. */
+public class SparkNoOpStepContext implements StepContext {
 
   @Override
-  public void translateTransform(
-      PTransform<PCollection<InputT>, PCollectionTuple> transform, TranslationContext context) {}
+  public StateInternals stateInternals() {
+    throw new UnsupportedOperationException("stateInternals is not supported");
+  }
+
+  @Override
+  public TimerInternals timerInternals() {
+    throw new UnsupportedOperationException("timerInternals is not supported");
+  }
 }
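
SparkNoOpStepContext is what DoFnFunction above passes to DoFnRunners.simpleRunner; it deliberately fails fast, consistent with the TODO about state and timers in ParDoTranslatorBatch. A trivial demonstration (illustrative only):

    import org.apache.beam.runners.core.StepContext;
    import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SparkNoOpStepContext;

    public class NoOpStepContextDemo {
      public static void main(String[] args) {
        StepContext stepContext = new SparkNoOpStepContext();
        try {
          stepContext.stateInternals(); // always throws: state is not yet supported
        } catch (UnsupportedOperationException e) {
          System.out.println(e.getMessage()); // prints "stateInternals is not supported"
        }
      }
    }
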
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkSideInputReader.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkSideInputReader.java
new file mode 100644
index 0000000..da75101
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkSideInputReader.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions;
+
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link SideInputReader} for the Spark Batch Runner.
+ *
+ * <p>TODO: Needs to be implemented.
+ */
+public class SparkSideInputReader implements SideInputReader {
+  private final Map<TupleTag<?>, WindowingStrategy<?, ?>> sideInputs;
+
+  public SparkSideInputReader(
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> indexByView) {
+    sideInputs = new HashMap<>();
+  }
+
+  @Nullable
+  @Override
+  public <T> T get(PCollectionView<T> view, BoundedWindow window) {
+    return null;
+  }
+
+  @Override
+  public <T> boolean contains(PCollectionView<T> view) {
+    return sideInputs.containsKey(view.getTagInternal());
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return sideInputs.isEmpty();
+  }
+}
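
As its TODO says, SparkSideInputReader is still a stub: the constructor drops indexByView, so contains() is always false and get() always returns null. A hedged sketch of the missing constructor wiring, assuming views are meant to be indexed by their internal tag, which the contains() lookup above implies (hypothetical helper, not part of the commit):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.beam.sdk.values.PCollectionView;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.WindowingStrategy;

    class SideInputIndexSketch {
      // Hypothetical replacement for the constructor body above: index each view's
      // windowing strategy by the view's internal tag, so contains() can find it.
      static Map<TupleTag<?>, WindowingStrategy<?, ?>> indexByTag(
          Map<PCollectionView<?>, WindowingStrategy<?, ?>> indexByView) {
        Map<TupleTag<?>, WindowingStrategy<?, ?>> sideInputs = new HashMap<>();
        for (Map.Entry<PCollectionView<?>, WindowingStrategy<?, ?>> entry : indexByView.entrySet()) {
          sideInputs.put(entry.getKey().getTagInternal(), entry.getValue());
        }
        return sideInputs;
      }
    }
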
