[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=106770=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106770
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 29/May/18 19:02
Start Date: 29/May/18 19:02
Worklog Time Spent: 10m 
  Work Description: tweise closed pull request #5407: [BEAM-4297] Streaming 
executable stage translation and operator for portable Flink runner.
URL: https://github.com/apache/beam/pull/5407
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff on merge, it is reproduced below for the sake of provenance:

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index 72fe10e22c0..b7ee08378dc 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -20,7 +20,6 @@
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.collect.BiMap;
-import com.google.common.collect.ImmutableBiMap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -56,6 +55,7 @@
 import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils;
 import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.fnexecution.wire.WireCoders;
@@ -319,7 +319,8 @@ public void translate(BatchTranslationContext context, RunnerApi.Pipeline pipeli
     RunnerApi.Components components = pipeline.getComponents();
     Map<String, String> outputs = transform.getTransform().getOutputsMap();
     // Mapping from PCollection id to coder tag id.
-    BiMap<String, Integer> outputMap = createOutputMap(outputs.values());
+    BiMap<String, Integer> outputMap =
+        FlinkPipelineTranslatorUtils.createOutputMap(outputs.values());
     // Collect all output Coders and create a UnionCoder for our tagged outputs.
 List> unionCoders = Lists.newArrayList();
 // Enforce tuple tag sorting by union tag index.
@@ -615,14 +616,4 @@ private static void pruneOutput(
 context.addDataSet(collectionId, pruningOperator);
   }
 
-  /**  Creates a mapping from PCollection id to output tag integer. */
-  private static BiMap<String, Integer> createOutputMap(Iterable<String> localOutputs) {
-    ImmutableBiMap.Builder<String, Integer> builder = ImmutableBiMap.builder();
-    int outputIndex = 0;
-    for (String tag : localOutputs) {
-      builder.put(tag, outputIndex);
-      outputIndex++;
-    }
-    return builder.build();
-  }
 }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index b7bc8ca1a40..ff991dce7ed 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -17,15 +17,20 @@
  */
 package org.apache.beam.runners.flink;
 
+import com.google.common.collect.BiMap;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.TreeMap;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
@@ -36,8 +41,11 @@
 import org.apache.beam.runners.core.construction.graph.PipelineNode;
 import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import 

[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=106704=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106704
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 29/May/18 16:41
Start Date: 29/May/18 16:41
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r191494018
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -423,8 +432,133 @@ private void translateImpulse(
   String id,
   RunnerApi.Pipeline pipeline,
   StreamingTranslationContext context) {
+// TODO: Fail on stateful DoFns for now.
+// TODO: Support stateful DoFns by inserting group-by-keys where necessary.
+// TODO: Fail on splittable DoFns.
+// TODO: Special-case single outputs to avoid multiplexing PCollections.
+RunnerApi.Components components = pipeline.getComponents();
+RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
+Map outputs = transform.getOutputsMap();
+RehydratedComponents rehydratedComponents =
+RehydratedComponents.forComponents(components);
+
+BiMap outputMap =
+FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
+Map>> outputCoders = Maps.newHashMap();
+for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) 
{
+  String collectionId = outputs.get(localOutputName);
+  Coder> windowCoder = (Coder) 
instantiateCoder(collectionId, components);
+  outputCoders.put(localOutputName, windowCoder);
+}
+
+final RunnerApi.ExecutableStagePayload stagePayload;
+try {
+  stagePayload = 
RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+
+String inputPCollectionId =
+Iterables.getOnlyElement(transform.getInputsMap().values());
 
 Review comment:
   SGTM


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 106704)
Time Spent: 3h 50m  (was: 3h 40m)

> Flink portable runner executable stage operator for streaming
> -
>
> Key: BEAM-4297
> URL: https://issues.apache.org/jira/browse/BEAM-4297
> Project: Beam
>  Issue Type: Task
>  Components: runner-flink
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=106470=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106470
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 29/May/18 06:04
Start Date: 29/May/18 06:04
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r191311373
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -423,8 +432,133 @@ private void translateImpulse(
   String id,
   RunnerApi.Pipeline pipeline,
   StreamingTranslationContext context) {
+// TODO: Fail on stateful DoFns for now.
+// TODO: Support stateful DoFns by inserting group-by-keys where necessary.
+// TODO: Fail on splittable DoFns.
+// TODO: Special-case single outputs to avoid multiplexing PCollections.
+RunnerApi.Components components = pipeline.getComponents();
+RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
+Map outputs = transform.getOutputsMap();
+RehydratedComponents rehydratedComponents =
+RehydratedComponents.forComponents(components);
+
+BiMap outputMap =
+FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
+Map>> outputCoders = Maps.newHashMap();
+for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) 
{
+  String collectionId = outputs.get(localOutputName);
+  Coder> windowCoder = (Coder) 
instantiateCoder(collectionId, components);
+  outputCoders.put(localOutputName, windowCoder);
+}
+
+final RunnerApi.ExecutableStagePayload stagePayload;
+try {
+  stagePayload = 
RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+
+String inputPCollectionId =
+Iterables.getOnlyElement(transform.getInputsMap().values());
 
-throw new RuntimeException("executable stage translation not implemented");
+// TODO: is this still relevant?
+// we assume that the transformation does not change the windowing 
strategy.
+RunnerApi.WindowingStrategy windowingStrategyProto =
 
 Review comment:
   Removed. It was a remnant of translation code from the old runner (only used 
in the case of stateful ParDo). Since key handling and output encoding will 
happen in the SDK harness, we probably won't need to know the windowing 
strategy in the operator.




Issue Time Tracking
---

Worklog Id: (was: 106470)
Time Spent: 3h 40m  (was: 3.5h)



[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=106467=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106467
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 29/May/18 05:36
Start Date: 29/May/18 05:36
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r191307968
 
 

 ##
 File path: 
runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
 ##
 @@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.Struct;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.runners.flink.ArtifactSourcePool;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.OutputTag;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link ExecutableStageDoFnOperator}. */
+@RunWith(JUnit4.class)
+public class ExecutableStageDoFnOperatorTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Mock private RuntimeContext runtimeContext;
+  @Mock private DistributedCache distributedCache;
+  @Mock private FlinkExecutableStageContext stageContext;
+  @Mock private StageBundleFactory stageBundleFactory;
 
 Review comment:
   I think the mock-based test is good for covering just the operator class, 
without other dependencies. InProcessServerFactory might be a good way to 
write an integration test that also covers the translator, outside of the 
validate runner suite. I can probably do that as a follow-up.



[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=106463=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106463
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 29/May/18 05:14
Start Date: 29/May/18 05:14
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r191305096
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java
 ##
 @@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.utils;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+
+/**
+ * Utilities for pipeline translation.
+ */
+public final class FlinkPipelineTranslatorUtils {
+  private FlinkPipelineTranslatorUtils() {}
+
+  /**  Creates a mapping from PCollection id to output tag integer. */
+  public static BiMap<String, Integer> createOutputMap(Iterable<String> localOutputs) {
+    ImmutableBiMap.Builder<String, Integer> builder = ImmutableBiMap.builder();
+    int outputIndex = 0;
+    for (String tag : localOutputs) {
 
 Review comment:
   done
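
For readers following the helper under review, here is a minimal JDK-only
sketch of the same idea: ids are tagged with consecutive integers in iteration
order. The class name OutputMapSketch and the inverse() helper are
hypothetical, and a LinkedHashMap stands in for Guava's ImmutableBiMap, so
this illustrates the technique rather than reproducing the code in the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical JDK-only stand-in for the Guava-based createOutputMap helper. */
public final class OutputMapSketch {
  private OutputMapSketch() {}

  /** Assigns consecutive integer tags (0, 1, 2, ...) to ids in iteration order. */
  public static Map<String, Integer> createOutputMap(Iterable<String> localOutputs) {
    Map<String, Integer> result = new LinkedHashMap<>();
    int outputIndex = 0;
    for (String tag : localOutputs) {
      // This sketch rejects duplicate ids so the mapping stays one-to-one.
      if (result.putIfAbsent(tag, outputIndex) != null) {
        throw new IllegalArgumentException("Duplicate output id: " + tag);
      }
      outputIndex++;
    }
    return result;
  }

  /** Inverse lookup: position i of the returned list holds the id tagged i. */
  public static List<String> inverse(Map<String, Integer> outputMap) {
    // LinkedHashMap keeps insertion order, which equals tag order here.
    return new ArrayList<>(outputMap.keySet());
  }

  public static void main(String[] args) {
    Map<String, Integer> map = createOutputMap(Arrays.asList("main", "side"));
    System.out.println(map);
    System.out.println(inverse(map));
  }
}
```

A bidirectional map is convenient here because the translator needs both
directions: id to tag when encoding, and tag back to id when demultiplexing.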




Issue Time Tracking
---

Worklog Id: (was: 106463)
Time Spent: 3h 10m  (was: 3h)



[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=106464=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106464
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 29/May/18 05:14
Start Date: 29/May/18 05:14
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r191305612
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -423,8 +432,133 @@ private void translateImpulse(
   String id,
   RunnerApi.Pipeline pipeline,
   StreamingTranslationContext context) {
+// TODO: Fail on stateful DoFns for now.
+// TODO: Support stateful DoFns by inserting group-by-keys where necessary.
+// TODO: Fail on splittable DoFns.
+// TODO: Special-case single outputs to avoid multiplexing PCollections.
+RunnerApi.Components components = pipeline.getComponents();
+RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
+Map outputs = transform.getOutputsMap();
+RehydratedComponents rehydratedComponents =
+RehydratedComponents.forComponents(components);
+
+BiMap outputMap =
+FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
+Map>> outputCoders = Maps.newHashMap();
+for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) 
{
+  String collectionId = outputs.get(localOutputName);
+  Coder> windowCoder = (Coder) 
instantiateCoder(collectionId, components);
+  outputCoders.put(localOutputName, windowCoder);
+}
+
+final RunnerApi.ExecutableStagePayload stagePayload;
+try {
+  stagePayload = 
RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+
+String inputPCollectionId =
+Iterables.getOnlyElement(transform.getInputsMap().values());
 
 Review comment:
   I added a test for serialization. If we agree on the repeated instantiation 
of ExecutableStage, I can take this up in a separate PR (for both batch and 
streaming translation). I would do that once we have test and end-to-end 
coverage; right now the translators are still not wired.
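
As an illustration of what a serialization test typically checks, here is a
minimal sketch of a Java serialization round trip. It is not the test added in
the PR: the StageConfig class and its fields are hypothetical stand-ins for an
operator's serializable state, and only JDK classes are used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public final class SerializationRoundTrip {
  private SerializationRoundTrip() {}

  /** Hypothetical stand-in for the serializable part of an operator. */
  public static final class StageConfig implements Serializable {
    private static final long serialVersionUID = 1L;
    public final String inputPCollectionId;
    // Transient state is not written; it must be rebuilt after deserialization.
    public transient Object runtimeHandle = new Object();

    public StageConfig(String inputPCollectionId) {
      this.inputPCollectionId = inputPCollectionId;
    }
  }

  /** Serializes the value to bytes and reads it back as a fresh copy. */
  public static <T extends Serializable> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        @SuppressWarnings("unchecked")
        T copy = (T) in.readObject();
        return copy;
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Serialization round trip failed", e);
    }
  }

  public static void main(String[] args) {
    StageConfig copy = roundTrip(new StageConfig("pc-1"));
    System.out.println(copy.inputPCollectionId);
    System.out.println(copy.runtimeHandle == null);
  }
}
```

The round trip fails fast if a non-transient field is not serializable, which
is the kind of regression such a test is meant to catch before the operator is
shipped to workers.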




Issue Time Tracking
---

Worklog Id: (was: 106464)
Time Spent: 3h 20m  (was: 3h 10m)



[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=106462=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106462
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 29/May/18 05:14
Start Date: 29/May/18 05:14
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r191305254
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -423,8 +432,133 @@ private void translateImpulse(
   String id,
   RunnerApi.Pipeline pipeline,
   StreamingTranslationContext context) {
+// TODO: Fail on stateful DoFns for now.
+// TODO: Support stateful DoFns by inserting group-by-keys where necessary.
+// TODO: Fail on splittable DoFns.
+// TODO: Special-case single outputs to avoid multiplexing PCollections.
+RunnerApi.Components components = pipeline.getComponents();
+RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
+Map outputs = transform.getOutputsMap();
+RehydratedComponents rehydratedComponents =
+RehydratedComponents.forComponents(components);
+
+BiMap outputMap =
+FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
+Map>> outputCoders = Maps.newHashMap();
+for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) 
{
 
 Review comment:
   I have cleaned up this portion of the code.
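
For context, the loop quoted above iterates output names in ascending tag
order by copying the inverse map (tag to name) into a TreeMap. A minimal
JDK-only sketch of that ordering follows; the class and method names are
hypothetical, and a plain Map<Integer, String> stands in for the inverse view
of the BiMap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public final class TagOrderSketch {
  private TagOrderSketch() {}

  /** Returns output names ordered by ascending integer tag. */
  public static List<String> namesInTagOrder(Map<Integer, String> tagToName) {
    // TreeMap sorts by key, so values() iterates in ascending tag order
    // regardless of the iteration order of the source map.
    return new ArrayList<>(new TreeMap<>(tagToName).values());
  }

  public static void main(String[] args) {
    Map<Integer, String> tagToName = new HashMap<>();
    tagToName.put(2, "late");
    tagToName.put(0, "main");
    tagToName.put(1, "side");
    System.out.println(namesInTagOrder(tagToName)); // prints [main, side, late]
  }
}
```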




Issue Time Tracking
---

Worklog Id: (was: 106462)
Time Spent: 3h 10m  (was: 3h)



[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105301=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105301
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 23/May/18 20:00
Start Date: 23/May/18 20:00
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r190380409
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -423,8 +432,133 @@ private void translateImpulse(
   String id,
   RunnerApi.Pipeline pipeline,
   StreamingTranslationContext context) {
+// TODO: Fail on stateful DoFns for now.
+// TODO: Support stateful DoFns by inserting group-by-keys where necessary.
+// TODO: Fail on splittable DoFns.
+// TODO: Special-case single outputs to avoid multiplexing PCollections.
+RunnerApi.Components components = pipeline.getComponents();
+RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
+Map outputs = transform.getOutputsMap();
+RehydratedComponents rehydratedComponents =
+RehydratedComponents.forComponents(components);
+
+BiMap outputMap =
+FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
+Map> outputCoders = Maps.newHashMap();
+for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) 
{
+  String collectionId = outputs.get(localOutputName);
+  Coder windowCoder = (Coder) 
instantiateCoder(collectionId, components);
+  outputCoders.put(localOutputName, windowCoder);
+}
+
+final RunnerApi.ExecutableStagePayload stagePayload;
+try {
+  stagePayload = 
RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+
+String inputPCollectionId =
+Iterables.getOnlyElement(transform.getInputsMap().values());
 
 Review comment:
   I'm arguing for pushing most of the manipulation done within 
ExecutableProcessBundleDescriptor into the ExecutableStage payload (minus the 
ApiServiceDescriptor binding) so that it doesn't need modification. This would 
allow the ExecutableStage to answer concretely what the input coders, output 
coders, side input coders, state coders, ... are, in addition to any other 
information.
   
   Longer term, it seems to make sense to have a way for the runner to say 
whether it needs a keyed input context or a grouped keyed output context. 
These are the cases I know of:
   * KV for SplittableDoFn input, StatefulDoFn input, GBK input, Multimap side input materialization input, window mapping input
   * KV for GBK output
   
   Do you know of any others?
   
   




Issue Time Tracking
---

Worklog Id: (was: 105301)
Time Spent: 2h 50m  (was: 2h 40m)



[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105302=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105302
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 23/May/18 20:00
Start Date: 23/May/18 20:00
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r190380409
 
 





Issue Time Tracking
---

Worklog Id: (was: 105302)
Time Spent: 3h  (was: 2h 50m)



[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105256&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105256
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 23/May/18 18:43
Start Date: 23/May/18 18:43
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r190358677
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -423,8 +432,133 @@ private void translateImpulse(
   String id,
   RunnerApi.Pipeline pipeline,
   StreamingTranslationContext context) {
+// TODO: Fail on stateful DoFns for now.
+// TODO: Support stateful DoFns by inserting group-by-keys where necessary.
+// TODO: Fail on splittable DoFns.
+// TODO: Special-case single outputs to avoid multiplexing PCollections.
+RunnerApi.Components components = pipeline.getComponents();
+RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
+Map outputs = transform.getOutputsMap();
+RehydratedComponents rehydratedComponents =
+RehydratedComponents.forComponents(components);
+
+BiMap outputMap =
+FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
+Map> outputCoders = Maps.newHashMap();
+for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) 
{
+  String collectionId = outputs.get(localOutputName);
+  Coder windowCoder = (Coder) 
instantiateCoder(collectionId, components);
+  outputCoders.put(localOutputName, windowCoder);
+}
+
+final RunnerApi.ExecutableStagePayload stagePayload;
+try {
+  stagePayload = 
RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+
+String inputPCollectionId =
+Iterables.getOnlyElement(transform.getInputsMap().values());
 
 Review comment:
   Unfortunately, Flink needs to have an associated serializer 
(TypeInformation, aka Coder) with each distributed collection. This 
TypeInformation needs to be known at pipeline construction time. It need not 
match the exact coder being used to materialize elements over gRPC, but it does 
need to match the in-memory element type.
   
   We could get around this partially by representing everything as bytes. The 
downside is that each runner-native operation that requires structure (e.g., 
GBK) will require an additional operation to break elements into their 
constituent parts. This step itself also requires knowledge of the coded type, 
so we ultimately run into the same issue.
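
   The point about byte-only elements can be illustrated with a small, 
self-contained sketch. It does not use Flink or Beam: the class name and the 
length-prefixed layout are assumptions made up for this example. It shows that 
splitting an opaque element into key and value (what a GBK would need) only 
works because the decoder already knows the encoded structure.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

/** Hypothetical example: a key/value element carried around as opaque bytes. */
class OpaqueKvBytes {
  /** Encodes a pair as [4-byte key length][key bytes][value bytes] (assumed layout). */
  static byte[] encode(String key, String value) {
    byte[] k = key.getBytes(StandardCharsets.UTF_8);
    byte[] v = value.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(Integer.BYTES + k.length + v.length)
        .putInt(k.length)
        .put(k)
        .put(v)
        .array();
  }

  /** Splits the bytes back into key and value; this requires knowing the layout above. */
  static Map.Entry<String, String> decode(byte[] element) {
    ByteBuffer buf = ByteBuffer.wrap(element);
    byte[] k = new byte[buf.getInt()];
    buf.get(k);
    byte[] v = new byte[buf.remaining()];
    buf.get(v);
    return new SimpleImmutableEntry<>(
        new String(k, StandardCharsets.UTF_8), new String(v, StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    Map.Entry<String, String> kv = decode(encode("user", "42"));
    System.out.println(kv.getKey() + " -> " + kv.getValue());
  }
}
```

   Without the layout baked into `decode`, the grouping step would have nothing 
to key on, which is the same type knowledge a TypeInformation would carry.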




Issue Time Tracking
---

Worklog Id: (was: 105256)
Time Spent: 2h 40m  (was: 2.5h)

> Flink portable runner executable stage operator for streaming
> -
>
> Key: BEAM-4297
> URL: https://issues.apache.org/jira/browse/BEAM-4297
> Project: Beam
>  Issue Type: Task
>  Components: runner-flink
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105255&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105255
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 23/May/18 18:39
Start Date: 23/May/18 18:39
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r190357323
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -423,8 +432,133 @@ private void translateImpulse(
   String id,
   RunnerApi.Pipeline pipeline,
   StreamingTranslationContext context) {
+// TODO: Fail on stateful DoFns for now.
+// TODO: Support stateful DoFns by inserting group-by-keys where necessary.
+// TODO: Fail on splittable DoFns.
+// TODO: Special-case single outputs to avoid multiplexing PCollections.
+RunnerApi.Components components = pipeline.getComponents();
+RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
+Map outputs = transform.getOutputsMap();
+RehydratedComponents rehydratedComponents =
+RehydratedComponents.forComponents(components);
+
+BiMap outputMap =
+FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
+Map> outputCoders = Maps.newHashMap();
+for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) 
{
+  String collectionId = outputs.get(localOutputName);
+  Coder windowCoder = (Coder) 
instantiateCoder(collectionId, components);
 
 Review comment:
   As commented above, we should be able to recreate the ExecutableStage 
multiple times; only binding the service (ApiServiceDescriptor) information 
should happen upstream.




Issue Time Tracking
---

Worklog Id: (was: 105255)
Time Spent: 2.5h  (was: 2h 20m)

> Flink portable runner executable stage operator for streaming
> -
>
> Key: BEAM-4297
> URL: https://issues.apache.org/jira/browse/BEAM-4297
> Project: Beam
>  Issue Type: Task
>  Components: runner-flink
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105253&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105253
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 23/May/18 18:35
Start Date: 23/May/18 18:35
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r190355574
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -423,8 +432,133 @@ private void translateImpulse(
   String id,
   RunnerApi.Pipeline pipeline,
   StreamingTranslationContext context) {
+// TODO: Fail on stateful DoFns for now.
+// TODO: Support stateful DoFns by inserting group-by-keys where necessary.
+// TODO: Fail on splittable DoFns.
+// TODO: Special-case single outputs to avoid multiplexing PCollections.
+RunnerApi.Components components = pipeline.getComponents();
+RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
+Map outputs = transform.getOutputsMap();
+RehydratedComponents rehydratedComponents =
+RehydratedComponents.forComponents(components);
+
+BiMap outputMap =
+FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
+Map> outputCoders = Maps.newHashMap();
+for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) 
{
+  String collectionId = outputs.get(localOutputName);
+  Coder windowCoder = (Coder) 
instantiateCoder(collectionId, components);
+  outputCoders.put(localOutputName, windowCoder);
+}
+
+final RunnerApi.ExecutableStagePayload stagePayload;
+try {
+  stagePayload = 
RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+
+String inputPCollectionId =
+Iterables.getOnlyElement(transform.getInputsMap().values());
 
 Review comment:
   Unfortunately, coders and other properties of the 
ExecutableProcessBundleDescriptor are constructed during execution. It would be 
best if we could somehow make all these details stable during pipeline 
translation so that they don't change during execution. Having Flink rely on 
calling WireCoders.instantiateRunnerWireCoder(...) is an anti-pattern for 
encapsulation.
   
   So if we need to construct the executable stage payload twice, we could make 
the contract require it to be stable regardless of the number of times it is 
constructed. I just want to push more of the input/output/coder/state/side 
input information up to translation time instead of having it deep within 
execution. In my opinion, only service (ApiServiceDescriptor) binding should 
happen there.




Issue Time Tracking
---

Worklog Id: (was: 105253)
Time Spent: 2h 20m  (was: 2h 10m)

> Flink portable runner executable stage operator for streaming
> -
>
> Key: BEAM-4297
> URL: https://issues.apache.org/jira/browse/BEAM-4297
> Project: Beam
>  Issue Type: Task
>  Components: runner-flink
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105250&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105250
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 23/May/18 18:33
Start Date: 23/May/18 18:33
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r190355574
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -423,8 +432,133 @@ private void translateImpulse(
   String id,
   RunnerApi.Pipeline pipeline,
   StreamingTranslationContext context) {
+// TODO: Fail on stateful DoFns for now.
+// TODO: Support stateful DoFns by inserting group-by-keys where necessary.
+// TODO: Fail on splittable DoFns.
+// TODO: Special-case single outputs to avoid multiplexing PCollections.
+RunnerApi.Components components = pipeline.getComponents();
+RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
+Map outputs = transform.getOutputsMap();
+RehydratedComponents rehydratedComponents =
+RehydratedComponents.forComponents(components);
+
+BiMap outputMap =
+FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
+Map> outputCoders = Maps.newHashMap();
+for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) 
{
+  String collectionId = outputs.get(localOutputName);
+  Coder windowCoder = (Coder) 
instantiateCoder(collectionId, components);
+  outputCoders.put(localOutputName, windowCoder);
+}
+
+final RunnerApi.ExecutableStagePayload stagePayload;
+try {
+  stagePayload = 
RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+
+String inputPCollectionId =
+Iterables.getOnlyElement(transform.getInputsMap().values());
 
 Review comment:
   Unfortunately, coders and other properties of the 
ExecutableProcessBundleDescriptor are constructed during execution. It would be 
best if we could somehow make all these details stable during pipeline 
translation so that they don't change during execution. Having Flink rely on 
calling WireCoders.instantiateRunnerWireCoder(...) is an anti-pattern for 
encapsulation.




Issue Time Tracking
---

Worklog Id: (was: 105250)
Time Spent: 2h 10m  (was: 2h)

> Flink portable runner executable stage operator for streaming
> -
>
> Key: BEAM-4297
> URL: https://issues.apache.org/jira/browse/BEAM-4297
> Project: Beam
>  Issue Type: Task
>  Components: runner-flink
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105238&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105238
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 23/May/18 18:29
Start Date: 23/May/18 18:29
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r190353926
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -423,8 +432,133 @@ private void translateImpulse(
   String id,
   RunnerApi.Pipeline pipeline,
   StreamingTranslationContext context) {
+// TODO: Fail on stateful DoFns for now.
+// TODO: Support stateful DoFns by inserting group-by-keys where necessary.
+// TODO: Fail on splittable DoFns.
+// TODO: Special-case single outputs to avoid multiplexing PCollections.
+RunnerApi.Components components = pipeline.getComponents();
+RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
+Map outputs = transform.getOutputsMap();
+RehydratedComponents rehydratedComponents =
+RehydratedComponents.forComponents(components);
+
+BiMap outputMap =
+FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
+Map> outputCoders = Maps.newHashMap();
+for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) 
{
 
 Review comment:
   I see, that makes sense.




Issue Time Tracking
---

Worklog Id: (was: 105238)
Time Spent: 2h  (was: 1h 50m)

> Flink portable runner executable stage operator for streaming
> -
>
> Key: BEAM-4297
> URL: https://issues.apache.org/jira/browse/BEAM-4297
> Project: Beam
>  Issue Type: Task
>  Components: runner-flink
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 2h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105232&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105232
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 23/May/18 18:25
Start Date: 23/May/18 18:25
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r190349666
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -423,8 +432,133 @@ private void translateImpulse(
   String id,
   RunnerApi.Pipeline pipeline,
   StreamingTranslationContext context) {
+// TODO: Fail on stateful DoFns for now.
+// TODO: Support stateful DoFns by inserting group-by-keys where necessary.
+// TODO: Fail on splittable DoFns.
+// TODO: Special-case single outputs to avoid multiplexing PCollections.
+RunnerApi.Components components = pipeline.getComponents();
+RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
+Map outputs = transform.getOutputsMap();
+RehydratedComponents rehydratedComponents =
+RehydratedComponents.forComponents(components);
+
+BiMap outputMap =
+FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
+Map> outputCoders = Maps.newHashMap();
+for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) 
{
 
 Review comment:
   A tree set of what? The intent here is to order the _coders_ (the values in 
the map) by their tags (the keys).
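
   The ordering described above can be sketched with plain `java.util` 
collections. This is only an illustration, not the PR's code: the class name, 
the integer tags, and the strings standing in for coders are made up, and a 
plain `Map` stands in for the inverse view of the Guava `BiMap`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical example: iterate a map's values in the order of their keys. */
class OrderedByTag {
  /** Returns the values of the given map, ordered by the natural ordering of the keys. */
  static <K extends Comparable<K>, V> List<V> valuesOrderedByKey(Map<K, V> unordered) {
    // TreeMap sorts its entries by key; values() then iterates in that key order.
    return new ArrayList<>(new TreeMap<>(unordered).values());
  }

  public static void main(String[] args) {
    Map<Integer, String> tagToCoder = new HashMap<>();
    tagToCoder.put(2, "coderForTag2");
    tagToCoder.put(0, "coderForTag0");
    tagToCoder.put(1, "coderForTag1");
    // Prints [coderForTag0, coderForTag1, coderForTag2]
    System.out.println(valuesOrderedByKey(tagToCoder));
  }
}
```

   A sorted set of the values alone would order by the values themselves, which 
is why the sorted map (keyed by tag) is used here.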




Issue Time Tracking
---

Worklog Id: (was: 105232)
Time Spent: 1.5h  (was: 1h 20m)

> Flink portable runner executable stage operator for streaming
> -
>
> Key: BEAM-4297
> URL: https://issues.apache.org/jira/browse/BEAM-4297
> Project: Beam
>  Issue Type: Task
>  Components: runner-flink
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105233&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105233
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 23/May/18 18:25
Start Date: 23/May/18 18:25
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r190352395
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -423,8 +432,133 @@ private void translateImpulse(
   String id,
   RunnerApi.Pipeline pipeline,
   StreamingTranslationContext context) {
+// TODO: Fail on stateful DoFns for now.
+// TODO: Support stateful DoFns by inserting group-by-keys where necessary.
+// TODO: Fail on splittable DoFns.
+// TODO: Special-case single outputs to avoid multiplexing PCollections.
+RunnerApi.Components components = pipeline.getComponents();
+RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
+Map outputs = transform.getOutputsMap();
+RehydratedComponents rehydratedComponents =
+RehydratedComponents.forComponents(components);
+
+BiMap outputMap =
+FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
+Map> outputCoders = Maps.newHashMap();
+for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) 
{
+  String collectionId = outputs.get(localOutputName);
+  Coder windowCoder = (Coder) 
instantiateCoder(collectionId, components);
+  outputCoders.put(localOutputName, windowCoder);
+}
+
+final RunnerApi.ExecutableStagePayload stagePayload;
+try {
+  stagePayload = 
RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+
+String inputPCollectionId =
+Iterables.getOnlyElement(transform.getInputsMap().values());
 
 Review comment:
   As above, we need to pass a serializable representation to operators. We 
could create an ExecutableStage here and then reconstruct it on the runner. In 
that case, we need to ensure that two ExecutableStages constructed from the 
same ExecutableStagePayload are _always_ equivalent (including any synthetic 
ids that may be generated). That should be the case as of now, but we need to 
be careful.
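
   The equivalence requirement can be sketched as follows. The class below is a 
hypothetical stand-in, not Beam's ExecutableStage or ExecutableStagePayload; 
the field names and the id format are assumptions for illustration. The idea is 
that any synthetic id is derived purely from the payload contents, so 
rebuilding the stage on the client and again on a worker yields equal objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/** Hypothetical stage that is rebuilt from a serialized description. */
class StageSketch {
  final String syntheticId;
  final String inputId;
  final List<String> transformIds;

  private StageSketch(String syntheticId, String inputId, List<String> transformIds) {
    this.syntheticId = syntheticId;
    this.inputId = inputId;
    this.transformIds = transformIds;
  }

  /** Rebuilds a stage from its payload, represented here by two plain fields. */
  static StageSketch fromPayload(String inputId, List<String> transformIds) {
    // The synthetic id is a pure function of the payload: no counters, UUIDs, or clocks.
    String syntheticId = "stage:" + inputId + ":" + String.join(",", transformIds);
    return new StageSketch(syntheticId, inputId, new ArrayList<>(transformIds));
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof StageSketch)) {
      return false;
    }
    StageSketch other = (StageSketch) o;
    return syntheticId.equals(other.syntheticId)
        && inputId.equals(other.inputId)
        && transformIds.equals(other.transformIds);
  }

  @Override
  public int hashCode() {
    return Objects.hash(syntheticId, inputId, transformIds);
  }

  public static void main(String[] args) {
    List<String> transforms = Arrays.asList("parDo1", "parDo2");
    StageSketch onClient = fromPayload("pc0", transforms);
    StageSketch onWorker = fromPayload("pc0", transforms);
    System.out.println(onClient.equals(onWorker)); // prints "true"
  }
}
```

   If the id came from a counter or a random source instead, the two 
reconstructions could disagree, which is the case to guard against.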




Issue Time Tracking
---

Worklog Id: (was: 105233)
Time Spent: 1h 40m  (was: 1.5h)

> Flink portable runner executable stage operator for streaming
> -
>
> Key: BEAM-4297
> URL: https://issues.apache.org/jira/browse/BEAM-4297
> Project: Beam
>  Issue Type: Task
>  Components: runner-flink
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105234&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105234
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 23/May/18 18:25
Start Date: 23/May/18 18:25
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r190351647
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -423,8 +432,133 @@ private void translateImpulse(
   String id,
   RunnerApi.Pipeline pipeline,
   StreamingTranslationContext context) {
+// TODO: Fail on stateful DoFns for now.
+// TODO: Support stateful DoFns by inserting group-by-keys where necessary.
+// TODO: Fail on splittable DoFns.
+// TODO: Special-case single outputs to avoid multiplexing PCollections.
+RunnerApi.Components components = pipeline.getComponents();
+RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
+Map outputs = transform.getOutputsMap();
+RehydratedComponents rehydratedComponents =
+RehydratedComponents.forComponents(components);
+
+BiMap outputMap =
+FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
+Map> outputCoders = Maps.newHashMap();
+for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) 
{
+  String collectionId = outputs.get(localOutputName);
+  Coder windowCoder = (Coder) 
instantiateCoder(collectionId, components);
 
 Review comment:
   Unfortunately, we cannot simply pass the ExecutableStage onto operators 
here. Flink requires that operators be serializable. Operator constructors run 
on the client JVM, but operators are initialized via lifecycle methods on 
TaskManagers. For this reason, we use the executable stage _payload_ in the 
batch translator. The same applies here.
   
   We could decide to use a different serialized representation here for 
operator tasks, but it seems convenient here to reuse what we already have.
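
   The serialization constraint can be sketched without Flink. The class below 
is hypothetical and uses plain Java serialization to stand in for shipping an 
operator from the client JVM to a TaskManager; `open()` here only mimics an 
operator lifecycle method. The serialized payload travels with the operator, 
while the runtime object is transient and rebuilt after deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

/** Hypothetical operator that carries a serialized payload instead of the live stage. */
class PayloadCarryingOperator implements Serializable {
  private static final long serialVersionUID = 1L;

  // Serialized form of the stage; this is what travels from client to worker.
  private final byte[] payload;

  // Runtime representation; never serialized, rebuilt in open().
  private transient String stage;

  /** Runs on the "client": only the serializable payload is stored. */
  PayloadCarryingOperator(byte[] payload) {
    this.payload = payload.clone();
  }

  /** Lifecycle hook on the "worker": reconstructs runtime state from the payload. */
  void open() {
    stage = new String(payload, StandardCharsets.UTF_8);
  }

  String stage() {
    return stage;
  }

  /** Simulates shipping the operator to another JVM via Java serialization. */
  static PayloadCarryingOperator roundTrip(PayloadCarryingOperator op) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(op);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (PayloadCarryingOperator) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization round trip failed", e);
    }
  }

  public static void main(String[] args) {
    PayloadCarryingOperator shipped =
        roundTrip(new PayloadCarryingOperator("stage-1".getBytes(StandardCharsets.UTF_8)));
    System.out.println(shipped.stage()); // transient field is null before open()
    shipped.open();
    System.out.println(shipped.stage()); // rebuilt from the payload
  }
}
```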




Issue Time Tracking
---

Worklog Id: (was: 105234)
Time Spent: 1h 50m  (was: 1h 40m)

> Flink portable runner executable stage operator for streaming
> -
>
> Key: BEAM-4297
> URL: https://issues.apache.org/jira/browse/BEAM-4297
> Project: Beam
>  Issue Type: Task
>  Components: runner-flink
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105213&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105213
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 23/May/18 18:09
Start Date: 23/May/18 18:09
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r190338654
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -423,8 +432,133 @@ private void translateImpulse(
   String id,
   RunnerApi.Pipeline pipeline,
   StreamingTranslationContext context) {
+// TODO: Fail on stateful DoFns for now.
+// TODO: Support stateful DoFns by inserting group-by-keys where necessary.
+// TODO: Fail on splittable DoFns.
+// TODO: Special-case single outputs to avoid multiplexing PCollections.
+RunnerApi.Components components = pipeline.getComponents();
+RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
+Map outputs = transform.getOutputsMap();
+RehydratedComponents rehydratedComponents =
+RehydratedComponents.forComponents(components);
+
+BiMap outputMap =
+FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
+Map> outputCoders = Maps.newHashMap();
+for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) 
{
+  String collectionId = outputs.get(localOutputName);
+  Coder windowCoder = (Coder) 
instantiateCoder(collectionId, components);
+  outputCoders.put(localOutputName, windowCoder);
+}
+
+final RunnerApi.ExecutableStagePayload stagePayload;
+try {
+  stagePayload = 
RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+
+String inputPCollectionId =
+Iterables.getOnlyElement(transform.getInputsMap().values());
 
-throw new RuntimeException("executable stage translation not implemented");
+// TODO: is this still relevant?
+// we assume that the transformation does not change the windowing 
strategy.
+RunnerApi.WindowingStrategy windowingStrategyProto =
+pipeline.getComponents().getWindowingStrategiesOrThrow(
+pipeline.getComponents().getPcollectionsOrThrow(
 
 Review comment:
   You should rely on `getInputPCollection().pcollection()` from the 
`ExecutableStage` that you could construct above from the payload.




Issue Time Tracking
---

Worklog Id: (was: 105213)
Time Spent: 1h 20m  (was: 1h 10m)

> Flink portable runner executable stage operator for streaming
> -
>
> Key: BEAM-4297
> URL: https://issues.apache.org/jira/browse/BEAM-4297
> Project: Beam
>  Issue Type: Task
>  Components: runner-flink
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105212&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105212
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 23/May/18 18:08
Start Date: 23/May/18 18:08
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r190347432
 
 

 ##
 File path: 
runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
 ##
 @@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.Struct;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.runners.flink.ArtifactSourcePool;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.OutputTag;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link ExecutableStageDoFnOperator}. */
+@RunWith(JUnit4.class)
+public class ExecutableStageDoFnOperatorTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Mock private RuntimeContext runtimeContext;
+  @Mock private DistributedCache distributedCache;
+  @Mock private FlinkExecutableStageContext stageContext;
+  @Mock private StageBundleFactory stageBundleFactory;
 
 Review comment:
   It might be easier to follow the testing strategy employed here: 
https://github.com/apache/beam/blob/8345991842dbc1f3cd8c2902ea8989bb0a8e81fb/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java#L159
   
   and use the InProcessSdkHarness TestRule to set up the tests instead of mocks.
   
   If not, can review the 

[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105210=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105210
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 23/May/18 18:08
Start Date: 23/May/18 18:08
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r190335965
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -423,8 +432,133 @@ private void translateImpulse(
   String id,
   RunnerApi.Pipeline pipeline,
   StreamingTranslationContext context) {
+// TODO: Fail on stateful DoFns for now.
+// TODO: Support stateful DoFns by inserting group-by-keys where necessary.
+// TODO: Fail on splittable DoFns.
+// TODO: Special-case single outputs to avoid multiplexing PCollections.
+RunnerApi.Components components = pipeline.getComponents();
+RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
+Map<String, String> outputs = transform.getOutputsMap();
+RehydratedComponents rehydratedComponents =
+RehydratedComponents.forComponents(components);
+
+BiMap<String, Integer> outputMap =
+FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
+Map<String, Coder<WindowedValue<?>>> outputCoders = Maps.newHashMap();
+for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) {
 
 Review comment:
   consider using TreeSet
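
   A minimal sketch of what the TreeSet suggestion could look like, using only `java.util`. The class name `OutputOrderSketch`, the helper `sortedOutputNames`, and the sample entries are hypothetical and not taken from the PR. Note one behavioral difference, stated as an assumption about the quoted loop: iterating `new TreeMap<>(outputMap.inverse()).values()` visits names in tag-index order, while a TreeSet visits them in lexicographic name order. The two coincide only if the indices were assigned from sorted names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

class OutputOrderSketch {
  /** Returns the local output names in natural (lexicographic) order. */
  public static List<String> sortedOutputNames(Map<String, String> outputs) {
    // TreeSet copies the keys and iterates them in sorted order,
    // independent of the iteration order of the source map.
    return new ArrayList<>(new TreeSet<>(outputs.keySet()));
  }

  public static void main(String[] args) {
    // Hypothetical local output name -> PCollection id entries.
    Map<String, String> outputs = new HashMap<>();
    outputs.put("out2", "pc_b");
    outputs.put("out0", "pc_c");
    outputs.put("out1", "pc_a");
    for (String localOutputName : sortedOutputNames(outputs)) {
      System.out.println(localOutputName + " -> " + outputs.get(localOutputName));
    }
  }
}
```

   The loop body stays the same as in the quoted diff; only the source of the iteration order changes.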


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 105210)

> Flink portable runner executable stage operator for streaming
> -
>
> Key: BEAM-4297
> URL: https://issues.apache.org/jira/browse/BEAM-4297
> Project: Beam
>  Issue Type: Task
>  Components: runner-flink
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 1h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105211=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105211
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 23/May/18 18:08
Start Date: 23/May/18 18:08
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r190340103
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java
 ##
 @@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.utils;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+
+/**
+ * Utilities for pipeline translation.
+ */
+public final class FlinkPipelineTranslatorUtils {
+  private FlinkPipelineTranslatorUtils() {}
+
+  /**  Creates a mapping from PCollection id to output tag integer. */
+  public static BiMap<String, Integer> createOutputMap(Iterable<String> localOutputs) {
+ImmutableBiMap.Builder<String, Integer> builder = ImmutableBiMap.builder();
+int outputIndex = 0;
+for (String tag : localOutputs) {
 
 Review comment:
   Sort the localOutputs to get a stable indexing; otherwise multiple calls to 
createOutputMap may not return the same mapping.
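
   A rough sketch of the sorting idea, assuming the tags are plain strings. To stay self-contained it returns a `java.util.Map` instead of the Guava `BiMap` used in the PR, and the class name `StableOutputMapSketch` is hypothetical. Unlike a Guava builder, the TreeSet silently drops duplicate tags.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeSet;

class StableOutputMapSketch {
  /** Maps each local output name to an index assigned in sorted-name order. */
  public static Map<String, Integer> createOutputMap(Iterable<String> localOutputs) {
    // Sort first so the indices do not depend on the caller's iteration order.
    TreeSet<String> sorted = new TreeSet<>();
    for (String tag : localOutputs) {
      sorted.add(tag);
    }
    Map<String, Integer> result = new LinkedHashMap<>();
    int outputIndex = 0;
    for (String tag : sorted) {
      result.put(tag, outputIndex++);
    }
    return Collections.unmodifiableMap(result);
  }

  public static void main(String[] args) {
    // Two callers presenting the same tags in different orders get the same map.
    System.out.println(createOutputMap(Arrays.asList("b", "a", "c")));
    System.out.println(createOutputMap(Arrays.asList("c", "b", "a")));
  }
}
```

   With this shape, the batch and streaming translators would agree on indices even if they enumerate the outputs differently.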




Issue Time Tracking
---

Worklog Id: (was: 105211)
Time Spent: 1h 10m  (was: 1h)



[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105209=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105209
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 23/May/18 18:08
Start Date: 23/May/18 18:08
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r190338072
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -423,8 +432,133 @@ private void translateImpulse(
   String id,
   RunnerApi.Pipeline pipeline,
   StreamingTranslationContext context) {
+// TODO: Fail on stateful DoFns for now.
+// TODO: Support stateful DoFns by inserting group-by-keys where necessary.
+// TODO: Fail on splittable DoFns.
+// TODO: Special-case single outputs to avoid multiplexing PCollections.
+RunnerApi.Components components = pipeline.getComponents();
+RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
+Map<String, String> outputs = transform.getOutputsMap();
+RehydratedComponents rehydratedComponents =
+RehydratedComponents.forComponents(components);
+
+BiMap<String, Integer> outputMap =
+FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
+Map<String, Coder<WindowedValue<?>>> outputCoders = Maps.newHashMap();
+for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) {
+  String collectionId = outputs.get(localOutputName);
+  Coder<WindowedValue<?>> windowCoder = (Coder) instantiateCoder(collectionId, components);
+  outputCoders.put(localOutputName, windowCoder);
+}
+
+final RunnerApi.ExecutableStagePayload stagePayload;
+try {
+  stagePayload = RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+
+String inputPCollectionId =
+Iterables.getOnlyElement(transform.getInputsMap().values());
 
-throw new RuntimeException("executable stage translation not implemented");
+// TODO: is this still relevant?
+// we assume that the transformation does not change the windowing strategy.
+RunnerApi.WindowingStrategy windowingStrategyProto =
 
 Review comment:
   ExecutableStages can change the windowing strategy, as they may execute 
window assignment within them. So a previous executable stage may have changed the 
windowing strategy.




Issue Time Tracking
---

Worklog Id: (was: 105209)



[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105208=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105208
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 23/May/18 18:08
Start Date: 23/May/18 18:08
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r190337604
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -423,8 +432,133 @@ private void translateImpulse(
   String id,
   RunnerApi.Pipeline pipeline,
   StreamingTranslationContext context) {
+// TODO: Fail on stateful DoFns for now.
+// TODO: Support stateful DoFns by inserting group-by-keys where necessary.
+// TODO: Fail on splittable DoFns.
+// TODO: Special-case single outputs to avoid multiplexing PCollections.
+RunnerApi.Components components = pipeline.getComponents();
+RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
+Map<String, String> outputs = transform.getOutputsMap();
+RehydratedComponents rehydratedComponents =
+RehydratedComponents.forComponents(components);
+
+BiMap<String, Integer> outputMap =
+FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
+Map<String, Coder<WindowedValue<?>>> outputCoders = Maps.newHashMap();
+for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) {
+  String collectionId = outputs.get(localOutputName);
+  Coder<WindowedValue<?>> windowCoder = (Coder) instantiateCoder(collectionId, components);
+  outputCoders.put(localOutputName, windowCoder);
+}
+
+final RunnerApi.ExecutableStagePayload stagePayload;
+try {
+  stagePayload = RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+
+String inputPCollectionId =
+Iterables.getOnlyElement(transform.getInputsMap().values());
 
 Review comment:
   Use ExecutableStage and rely on its `getInputPCollection().getId()` method.
   
   Please make the same change in the batch pipeline translator as well.




Issue Time Tracking
---

Worklog Id: (was: 105208)
Time Spent: 1h  (was: 50m)



[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105207=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105207
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 23/May/18 18:08
Start Date: 23/May/18 18:08
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r190339764
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -423,8 +432,133 @@ private void translateImpulse(
   String id,
   RunnerApi.Pipeline pipeline,
   StreamingTranslationContext context) {
+// TODO: Fail on stateful DoFns for now.
+// TODO: Support stateful DoFns by inserting group-by-keys where necessary.
+// TODO: Fail on splittable DoFns.
+// TODO: Special-case single outputs to avoid multiplexing PCollections.
+RunnerApi.Components components = pipeline.getComponents();
+RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
+Map<String, String> outputs = transform.getOutputsMap();
+RehydratedComponents rehydratedComponents =
+RehydratedComponents.forComponents(components);
+
+BiMap<String, Integer> outputMap =
+FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
+Map<String, Coder<WindowedValue<?>>> outputCoders = Maps.newHashMap();
+for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) {
+  String collectionId = outputs.get(localOutputName);
+  Coder<WindowedValue<?>> windowCoder = (Coder) instantiateCoder(collectionId, components);
 
 Review comment:
   Rely on creating an `ExecutableStage` here and its `getOutputPCollections` 
method. You can pass this `ExecutableStage` forward to the 
`ExecutableStageDoFnOperator`. Anywhere below where you need the input 
PCollection (and, in the future, state/timer or side input information), you can 
get it from this object.
   
   I can see how having access to the coders would be important here, and that 
the ExecutableStage should contain this information without needing to bind the 
data/state service the way the ExecutableProcessBundleDescriptor does (currently 
the ExecutableProcessBundleDescriptor munges the payload and makes small, 
albeit important, modifications).




Issue Time Tracking
---

Worklog Id: (was: 105207)
Time Spent: 50m  (was: 40m)



[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105150=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105150
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 23/May/18 15:18
Start Date: 23/May/18 15:18
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #5407: [BEAM-4297] Streaming 
executable stage translation and operator for portable Flink runner.
URL: https://github.com/apache/beam/pull/5407#issuecomment-391386675
 
 
   @bsidhom @lukecwik PTAL




Issue Time Tracking
---

Worklog Id: (was: 105150)
Time Spent: 40m  (was: 0.5h)



[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=103615=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103615
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 19/May/18 00:49
Start Date: 19/May/18 00:49
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5407: 
[BEAM-4297] Streaming executable stage translation and operator for portable 
Flink runner.
URL: https://github.com/apache/beam/pull/5407#discussion_r189330462
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -616,7 +616,7 @@ private static void pruneOutput(
   }
 
   /**  Creates a mapping from PCollection id to output tag integer. */
-  private static BiMap<String, Integer> createOutputMap(Iterable<String> localOutputs) {
+  static BiMap<String, Integer> createOutputMap(Iterable<String> localOutputs) {
 
 Review comment:
   Please move this into a shared utility class.




Issue Time Tracking
---

Worklog Id: (was: 103615)
Time Spent: 0.5h  (was: 20m)



[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=103409=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103409
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 18/May/18 15:47
Start Date: 18/May/18 15:47
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #5407: [BEAM-4297] Streaming 
executable stage translation and operator for portable Flink runner.
URL: https://github.com/apache/beam/pull/5407#issuecomment-390249488
 
 
   R: @lukecwik @bsidhom 




Issue Time Tracking
---

Worklog Id: (was: 103409)
Time Spent: 20m  (was: 10m)



[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming

2018-05-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=103209=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103209
 ]

ASF GitHub Bot logged work on BEAM-4297:


Author: ASF GitHub Bot
Created on: 18/May/18 02:37
Start Date: 18/May/18 02:37
Worklog Time Spent: 10m 
  Work Description: tweise opened a new pull request #5407: [BEAM-4297] 
Streaming executable stage translation and operator for portable Flink runner.
URL: https://github.com/apache/beam/pull/5407
 
 
   Executable stage translation for streaming mode, based on the generic Flink 
streaming operator. Stage execution and tests are adapted from the batch translation.
   
   
   
   




Issue Time Tracking
---

Worklog Id: (was: 103209)
Time Spent: 10m
Remaining Estimate: 0h
