This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b34e41  [BEAM-7011] Clean-up Flink portable runner to not reference removed URN.
     new 407be69  Merge pull request #8295 from lukecwik/side_input
4b34e41 is described below

commit 4b34e41bb71a738cfd662f0a25b6a73d0eb83dfe
Author: Luke Cwik <lc...@google.com>
AuthorDate: Fri Apr 12 13:44:43 2019 -0700

    [BEAM-7011] Clean-up Flink portable runner to not reference removed URN.
---
 .../functions/FlinkStreamingSideInputHandlerFactory.java            | 6 +-----
 .../fnexecution/translation/BatchSideInputHandlerFactory.java       | 6 +-----
 2 files changed, 2 insertions(+), 10 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingSideInputHandlerFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingSideInputHandlerFactory.java
index 69a2947..03e5537 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingSideInputHandlerFactory.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingSideInputHandlerFactory.java
@@ -35,7 +35,6 @@ import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputH
 import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandlerFactory;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -104,10 +103,7 @@ public class FlinkStreamingSideInputHandlerFactory implements SideInputHandlerFa
       @SuppressWarnings("unchecked") // T == V
       Coder<V> outputCoder = (Coder<V>) elementCoder;
       return forIterableSideInput(collectionNode, outputCoder);
-    } else if (PTransformTranslation.MULTIMAP_SIDE_INPUT.equals(accessPattern.getUrn())
-        || Materializations.MULTIMAP_MATERIALIZATION_URN.equals(accessPattern.getUrn())) {
-      // TODO: Remove non standard URN.
-      // Using non standard version of multimap urn as dataflow uses the non standard urn.
+    } else if (PTransformTranslation.MULTIMAP_SIDE_INPUT.equals(accessPattern.getUrn())) {
       @SuppressWarnings("unchecked") // T == KV<?, V>
       KvCoder<?, V> kvCoder = (KvCoder<?, V>) elementCoder;
       return forMultimapSideInput(collectionNode, kvCoder.getKeyCoder(), kvCoder.getValueCoder());
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory.java
index 5460898..8ad6bbf 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/BatchSideInputHandlerFactory.java
@@ -36,7 +36,6 @@ import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputH
 import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandlerFactory;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -98,10 +97,7 @@ public class BatchSideInputHandlerFactory implements SideInputHandlerFactory {
       Coder<V> outputCoder = (Coder<V>) elementCoder;
       return forIterableSideInput(
           sideInputGetter.getSideInput(collectionNode.getId()), outputCoder, windowCoder);
-    } else if (PTransformTranslation.MULTIMAP_SIDE_INPUT.equals(accessPattern.getUrn())
-        || Materializations.MULTIMAP_MATERIALIZATION_URN.equals(accessPattern.getUrn())) {
-      // TODO: Remove non standard URN.
-      // Using non standard version of multimap urn as dataflow uses the non standard urn.
+    } else if (PTransformTranslation.MULTIMAP_SIDE_INPUT.equals(accessPattern.getUrn())) {
       @SuppressWarnings("unchecked") // T == KV<?, V>
       KvCoder<?, V> kvCoder = (KvCoder<?, V>) elementCoder;
       return forMultimapSideInput(

Reply via email to