This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new e65815e Don't use deprecated sideInput.getWindowingStrategyInternal() e65815e is described below commit e65815e97b5056a839b0d9e2cce5543a7231d158 Author: Alexey Romanenko <aromanenko....@gmail.com> AuthorDate: Fri Feb 1 18:50:00 2019 +0100 Don't use deprecated sideInput.getWindowingStrategyInternal() --- .../translation/batch/ParDoTranslatorBatch.java | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java index a984615..fbb6649 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java @@ -61,6 +61,7 @@ class ParDoTranslatorBatch<InputT, OutputT> public void translateTransform( PTransform<PCollection<InputT>, PCollectionTuple> transform, TranslationContext context) { + // Check for not-supported advanced features // TODO: add support of Splittable DoFn DoFn<InputT, OutputT> doFn = getDoFn(context); checkState( @@ -74,10 +75,16 @@ class ParDoTranslatorBatch<InputT, OutputT> signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0; checkState(!stateful, "States and timers are not supported for the moment."); + // TODO: add support of SideInputs + List<PCollectionView<?>> sideInputs = getSideInputs(context); + final boolean hasSideInputs = sideInputs != null && sideInputs.size() > 0; + checkState(!hasSideInputs, "SideInputs are not supported for the moment."); + + + // Init main variables
Dataset<WindowedValue<InputT>> inputDataSet = context.getDataset(context.getInput()); Map<TupleTag<?>, PValue> outputs = context.getOutputs(); TupleTag<?> mainOutputTag = getTupleTag(context); - Map<TupleTag<?>, Integer> outputTags = Maps.newHashMap(); outputTags.put(mainOutputTag, 0); @@ -98,7 +105,7 @@ class ParDoTranslatorBatch<InputT, OutputT> WindowingStrategy<?, ?> windowingStrategy = null; // collect all output Coders and create a UnionCoder for our tagged outputs - List<Coder<?>> outputCoders = Lists.newArrayList(); +// List<Coder<?>> outputCoders = Lists.newArrayList(); for (TupleTag<?> tag : indexMap.values()) { PValue taggedValue = outputs.get(tag); checkState( @@ -107,7 +114,7 @@ class ParDoTranslatorBatch<InputT, OutputT> taggedValue, taggedValue.getClass().getSimpleName()); PCollection<?> coll = (PCollection<?>) taggedValue; - outputCoders.add(coll.getCoder()); +// outputCoders.add(coll.getCoder()); windowingStrategy = coll.getWindowingStrategy(); } @@ -115,18 +122,15 @@ class ParDoTranslatorBatch<InputT, OutputT> throw new IllegalStateException("No outputs defined."); } - UnionCoder unionCoder = UnionCoder.of(outputCoders); +// UnionCoder unionCoder = UnionCoder.of(outputCoders); + - List<PCollectionView<?>> sideInputs = getSideInputs(context); - final boolean hasSideInputs = sideInputs != null && sideInputs.size() > 0; - // TODO: add support of SideInputs - checkState(!hasSideInputs, "SideInputs are not supported for the moment."); // construct a map from side input to WindowingStrategy so that // the DoFn runner can map main-input windows to side input windows Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>(); for (PCollectionView<?> sideInput : sideInputs) { - sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal()); + sideInputStrategies.put(sideInput, sideInput.getPCollection().getWindowingStrategy()); } Map<TupleTag<?>, Coder<?>> outputCoderMap = context.getOutputCoders();