This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new e65815e  Don't use deprecated sideInput.getWindowingStrategyInternal()
e65815e is described below

commit e65815e97b5056a839b0d9e2cce5543a7231d158
Author: Alexey Romanenko <aromanenko....@gmail.com>
AuthorDate: Fri Feb 1 18:50:00 2019 +0100

    Don't use deprecated sideInput.getWindowingStrategyInternal()
---
 .../translation/batch/ParDoTranslatorBatch.java    | 22 +++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index a984615..fbb6649 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -61,6 +61,7 @@ class ParDoTranslatorBatch<InputT, OutputT>
   public void translateTransform(
       PTransform<PCollection<InputT>, PCollectionTuple> transform, 
TranslationContext context) {
 
+    // Check for not-supported advanced features
     // TODO: add support of Splittable DoFn
     DoFn<InputT, OutputT> doFn = getDoFn(context);
     checkState(
@@ -74,10 +75,16 @@ class ParDoTranslatorBatch<InputT, OutputT>
         signature.stateDeclarations().size() > 0 || 
signature.timerDeclarations().size() > 0;
     checkState(!stateful, "States and timers are not supported for the 
moment.");
 
+    // TODO: add support of SideInputs
+    List<PCollectionView<?>> sideInputs = getSideInputs(context);
+    final boolean hasSideInputs = sideInputs != null && sideInputs.size() > 0;
+    checkState(!hasSideInputs, "SideInputs are not supported for the moment.");
+
+
+    // Init main variables
     Dataset<WindowedValue<InputT>> inputDataSet = 
context.getDataset(context.getInput());
     Map<TupleTag<?>, PValue> outputs = context.getOutputs();
     TupleTag<?> mainOutputTag = getTupleTag(context);
-
     Map<TupleTag<?>, Integer> outputTags = Maps.newHashMap();
 
     outputTags.put(mainOutputTag, 0);
@@ -98,7 +105,7 @@ class ParDoTranslatorBatch<InputT, OutputT>
     WindowingStrategy<?, ?> windowingStrategy = null;
 
     // collect all output Coders and create a UnionCoder for our tagged outputs
-    List<Coder<?>> outputCoders = Lists.newArrayList();
+//    List<Coder<?>> outputCoders = Lists.newArrayList();
     for (TupleTag<?> tag : indexMap.values()) {
       PValue taggedValue = outputs.get(tag);
       checkState(
@@ -107,7 +114,7 @@ class ParDoTranslatorBatch<InputT, OutputT>
           taggedValue,
           taggedValue.getClass().getSimpleName());
       PCollection<?> coll = (PCollection<?>) taggedValue;
-      outputCoders.add(coll.getCoder());
+//      outputCoders.add(coll.getCoder());
       windowingStrategy = coll.getWindowingStrategy();
     }
 
@@ -115,18 +122,15 @@ class ParDoTranslatorBatch<InputT, OutputT>
       throw new IllegalStateException("No outputs defined.");
     }
 
-    UnionCoder unionCoder = UnionCoder.of(outputCoders);
+//    UnionCoder unionCoder = UnionCoder.of(outputCoders);
+
 
-    List<PCollectionView<?>> sideInputs = getSideInputs(context);
-    final boolean hasSideInputs = sideInputs != null && sideInputs.size() > 0;
-    // TODO: add support of SideInputs
-    checkState(!hasSideInputs, "SideInputs are not supported for the moment.");
 
     // construct a map from side input to WindowingStrategy so that
     // the DoFn runner can map main-input windows to side input windows
     Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new 
HashMap<>();
     for (PCollectionView<?> sideInput : sideInputs) {
-      sideInputStrategies.put(sideInput, 
sideInput.getWindowingStrategyInternal());
+      sideInputStrategies.put(sideInput, 
sideInput.getPCollection().getWindowingStrategy());
     }
 
     Map<TupleTag<?>, Coder<?>> outputCoderMap = context.getOutputCoders();

Reply via email to