[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=106770=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106770 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 29/May/18 19:02 Start Date: 29/May/18 19:02 Worklog Time Spent: 10m Work Description: tweise closed pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java index 72fe10e22c0..b7ee08378dc 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.collect.BiMap; -import com.google.common.collect.ImmutableBiMap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -56,6 +55,7 @@ import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.types.KvKeySelector; +import org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils; import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat; import org.apache.beam.runners.fnexecution.provisioning.JobInfo; 
import org.apache.beam.runners.fnexecution.wire.WireCoders; @@ -319,7 +319,8 @@ public void translate(BatchTranslationContext context, RunnerApi.Pipeline pipeli RunnerApi.Components components = pipeline.getComponents(); Map outputs = transform.getTransform().getOutputsMap(); // Mapping from PCollection id to coder tag id. -BiMap outputMap = createOutputMap(outputs.values()); +BiMap outputMap = +FlinkPipelineTranslatorUtils.createOutputMap(outputs.values()); // Collect all output Coders and create a UnionCoder for our tagged outputs. List> unionCoders = Lists.newArrayList(); // Enforce tuple tag sorting by union tag index. @@ -615,14 +616,4 @@ private static void pruneOutput( context.addDataSet(collectionId, pruningOperator); } - /** Creates a mapping from PCollection id to output tag integer. */ - private static BiMap createOutputMap(Iterable localOutputs) { -ImmutableBiMap.Builder builder = ImmutableBiMap.builder(); -int outputIndex = 0; -for (String tag : localOutputs) { - builder.put(tag, outputIndex); - outputIndex++; -} -return builder.build(); - } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java index b7bc8ca1a40..ff991dce7ed 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java @@ -17,15 +17,20 @@ */ package org.apache.beam.runners.flink; +import com.google.common.collect.BiMap; import com.google.common.collect.HashMultiset; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; import java.nio.ByteBuffer; +import 
java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.TreeMap; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.construction.PTransformTranslation; @@ -36,8 +41,11 @@ import org.apache.beam.runners.core.construction.graph.PipelineNode; import org.apache.beam.runners.core.construction.graph.QueryablePipeline; import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; +import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; +import
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=106704=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106704 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 29/May/18 16:41 Start Date: 29/May/18 16:41 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r191494018 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ## @@ -423,8 +432,133 @@ private void translateImpulse( String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) { +// TODO: Fail on stateful DoFns for now. +// TODO: Support stateful DoFns by inserting group-by-keys where necessary. +// TODO: Fail on splittable DoFns. +// TODO: Special-case single outputs to avoid multiplexing PCollections. 
+RunnerApi.Components components = pipeline.getComponents(); +RunnerApi.PTransform transform = components.getTransformsOrThrow(id); +Map outputs = transform.getOutputsMap(); +RehydratedComponents rehydratedComponents = +RehydratedComponents.forComponents(components); + +BiMap outputMap = +FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet()); +Map>> outputCoders = Maps.newHashMap(); +for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) { + String collectionId = outputs.get(localOutputName); + Coder> windowCoder = (Coder) instantiateCoder(collectionId, components); + outputCoders.put(localOutputName, windowCoder); +} + +final RunnerApi.ExecutableStagePayload stagePayload; +try { + stagePayload = RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload()); +} catch (IOException e) { + throw new RuntimeException(e); +} + +String inputPCollectionId = +Iterables.getOnlyElement(transform.getInputsMap().values()); Review comment: SGTM This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 106704) Time Spent: 3h 50m (was: 3h 40m) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 3h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=106470=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106470 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 29/May/18 06:04 Start Date: 29/May/18 06:04 Worklog Time Spent: 10m Work Description: tweise commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r191311373 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ## @@ -423,8 +432,133 @@ private void translateImpulse( String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) { +// TODO: Fail on stateful DoFns for now. +// TODO: Support stateful DoFns by inserting group-by-keys where necessary. +// TODO: Fail on splittable DoFns. +// TODO: Special-case single outputs to avoid multiplexing PCollections. 
+RunnerApi.Components components = pipeline.getComponents(); +RunnerApi.PTransform transform = components.getTransformsOrThrow(id); +Map outputs = transform.getOutputsMap(); +RehydratedComponents rehydratedComponents = +RehydratedComponents.forComponents(components); + +BiMap outputMap = +FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet()); +Map>> outputCoders = Maps.newHashMap(); +for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) { + String collectionId = outputs.get(localOutputName); + Coder> windowCoder = (Coder) instantiateCoder(collectionId, components); + outputCoders.put(localOutputName, windowCoder); +} + +final RunnerApi.ExecutableStagePayload stagePayload; +try { + stagePayload = RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload()); +} catch (IOException e) { + throw new RuntimeException(e); +} + +String inputPCollectionId = +Iterables.getOnlyElement(transform.getInputsMap().values()); -throw new RuntimeException("executable stage translation not implemented"); +// TODO: is this still relevant? +// we assume that the transformation does not change the windowing strategy. +RunnerApi.WindowingStrategy windowingStrategyProto = Review comment: Removed. It was a remnant of translation code from the old runner (only used in the case of a stateful ParDo). Since key handling and output encoding will happen in the SDK harness, we probably won't need to know the windowing strategy in the operator. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 106470) Time Spent: 3h 40m (was: 3.5h) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 3h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=106467=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106467 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 29/May/18 05:36 Start Date: 29/May/18 05:36 Worklog Time Spent: 10m Work Description: tweise commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r191307968 ## File path: runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java ## @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.streaming; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.collection.IsIterableContainingInOrder.contains; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Struct; +import java.util.Collections; +import java.util.List; +import org.apache.beam.model.pipeline.v1.RunnerApi.Components; +import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload; +import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection; +import org.apache.beam.runners.flink.ArtifactSourcePool; +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext; +import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; +import org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator; +import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory; +import org.apache.beam.runners.fnexecution.control.RemoteBundle; +import org.apache.beam.runners.fnexecution.control.StageBundleFactory; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.runners.fnexecution.state.StateRequestHandler; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; +import 
org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.OutputTag; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +/** Tests for {@link ExecutableStageDoFnOperator}. */ +@RunWith(JUnit4.class) +public class ExecutableStageDoFnOperatorTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Mock private RuntimeContext runtimeContext; + @Mock private DistributedCache distributedCache; + @Mock private FlinkExecutableStageContext stageContext; + @Mock private StageBundleFactory stageBundleFactory; Review comment: I think the mock-based test is good for covering just the operator class, without other dependencies. InProcessServerFactory might be a good way to write an integration test that also covers the translator, outside of the ValidatesRunner suite. I can probably do that as a follow-up. This is an automated
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=106463=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106463 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 29/May/18 05:14 Start Date: 29/May/18 05:14 Worklog Time Spent: 10m Work Description: tweise commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r191305096 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java ## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.utils; + +import com.google.common.collect.BiMap; +import com.google.common.collect.ImmutableBiMap; + +/** + * Utilities for pipeline translation. + */ +public final class FlinkPipelineTranslatorUtils { + private FlinkPipelineTranslatorUtils() {} + + /** Creates a mapping from PCollection id to output tag integer. 
*/ + public static BiMap createOutputMap(Iterable localOutputs) { +ImmutableBiMap.Builder builder = ImmutableBiMap.builder(); +int outputIndex = 0; +for (String tag : localOutputs) { Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 106463) Time Spent: 3h 10m (was: 3h) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 3h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=106464=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106464 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 29/May/18 05:14 Start Date: 29/May/18 05:14 Worklog Time Spent: 10m Work Description: tweise commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r191305612 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ## @@ -423,8 +432,133 @@ private void translateImpulse( String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) { +// TODO: Fail on stateful DoFns for now. +// TODO: Support stateful DoFns by inserting group-by-keys where necessary. +// TODO: Fail on splittable DoFns. +// TODO: Special-case single outputs to avoid multiplexing PCollections. 
+RunnerApi.Components components = pipeline.getComponents(); +RunnerApi.PTransform transform = components.getTransformsOrThrow(id); +Map outputs = transform.getOutputsMap(); +RehydratedComponents rehydratedComponents = +RehydratedComponents.forComponents(components); + +BiMap outputMap = +FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet()); +Map>> outputCoders = Maps.newHashMap(); +for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) { + String collectionId = outputs.get(localOutputName); + Coder> windowCoder = (Coder) instantiateCoder(collectionId, components); + outputCoders.put(localOutputName, windowCoder); +} + +final RunnerApi.ExecutableStagePayload stagePayload; +try { + stagePayload = RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload()); +} catch (IOException e) { + throw new RuntimeException(e); +} + +String inputPCollectionId = +Iterables.getOnlyElement(transform.getInputsMap().values()); Review comment: I added a test for serialization. If we agree on the repeated instantiation of ExecutableStage, then I can take this up in a separate PR (for both batch and streaming translation). I would do that once we have test and end-to-end coverage; right now the translators are not yet wired up. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 106464) Time Spent: 3h 20m (was: 3h 10m) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 3h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=106462=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106462 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 29/May/18 05:14 Start Date: 29/May/18 05:14 Worklog Time Spent: 10m Work Description: tweise commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r191305254 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ## @@ -423,8 +432,133 @@ private void translateImpulse( String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) { +// TODO: Fail on stateful DoFns for now. +// TODO: Support stateful DoFns by inserting group-by-keys where necessary. +// TODO: Fail on splittable DoFns. +// TODO: Special-case single outputs to avoid multiplexing PCollections. +RunnerApi.Components components = pipeline.getComponents(); +RunnerApi.PTransform transform = components.getTransformsOrThrow(id); +Map outputs = transform.getOutputsMap(); +RehydratedComponents rehydratedComponents = +RehydratedComponents.forComponents(components); + +BiMap outputMap = +FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet()); +Map>> outputCoders = Maps.newHashMap(); +for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) { Review comment: I have cleaned up this portion of the code. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 106462) Time Spent: 3h 10m (was: 3h) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 3h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105301=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105301 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 23/May/18 20:00 Start Date: 23/May/18 20:00 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r190380409 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ## @@ -423,8 +432,133 @@ private void translateImpulse( String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) { +// TODO: Fail on stateful DoFns for now. +// TODO: Support stateful DoFns by inserting group-by-keys where necessary. +// TODO: Fail on splittable DoFns. +// TODO: Special-case single outputs to avoid multiplexing PCollections. 
+RunnerApi.Components components = pipeline.getComponents(); +RunnerApi.PTransform transform = components.getTransformsOrThrow(id); +Mapoutputs = transform.getOutputsMap(); +RehydratedComponents rehydratedComponents = +RehydratedComponents.forComponents(components); + +BiMap outputMap = +FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet()); +Map > outputCoders = Maps.newHashMap(); +for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) { + String collectionId = outputs.get(localOutputName); + Coder windowCoder = (Coder) instantiateCoder(collectionId, components); + outputCoders.put(localOutputName, windowCoder); +} + +final RunnerApi.ExecutableStagePayload stagePayload; +try { + stagePayload = RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload()); +} catch (IOException e) { + throw new RuntimeException(e); +} + +String inputPCollectionId = +Iterables.getOnlyElement(transform.getInputsMap().values()); Review comment: I'm just arguing for pushing most of the manipulation done within ExecutableProcessBundleDescriptor into the ExecutableStage payload (minus the ApiServiceDescriptor binding) so it doesn't need modification. This would allow the ExecutableStage to concretely answer what the input coders, output coders, side input coders, state coders, ... are, in addition to any other information. Longer term, it would make sense to have a way for the runner to say whether we need a keyed input context or a grouped keyed output context. These are the cases I know of: * KV for SplittableDoFn input, StatefulDoFn input, GBK input, Multimap side input materialization input, window mapping input * KV for GBK output Do you know of any others? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 105301) Time Spent: 2h 50m (was: 2h 40m) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 2h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105302=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105302 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 23/May/18 20:00 Start Date: 23/May/18 20:00 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r190380409 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ## @@ -423,8 +432,133 @@ private void translateImpulse( String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) { +// TODO: Fail on stateful DoFns for now. +// TODO: Support stateful DoFns by inserting group-by-keys where necessary. +// TODO: Fail on splittable DoFns. +// TODO: Special-case single outputs to avoid multiplexing PCollections. 
+RunnerApi.Components components = pipeline.getComponents(); +RunnerApi.PTransform transform = components.getTransformsOrThrow(id); +Mapoutputs = transform.getOutputsMap(); +RehydratedComponents rehydratedComponents = +RehydratedComponents.forComponents(components); + +BiMap outputMap = +FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet()); +Map > outputCoders = Maps.newHashMap(); +for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) { + String collectionId = outputs.get(localOutputName); + Coder windowCoder = (Coder) instantiateCoder(collectionId, components); + outputCoders.put(localOutputName, windowCoder); +} + +final RunnerApi.ExecutableStagePayload stagePayload; +try { + stagePayload = RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload()); +} catch (IOException e) { + throw new RuntimeException(e); +} + +String inputPCollectionId = +Iterables.getOnlyElement(transform.getInputsMap().values()); Review comment: I'm just arguing for pushing most of the manipulation done within ExecutableProcessBundleDescriptor into the ExecutableStage payload (minus the ApiServiceDescriptor binding) so it doesn't need modification. This would allow the ExecutableStage to concretely answer what the input coders, output coders, side input coders, state coders, ... are, in addition to any other information. Longer term, it would make sense to have a way for the runner to say whether we need a keyed input context or a grouped keyed output context. These are the cases I know of: * KV for SplittableDoFn input, StatefulDoFn input, GBK input, Multimap side input materialization input, window mapping input * KV for GBK output Do you know of any others? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 105302) Time Spent: 3h (was: 2h 50m) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 3h > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105256=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105256 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 23/May/18 18:43 Start Date: 23/May/18 18:43 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r190358677 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ## @@ -423,8 +432,133 @@ private void translateImpulse( String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) { +// TODO: Fail on stateful DoFns for now. +// TODO: Support stateful DoFns by inserting group-by-keys where necessary. +// TODO: Fail on splittable DoFns. +// TODO: Special-case single outputs to avoid multiplexing PCollections. 
+RunnerApi.Components components = pipeline.getComponents(); +RunnerApi.PTransform transform = components.getTransformsOrThrow(id); +Mapoutputs = transform.getOutputsMap(); +RehydratedComponents rehydratedComponents = +RehydratedComponents.forComponents(components); + +BiMap outputMap = +FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet()); +Map > outputCoders = Maps.newHashMap(); +for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) { + String collectionId = outputs.get(localOutputName); + Coder windowCoder = (Coder) instantiateCoder(collectionId, components); + outputCoders.put(localOutputName, windowCoder); +} + +final RunnerApi.ExecutableStagePayload stagePayload; +try { + stagePayload = RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload()); +} catch (IOException e) { + throw new RuntimeException(e); +} + +String inputPCollectionId = +Iterables.getOnlyElement(transform.getInputsMap().values()); Review comment: Unfortunately, Flink needs to have an associated serializer (TypeInformation, aka Coder) with each distributed collection. This TypeInformation needs to be known at pipeline construction time. It need not match the exact coder being used to materialize elements over gRPC, but it does need to match the in-memory element type. We could get around this partially by representing everything as bytes. The downside is that each runner-native operation that requires structure (e.g., GBK) will require an additional operation to break elements into their constituent parts. This step itself also requires knowledge of the coded type, so we ultimately run into the same issue. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 105256) Time Spent: 2h 40m (was: 2.5h) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 2h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105255=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105255 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 23/May/18 18:39 Start Date: 23/May/18 18:39 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r190357323 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ## @@ -423,8 +432,133 @@ private void translateImpulse( String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) { +// TODO: Fail on stateful DoFns for now. +// TODO: Support stateful DoFns by inserting group-by-keys where necessary. +// TODO: Fail on splittable DoFns. +// TODO: Special-case single outputs to avoid multiplexing PCollections. +RunnerApi.Components components = pipeline.getComponents(); +RunnerApi.PTransform transform = components.getTransformsOrThrow(id); +Mapoutputs = transform.getOutputsMap(); +RehydratedComponents rehydratedComponents = +RehydratedComponents.forComponents(components); + +BiMap outputMap = +FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet()); +Map > outputCoders = Maps.newHashMap(); +for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) { + String collectionId = outputs.get(localOutputName); + Coder windowCoder = (Coder) instantiateCoder(collectionId, components); Review comment: As commented above, we should be able to recreate the ExecutableStage multiple times; only the binding of service (ApiServiceDescriptor) information should happen upstream. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 105255) Time Spent: 2.5h (was: 2h 20m) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 2.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105253=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105253 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 23/May/18 18:35 Start Date: 23/May/18 18:35 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r190355574 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ## @@ -423,8 +432,133 @@ private void translateImpulse( String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) { +// TODO: Fail on stateful DoFns for now. +// TODO: Support stateful DoFns by inserting group-by-keys where necessary. +// TODO: Fail on splittable DoFns. +// TODO: Special-case single outputs to avoid multiplexing PCollections. 
+RunnerApi.Components components = pipeline.getComponents(); +RunnerApi.PTransform transform = components.getTransformsOrThrow(id); +Mapoutputs = transform.getOutputsMap(); +RehydratedComponents rehydratedComponents = +RehydratedComponents.forComponents(components); + +BiMap outputMap = +FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet()); +Map > outputCoders = Maps.newHashMap(); +for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) { + String collectionId = outputs.get(localOutputName); + Coder windowCoder = (Coder) instantiateCoder(collectionId, components); + outputCoders.put(localOutputName, windowCoder); +} + +final RunnerApi.ExecutableStagePayload stagePayload; +try { + stagePayload = RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload()); +} catch (IOException e) { + throw new RuntimeException(e); +} + +String inputPCollectionId = +Iterables.getOnlyElement(transform.getInputsMap().values()); Review comment: Unfortunately, coders and other properties of the ExecutableProcessBundleDescriptor are constructed during execution, and it would be best if we could make all these details stable at pipeline translation time so that they don't change during execution. Having Flink rely on calling WireCoders.instantiateRunnerWireCoder(...) is an anti-pattern for encapsulation. So if we need to construct the executable stage payload twice, we could make the contract guarantee that it is stable regardless of the number of times it is constructed. I just want to push more of the input/output/coder/state/side input information up to translation time instead of having it deep within execution. In my opinion, only service (ApiServiceDescriptor) binding should happen there. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 105253) Time Spent: 2h 20m (was: 2h 10m) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 2h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105250=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105250 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 23/May/18 18:33 Start Date: 23/May/18 18:33 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r190355574 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ## @@ -423,8 +432,133 @@ private void translateImpulse( String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) { +// TODO: Fail on stateful DoFns for now. +// TODO: Support stateful DoFns by inserting group-by-keys where necessary. +// TODO: Fail on splittable DoFns. +// TODO: Special-case single outputs to avoid multiplexing PCollections. 
+RunnerApi.Components components = pipeline.getComponents(); +RunnerApi.PTransform transform = components.getTransformsOrThrow(id); +Mapoutputs = transform.getOutputsMap(); +RehydratedComponents rehydratedComponents = +RehydratedComponents.forComponents(components); + +BiMap outputMap = +FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet()); +Map > outputCoders = Maps.newHashMap(); +for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) { + String collectionId = outputs.get(localOutputName); + Coder windowCoder = (Coder) instantiateCoder(collectionId, components); + outputCoders.put(localOutputName, windowCoder); +} + +final RunnerApi.ExecutableStagePayload stagePayload; +try { + stagePayload = RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload()); +} catch (IOException e) { + throw new RuntimeException(e); +} + +String inputPCollectionId = +Iterables.getOnlyElement(transform.getInputsMap().values()); Review comment: Unfortunately, coders and other properties of the ExecutableProcessBundleDescriptor are constructed during execution, and it would be best if we could make all these details stable at pipeline translation time so that they don't change during execution. Having Flink rely on calling WireCoders.instantiateRunnerWireCoder(...) is an anti-pattern for encapsulation. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 105250) Time Spent: 2h 10m (was: 2h) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 2h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105238=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105238 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 23/May/18 18:29 Start Date: 23/May/18 18:29 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r190353926 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ## @@ -423,8 +432,133 @@ private void translateImpulse( String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) { +// TODO: Fail on stateful DoFns for now. +// TODO: Support stateful DoFns by inserting group-by-keys where necessary. +// TODO: Fail on splittable DoFns. +// TODO: Special-case single outputs to avoid multiplexing PCollections. +RunnerApi.Components components = pipeline.getComponents(); +RunnerApi.PTransform transform = components.getTransformsOrThrow(id); +Mapoutputs = transform.getOutputsMap(); +RehydratedComponents rehydratedComponents = +RehydratedComponents.forComponents(components); + +BiMap outputMap = +FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet()); +Map > outputCoders = Maps.newHashMap(); +for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) { Review comment: I see, that makes sense. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 105238) Time Spent: 2h (was: 1h 50m) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 2h > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105232=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105232 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 23/May/18 18:25 Start Date: 23/May/18 18:25 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r190349666 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ## @@ -423,8 +432,133 @@ private void translateImpulse( String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) { +// TODO: Fail on stateful DoFns for now. +// TODO: Support stateful DoFns by inserting group-by-keys where necessary. +// TODO: Fail on splittable DoFns. +// TODO: Special-case single outputs to avoid multiplexing PCollections. +RunnerApi.Components components = pipeline.getComponents(); +RunnerApi.PTransform transform = components.getTransformsOrThrow(id); +Mapoutputs = transform.getOutputsMap(); +RehydratedComponents rehydratedComponents = +RehydratedComponents.forComponents(components); + +BiMap outputMap = +FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet()); +Map > outputCoders = Maps.newHashMap(); +for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) { Review comment: A tree set of what? The intent here is to order the _coders_ (the values in the map) by their tags (the keys). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 105232) Time Spent: 1.5h (was: 1h 20m) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 1.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105233=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105233 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 23/May/18 18:25 Start Date: 23/May/18 18:25 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r190352395 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ## @@ -423,8 +432,133 @@ private void translateImpulse( String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) { +// TODO: Fail on stateful DoFns for now. +// TODO: Support stateful DoFns by inserting group-by-keys where necessary. +// TODO: Fail on splittable DoFns. +// TODO: Special-case single outputs to avoid multiplexing PCollections. 
+RunnerApi.Components components = pipeline.getComponents(); +RunnerApi.PTransform transform = components.getTransformsOrThrow(id); +Mapoutputs = transform.getOutputsMap(); +RehydratedComponents rehydratedComponents = +RehydratedComponents.forComponents(components); + +BiMap outputMap = +FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet()); +Map > outputCoders = Maps.newHashMap(); +for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) { + String collectionId = outputs.get(localOutputName); + Coder windowCoder = (Coder) instantiateCoder(collectionId, components); + outputCoders.put(localOutputName, windowCoder); +} + +final RunnerApi.ExecutableStagePayload stagePayload; +try { + stagePayload = RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload()); +} catch (IOException e) { + throw new RuntimeException(e); +} + +String inputPCollectionId = +Iterables.getOnlyElement(transform.getInputsMap().values()); Review comment: As above, we need to pass a serializable representation to operators. We could create an ExecutableStage here and then reconstruct it on the runner. In that case, we need to ensure that two ExecutableStages constructed from the same ExecutableStagePayload are _always_ equivalent (including any synthetic ids that may be generated). That should be the case as of now, but we need to be careful. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 105233) Time Spent: 1h 40m (was: 1.5h) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 1h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105234=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105234 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 23/May/18 18:25 Start Date: 23/May/18 18:25 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r190351647 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ## @@ -423,8 +432,133 @@ private void translateImpulse( String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) { +// TODO: Fail on stateful DoFns for now. +// TODO: Support stateful DoFns by inserting group-by-keys where necessary. +// TODO: Fail on splittable DoFns. +// TODO: Special-case single outputs to avoid multiplexing PCollections. +RunnerApi.Components components = pipeline.getComponents(); +RunnerApi.PTransform transform = components.getTransformsOrThrow(id); +Mapoutputs = transform.getOutputsMap(); +RehydratedComponents rehydratedComponents = +RehydratedComponents.forComponents(components); + +BiMap outputMap = +FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet()); +Map > outputCoders = Maps.newHashMap(); +for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) { + String collectionId = outputs.get(localOutputName); + Coder windowCoder = (Coder) instantiateCoder(collectionId, components); Review comment: Unfortunately, we cannot simply pass the ExecutableStage onto operators here. Flink requires that operators be serializable. Operator constructors run on the client JVM, but operators are initialized via lifecycle methods on TaskManagers. For this reason, we use the executable stage _payload_ in the batch translator. The same applies here. 
We could decide to use a different serialized representation here for operator tasks, but it seems convenient to reuse what we already have. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 105234) Time Spent: 1h 50m (was: 1h 40m) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 1h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105213=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105213 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 23/May/18 18:09 Start Date: 23/May/18 18:09 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r190338654 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ## @@ -423,8 +432,133 @@ private void translateImpulse( String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) { +// TODO: Fail on stateful DoFns for now. +// TODO: Support stateful DoFns by inserting group-by-keys where necessary. +// TODO: Fail on splittable DoFns. +// TODO: Special-case single outputs to avoid multiplexing PCollections. 
+RunnerApi.Components components = pipeline.getComponents(); +RunnerApi.PTransform transform = components.getTransformsOrThrow(id); +Mapoutputs = transform.getOutputsMap(); +RehydratedComponents rehydratedComponents = +RehydratedComponents.forComponents(components); + +BiMap outputMap = +FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet()); +Map > outputCoders = Maps.newHashMap(); +for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) { + String collectionId = outputs.get(localOutputName); + Coder windowCoder = (Coder) instantiateCoder(collectionId, components); + outputCoders.put(localOutputName, windowCoder); +} + +final RunnerApi.ExecutableStagePayload stagePayload; +try { + stagePayload = RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload()); +} catch (IOException e) { + throw new RuntimeException(e); +} + +String inputPCollectionId = +Iterables.getOnlyElement(transform.getInputsMap().values()); -throw new RuntimeException("executable stage translation not implemented"); +// TODO: is this still relevant? +// we assume that the transformation does not change the windowing strategy. +RunnerApi.WindowingStrategy windowingStrategyProto = +pipeline.getComponents().getWindowingStrategiesOrThrow( +pipeline.getComponents().getPcollectionsOrThrow( Review comment: You should rely on `getInputPCollection().pcollection()` from the `ExecutableStage` that you could construct above from the payload. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 105213) Time Spent: 1h 20m (was: 1h 10m) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 1h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105212=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105212 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 23/May/18 18:08 Start Date: 23/May/18 18:08 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r190347432 ## File path: runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java ## @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.streaming; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.collection.IsIterableContainingInOrder.contains; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Struct; +import java.util.Collections; +import java.util.List; +import org.apache.beam.model.pipeline.v1.RunnerApi.Components; +import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload; +import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection; +import org.apache.beam.runners.flink.ArtifactSourcePool; +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext; +import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; +import org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator; +import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory; +import org.apache.beam.runners.fnexecution.control.RemoteBundle; +import org.apache.beam.runners.fnexecution.control.StageBundleFactory; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.runners.fnexecution.state.StateRequestHandler; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; +import 
org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.OutputTag; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +/** Tests for {@link ExecutableStageDoFnOperator}. */ +@RunWith(JUnit4.class) +public class ExecutableStageDoFnOperatorTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Mock private RuntimeContext runtimeContext; + @Mock private DistributedCache distributedCache; + @Mock private FlinkExecutableStageContext stageContext; + @Mock private StageBundleFactory stageBundleFactory; Review comment: It might be easier to follow the testing strategy employed here: https://github.com/apache/beam/blob/8345991842dbc1f3cd8c2902ea8989bb0a8e81fb/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java#L159 and use the InProcessSdkHarness TestRule to setup the tests instead of mocks. If not, can review the
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105210=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105210 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 23/May/18 18:08 Start Date: 23/May/18 18:08 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r190335965 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ## @@ -423,8 +432,133 @@ private void translateImpulse( String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) { +// TODO: Fail on stateful DoFns for now. +// TODO: Support stateful DoFns by inserting group-by-keys where necessary. +// TODO: Fail on splittable DoFns. +// TODO: Special-case single outputs to avoid multiplexing PCollections. +RunnerApi.Components components = pipeline.getComponents(); +RunnerApi.PTransform transform = components.getTransformsOrThrow(id); +Mapoutputs = transform.getOutputsMap(); +RehydratedComponents rehydratedComponents = +RehydratedComponents.forComponents(components); + +BiMap outputMap = +FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet()); +Map > outputCoders = Maps.newHashMap(); +for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) { Review comment: consider using TreeSet This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 105210) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 1h > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105211=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105211 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 23/May/18 18:08 Start Date: 23/May/18 18:08 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r190340103 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java ## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.utils; + +import com.google.common.collect.BiMap; +import com.google.common.collect.ImmutableBiMap; + +/** + * Utilities for pipeline translation. + */ +public final class FlinkPipelineTranslatorUtils { + private FlinkPipelineTranslatorUtils() {} + + /** Creates a mapping from PCollection id to output tag integer. 
*/ + public static BiMap<String, Integer> createOutputMap(Iterable<String> localOutputs) { +ImmutableBiMap.Builder<String, Integer> builder = ImmutableBiMap.builder(); +int outputIndex = 0; +for (String tag : localOutputs) { Review comment: Sort the localOutputs to get a stable indexing; otherwise, multiple calls to createOutputMap won't be stable. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 105211) Time Spent: 1h 10m (was: 1h) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 1h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105209=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105209 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 23/May/18 18:08 Start Date: 23/May/18 18:08 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r190338072 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ## @@ -423,8 +432,133 @@ private void translateImpulse( String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) { +// TODO: Fail on stateful DoFns for now. +// TODO: Support stateful DoFns by inserting group-by-keys where necessary. +// TODO: Fail on splittable DoFns. +// TODO: Special-case single outputs to avoid multiplexing PCollections. 
+RunnerApi.Components components = pipeline.getComponents(); +RunnerApi.PTransform transform = components.getTransformsOrThrow(id); +Map<String, String> outputs = transform.getOutputsMap(); +RehydratedComponents rehydratedComponents = +RehydratedComponents.forComponents(components); + +BiMap<String, Integer> outputMap = +FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet()); +Map<String, Coder<WindowedValue<?>>> outputCoders = Maps.newHashMap(); +for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) { + String collectionId = outputs.get(localOutputName); + Coder windowCoder = (Coder) instantiateCoder(collectionId, components); + outputCoders.put(localOutputName, windowCoder); +} + +final RunnerApi.ExecutableStagePayload stagePayload; +try { + stagePayload = RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload()); +} catch (IOException e) { + throw new RuntimeException(e); +} + +String inputPCollectionId = +Iterables.getOnlyElement(transform.getInputsMap().values()); -throw new RuntimeException("executable stage translation not implemented"); +// TODO: is this still relevant? +// we assume that the transformation does not change the windowing strategy. +RunnerApi.WindowingStrategy windowingStrategyProto = Review comment: ExecutableStages can change the windowing strategy, as they may execute window assignment within them. So a previous executable stage may have changed the windowing strategy. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 105209) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 1h > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105208=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105208 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 23/May/18 18:08 Start Date: 23/May/18 18:08 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r190337604 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ## @@ -423,8 +432,133 @@ private void translateImpulse( String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) { +// TODO: Fail on stateful DoFns for now. +// TODO: Support stateful DoFns by inserting group-by-keys where necessary. +// TODO: Fail on splittable DoFns. +// TODO: Special-case single outputs to avoid multiplexing PCollections. 
+RunnerApi.Components components = pipeline.getComponents(); +RunnerApi.PTransform transform = components.getTransformsOrThrow(id); +Map<String, String> outputs = transform.getOutputsMap(); +RehydratedComponents rehydratedComponents = +RehydratedComponents.forComponents(components); + +BiMap<String, Integer> outputMap = +FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet()); +Map<String, Coder<WindowedValue<?>>> outputCoders = Maps.newHashMap(); +for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) { + String collectionId = outputs.get(localOutputName); + Coder windowCoder = (Coder) instantiateCoder(collectionId, components); + outputCoders.put(localOutputName, windowCoder); +} + +final RunnerApi.ExecutableStagePayload stagePayload; +try { + stagePayload = RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload()); +} catch (IOException e) { + throw new RuntimeException(e); +} + +String inputPCollectionId = +Iterables.getOnlyElement(transform.getInputsMap().values()); Review comment: Use ExecutableStage and rely on its `getInputPCollection().getId()` method. Ditto for updating this in the batch pipeline translator as well. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 105208) Time Spent: 1h (was: 50m) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 1h > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105207=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105207 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 23/May/18 18:08 Start Date: 23/May/18 18:08 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r190339764 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ## @@ -423,8 +432,133 @@ private void translateImpulse( String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) { +// TODO: Fail on stateful DoFns for now. +// TODO: Support stateful DoFns by inserting group-by-keys where necessary. +// TODO: Fail on splittable DoFns. +// TODO: Special-case single outputs to avoid multiplexing PCollections. +RunnerApi.Components components = pipeline.getComponents(); +RunnerApi.PTransform transform = components.getTransformsOrThrow(id); +Map<String, String> outputs = transform.getOutputsMap(); +RehydratedComponents rehydratedComponents = +RehydratedComponents.forComponents(components); + +BiMap<String, Integer> outputMap = +FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet()); +Map<String, Coder<WindowedValue<?>>> outputCoders = Maps.newHashMap(); +for (String localOutputName : new TreeMap<>(outputMap.inverse()).values()) { + String collectionId = outputs.get(localOutputName); + Coder windowCoder = (Coder) instantiateCoder(collectionId, components); Review comment: Rely on creating an `ExecutableStage` here and its `getOutputPCollections` method. You can pass forward this `ExecutableStage` to the `ExecutableStageDoFnOperator`. Anywhere below where you need to get the input PCollection (and in the future state/timer or side input information), you can get it from this object.
I can see how having access to the coders would be important here and that the ExecutableStage should contain this information without needing to bind the data/state service that the ExecutableProcessBundleDescriptor does (currently that ExecutableProcessBundleDescriptor is munging the payload and making small albeit important modifications). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 105207) Time Spent: 50m (was: 40m) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=105150=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105150 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 23/May/18 15:18 Start Date: 23/May/18 15:18 Worklog Time Spent: 10m Work Description: tweise commented on issue #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#issuecomment-391386675 @bsidhom @lukecwik PTAL This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 105150) Time Spent: 40m (was: 0.5h) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=103615=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103615 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 19/May/18 00:49 Start Date: 19/May/18 00:49 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#discussion_r189330462 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -616,7 +616,7 @@ private static void pruneOutput( } /** Creates a mapping from PCollection id to output tag integer. */ - private static BiMap<String, Integer> createOutputMap(Iterable<String> localOutputs) { + static BiMap<String, Integer> createOutputMap(Iterable<String> localOutputs) { Review comment: Please move this into a shared utility class. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 103615) Time Spent: 0.5h (was: 20m) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 0.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=103409=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103409 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 18/May/18 15:47 Start Date: 18/May/18 15:47 Worklog Time Spent: 10m Work Description: tweise commented on issue #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407#issuecomment-390249488 R: @lukecwik @bsidhom This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 103409) Time Spent: 20m (was: 10m) > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4297) Flink portable runner executable stage operator for streaming
[ https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=103209=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103209 ] ASF GitHub Bot logged work on BEAM-4297: Author: ASF GitHub Bot Created on: 18/May/18 02:37 Start Date: 18/May/18 02:37 Worklog Time Spent: 10m Work Description: tweise opened a new pull request #5407: [BEAM-4297] Streaming executable stage translation and operator for portable Flink runner. URL: https://github.com/apache/beam/pull/5407 Executable stage translation for streaming mode based on the generic Flink streaming operator. Stage execution and tests adopted from batch translation. Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 103209) Time Spent: 10m Remaining Estimate: 0h > Flink portable runner executable stage operator for streaming > - > > Key: BEAM-4297 > URL: https://issues.apache.org/jira/browse/BEAM-4297 > Project: Beam > Issue Type: Task > Components: runner-flink >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)