This is an automated email from the ASF dual-hosted git repository. goenka pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 4b34e41 [BEAM-7011] Clean-up Flink portable runner to not reference removed URN. new 407be69 Merge pull request #8295 from lukecwik/side_input 4b34e41 is described below commit 4b34e41bb71a738cfd662f0a25b6a73d0eb83dfe Author: Luke Cwik <lc...@google.com> AuthorDate: Fri Apr 12 13:44:43 2019 -0700 [BEAM-7011] Clean-up Flink portable runner to not reference removed URN. --- .../functions/FlinkStreamingSideInputHandlerFactory.java | 6 +----- .../fnexecution/translation/BatchSideInputHandlerFactory.java | 6 +----- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingSideInputHandlerFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingSideInputHandlerFactory.java index 69a2947..03e5537 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingSideInputHandlerFactory.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingSideInputHandlerFactory.java @@ -35,7 +35,6 @@ import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputH import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandlerFactory; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.transforms.Materializations; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; @@ -104,10 +103,7 @@ public class FlinkStreamingSideInputHandlerFactory implements SideInputHandlerFa @SuppressWarnings("unchecked") // T == V Coder<V> outputCoder = (Coder<V>) elementCoder; return forIterableSideInput(collectionNode, outputCoder); - } else if (PTransformTranslation.MULTIMAP_SIDE_INPUT.equals(accessPattern.getUrn()) - || Materializations.MULTIMAP_MATERIALIZATION_URN.equals(accessPattern.getUrn())) { - // TODO: Remove non standard URN. - // Using non standard version of multimap urn as dataflow uses the non standard urn. + } else if (PTransformTranslation.MULTIMAP_SIDE_INPUT.equals(accessPattern.getUrn())) { @SuppressWarnings("unchecked") // T == KV<?, V> KvCoder<?, V> kvCoder = (KvCoder<?, V>) elementCoder; return forMultimapSideInput(collectionNode, kvCoder.getKeyCoder(), kvCoder.getValueCoder()); diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory.java index 5460898..8ad6bbf 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory.java @@ -36,7 +36,6 @@ import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputH import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandlerFactory; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.transforms.Materializations; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; @@ -98,10 +97,7 @@ public class BatchSideInputHandlerFactory implements SideInputHandlerFactory { Coder<V> outputCoder = (Coder<V>) elementCoder; return forIterableSideInput( sideInputGetter.getSideInput(collectionNode.getId()), outputCoder, windowCoder); - } else if (PTransformTranslation.MULTIMAP_SIDE_INPUT.equals(accessPattern.getUrn()) - || Materializations.MULTIMAP_MATERIALIZATION_URN.equals(accessPattern.getUrn())) { - // TODO: Remove non standard URN. - // Using non standard version of multimap urn as dataflow uses the non standard urn. + } else if (PTransformTranslation.MULTIMAP_SIDE_INPUT.equals(accessPattern.getUrn())) { @SuppressWarnings("unchecked") // T == KV<?, V> KvCoder<?, V> kvCoder = (KvCoder<?, V>) elementCoder; return forMultimapSideInput(