[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137935=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137935 ]

ASF GitHub Bot logged work on BEAM-2930:

Author: ASF GitHub Bot
Created on: 24/Aug/18 18:58
Start Date: 24/Aug/18 18:58
Worklog Time Spent: 10m
Work Description: tweise closed pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
index c488522aa64..c453329051d 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
@@ -34,22 +34,22 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 /** A {@link PCollectionView} created from the components of a {@link SideInput}. */
-class RunnerPCollectionView extends PValueBase implements PCollectionView {
+public class RunnerPCollectionView extends PValueBase implements PCollectionView {
   private final TupleTag>> tag;
   private final ViewFn>, T> viewFn;
   private final WindowMappingFn windowMappingFn;
   private final @Nullable WindowingStrategy windowingStrategy;
-  private final @Nullable Coder>> coder;
-  private final transient PCollection pCollection;
+  private final @Nullable Coder coder;
+  private final transient @Nullable PCollection pCollection;
 
   /** Create a new {@link RunnerPCollectionView} from the provided components. */
-  RunnerPCollectionView(
-      PCollection pCollection,
+  public RunnerPCollectionView(
+      @Nullable PCollection pCollection,
       TupleTag>> tag,
       ViewFn>, T> viewFn,
       WindowMappingFn windowMappingFn,
       @Nullable WindowingStrategy windowingStrategy,
-      @Nullable Coder>> coder) {
+      @Nullable Coder coder) {
     this.pCollection = pCollection;
     this.tag = tag;
     this.viewFn = viewFn;
@@ -84,7 +84,7 @@
   }
 
   @Override
-  public Coder>> getCoderInternal() {
+  public Coder getCoderInternal() {
     return coder;
   }
 
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index e8c2b66ba42..1a6826d94b1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -38,7 +38,6 @@
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 
 /**
@@ -148,30 +147,46 @@ public void addSideInputValue(PCollectionView sideInput, WindowedValue T get(PCollectionView view, BoundedWindow window) {
-    @SuppressWarnings("unchecked")
-    Coder windowCoder =
-        (Coder) view.getWindowingStrategyInternal().getWindowFn().windowCoder();
-
-    StateTag>> stateTag = sideInputContentsTags.get(view);
-
-    ValueState> state =
-        stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag);
-    // TODO: Add support for choosing which representation is contained based upon the
-    // side input materialization. We currently can assume that we always have a multimap
-    // materialization as that is the only supported type within the Java SDK.
-    @Nullable Iterable> elements = (Iterable>) state.read();
+    @Nullable Iterable elements = getIterable(view, window);
     if (elements == null) {
       elements = Collections.emptyList();
     }
+    // TODO: Add support for choosing which representation is contained based upon the
+    // side input materialization. We currently can assume that we always have a multimap
+    // materialization as that is the only supported type within the Java SDK.
     ViewFn viewFn = (ViewFn) view.getViewFn();
     Coder keyCoder = ((KvCoder) view.getCoderInternal()).getKeyCoder();
     return (T) viewFn.apply(InMemoryMultimapSideInputView.fromIterable(keyCoder, (Iterable) elements));
   }
 
+  /**
+   * Retrieve the value as written by {@link #addSideInputValue(PCollectionView, WindowedValue)},
+   * without applying the SDK specific
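The merged change splits `SideInputHandler.get` into two paths. `getIterable` is a raw lookup that returns the stored elements without an SDK-specific view. The Java SDK path then applies a `ViewFn` on top of those elements. The sketch below shows that split under stated assumptions: `RawSideInputStore` is a hypothetical in-memory class, and it uses plain string window ids in place of Beam's state internals and `BoundedWindow`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical in-memory stand-in for a runner-side side input store (not Beam's
 * SideInputHandler). Windows are modeled as plain string ids.
 */
final class RawSideInputStore<V> {
  // Raw contents per window, stored exactly as they were added.
  private final Map<String, List<V>> contents = new HashMap<>();

  /** Replaces the contents for a window, similar to a ValueState write. */
  void addSideInputValue(String window, Iterable<V> values) {
    List<V> copy = new ArrayList<>();
    for (V value : values) {
      copy.add(value);
    }
    contents.put(window, copy);
  }

  /** Raw access: no view function is applied; returns null if nothing was written. */
  Iterable<V> getIterable(String window) {
    List<V> stored = contents.get(window);
    return stored == null ? null : Collections.unmodifiableList(stored);
  }

  /** SDK-style access: applies a view function to the raw elements (empty if absent). */
  <T> T get(String window, Function<Iterable<V>, T> viewFn) {
    Iterable<V> elements = getIterable(window);
    if (elements == null) {
      elements = Collections.emptyList();
    }
    return viewFn.apply(elements);
  }
}
```

With this split, a portable runner could presumably hand the raw iterable to the SDK harness unchanged. Only the Java SDK code path would depend on the multimap view, and the empty-list fallback in `get` matches the code in the diff.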
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137922=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137922 ]

ASF GitHub Bot logged work on BEAM-2930:

Author: ASF GitHub Bot
Created on: 24/Aug/18 18:24
Start Date: 24/Aug/18 18:24
Worklog Time Spent: 10m
Work Description: tweise commented on issue #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-415842617

Run Java PreCommit

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

Worklog Id: (was: 137922)
Time Spent: 7h 10m (was: 7h)

> Flink support for portable side input
> -------------------------------------
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
> Issue Type: Sub-task
> Components: runner-flink
> Reporter: Henning Rohde
> Assignee: Thomas Weise
> Priority: Major
> Labels: portability
> Time Spent: 7h 10m
> Remaining Estimate: 0h
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137868=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137868 ]

ASF GitHub Bot logged work on BEAM-2930:

Author: ASF GitHub Bot
Created on: 24/Aug/18 15:58
Start Date: 24/Aug/18 15:58
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212675673

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
##
@@ -149,6 +149,7 @@ public static FlinkBatchPortablePipelineTranslator createTranslator() {
     translatorMap.put(
         PTransformTranslation.RESHUFFLE_URN,
         FlinkBatchPortablePipelineTranslator::translateReshuffle);
+    // TODO: this does not seem required
     translatorMap.put(
         PTransformTranslation.CREATE_VIEW_TRANSFORM_URN,
         FlinkBatchPortablePipelineTranslator::translateView);

Review comment:
Dead code, removed it.

Issue Time Tracking
-------------------

Worklog Id: (was: 137868)
Time Spent: 7h (was: 6h 50m)
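The registration flagged above is an entry in a map from transform URN to translation function. The sketch below shows that dispatch pattern. Its URN strings are hypothetical, and it assumes that a URN with no registered translator fails fast. The Flink translator's actual fallback is not shown in this excerpt.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of URN-keyed translator dispatch. The failing default for unknown URNs
 * is an assumption, not the Flink runner's actual code.
 */
final class UrnTranslatorRegistry {
  // Maps a transform URN to a function that translates a transform (identified by id).
  private final Map<String, Consumer<String>> translators = new HashMap<>();

  void register(String urn, Consumer<String> translator) {
    translators.put(urn, translator);
  }

  /** Dispatches a transform id to the translator for its URN; unknown URNs fail fast. */
  void translate(String urn, String transformId) {
    Consumer<String> translator =
        translators.getOrDefault(
            urn,
            id -> {
              throw new IllegalArgumentException("Unknown URN " + urn + " for transform " + id);
            });
    translator.accept(transformId);
  }
}
```

In this pattern, an unneeded registration only matters if some transform with that URN actually reaches the translator. Removing it is harmless as long as no such transform is ever present.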
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137867=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137867 ]

ASF GitHub Bot logged work on BEAM-2930:

Author: ASF GitHub Bot
Created on: 24/Aug/18 15:58
Start Date: 24/Aug/18 15:58
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212675593

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingSideInputHandlerFactory.java
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.SideInputReference;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandler;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandlerFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * {@link StateRequestHandler} that uses {@link org.apache.beam.runners.core.SideInputHandler} to
+ * access the Flink broadcast state that represents side inputs.
+ */
+public class FlinkStreamingSideInputHandlerFactory implements SideInputHandlerFactory {
+
+  // Map from side input id to global PCollection id.
+  private final Map> sideInputToCollection;
+  private final org.apache.beam.runners.core.SideInputHandler runnerHandler;
+
+  /**
+   * Creates a new state handler for the given stage. Note that this requires a traversal of the
+   * stage itself, so this should only be called once per stage rather than once per bundle.
+   */
+  public static FlinkStreamingSideInputHandlerFactory forStage(
+      ExecutableStage stage,
+      Map> viewMapping,
+      org.apache.beam.runners.core.SideInputHandler runnerHandler) {
+    ImmutableMap.Builder> sideInputBuilder = ImmutableMap.builder();
+    for (SideInputReference sideInput : stage.getSideInputs()) {
+      SideInputId sideInputId =
+          SideInputId.newBuilder()
+              .setTransformId(sideInput.transform().getId())
+              .setLocalName(sideInput.localName())
+              .build();
+      sideInputBuilder.put(
+          sideInputId,
+          checkNotNull(
+              viewMapping.get(sideInputId),
+              "No side input for %s/%s",
+              sideInputId.getTransformId(),
+              sideInputId.getLocalName()));
+    }
+
+    FlinkStreamingSideInputHandlerFactory factory =
+        new FlinkStreamingSideInputHandlerFactory(sideInputBuilder.build(), runnerHandler);
+    return factory;
+  }
+
+  private FlinkStreamingSideInputHandlerFactory(
+      Map> sideInputToCollection,
+      org.apache.beam.runners.core.SideInputHandler runnerHandler) {
+    this.sideInputToCollection = sideInputToCollection;
+    this.runnerHandler = runnerHandler;
+  }
+
+  @Override
+  public SideInputHandler forSideInput(
+      String transformId,
+      String sideInputId,
+      RunnerApi.FunctionSpec accessPattern,
+      Coder elementCoder,
+
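`forStage` resolves every side input that the stage references against the runner-provided `viewMapping`. The lookup key is the pair (transform id, local name). If a mapping is missing, `checkNotNull` makes the call fail immediately. The sketch below does the same resolution in plain Java. `SideInputKey` is a hypothetical stand-in for `SideInputId`, and `Objects.requireNonNull` replaces Guava's `checkNotNull`.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for SideInputId: identifies a side input by (transform id, local name). */
final class SideInputKey {
  final String transformId;
  final String localName;

  SideInputKey(String transformId, String localName) {
    this.transformId = transformId;
    this.localName = localName;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof SideInputKey)) {
      return false;
    }
    SideInputKey other = (SideInputKey) o;
    return transformId.equals(other.transformId) && localName.equals(other.localName);
  }

  @Override
  public int hashCode() {
    return Objects.hash(transformId, localName);
  }
}

final class SideInputMappings {
  /**
   * Resolves every side input referenced by a stage, failing fast with a descriptive message
   * when the runner-provided mapping lacks an entry. Returns an unmodifiable map.
   */
  static <V> Map<SideInputKey, V> forStage(
      List<SideInputKey> stageSideInputs, Map<SideInputKey, V> viewMapping) {
    Map<SideInputKey, V> result = new LinkedHashMap<>();
    for (SideInputKey key : stageSideInputs) {
      V view =
          Objects.requireNonNull(
              viewMapping.get(key),
              String.format("No side input for %s/%s", key.transformId, key.localName));
      result.put(key, view);
    }
    return Collections.unmodifiableMap(result);
  }
}
```

Because the key includes both fields, two transforms can reuse the same local name (for example `side0`) without colliding. A map keyed by local name alone would not allow that.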
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137865=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137865 ]

ASF GitHub Bot logged work on BEAM-2930:

Author: ASF GitHub Bot
Created on: 24/Aug/18 15:57
Start Date: 24/Aug/18 15:57
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212675392

## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
##
@@ -148,30 +147,45 @@ public void addSideInputValue(PCollectionView sideInput, WindowedValue T get(PCollectionView view, BoundedWindow window) {
-    @SuppressWarnings("unchecked")
-    Coder windowCoder =
-        (Coder) view.getWindowingStrategyInternal().getWindowFn().windowCoder();
-
-    StateTag>> stateTag = sideInputContentsTags.get(view);
-
-    ValueState> state =
-        stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag);
-    // TODO: Add support for choosing which representation is contained based upon the
-    // side input materialization. We currently can assume that we always have a multimap
-    // materialization as that is the only supported type within the Java SDK.
-    @Nullable Iterable> elements = (Iterable>) state.read();
+    @Nullable Iterable elements = getIterable(view, window);
     if (elements == null) {
       elements = Collections.emptyList();
     }

Review comment:
Not needed since side input isn't supposed to be accessed before the window is ready. Added extra check in the handler and comment.

Issue Time Tracking
-------------------

Worklog Id: (was: 137865)
Time Spent: 6.5h (was: 6h 20m)
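The reply argues that the empty-list fallback is not needed, because a side input should never be read before its window is ready. It also says an explicit check was added to the handler instead. The sketch below shows that fail-fast behavior. `ReadyCheckedSideInputs` is a hypothetical class with string window ids, and the runner's actual check and exception type may differ.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: reading a side input for a window whose contents have not arrived is
 * treated as a bug instead of silently returning an empty iterable.
 */
final class ReadyCheckedSideInputs<V> {
  private final Map<String, List<V>> byWindow = new HashMap<>();

  /** Records the complete side input contents for a window, making it ready. */
  void markReady(String window, List<V> values) {
    byWindow.put(window, values);
  }

  boolean isReady(String window) {
    return byWindow.containsKey(window);
  }

  Iterable<V> get(String window) {
    List<V> values = byWindow.get(window);
    if (values == null) {
      // Callers are expected to hold back main-input elements until isReady(window) is true.
      throw new IllegalStateException("Side input not ready for window " + window);
    }
    return values;
  }
}
```

Failing loudly here surfaces ordering bugs in the operator. An empty default would instead give results that look valid but are silently wrong.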
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137863=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137863 ]

ASF GitHub Bot logged work on BEAM-2930:

Author: ASF GitHub Bot
Created on: 24/Aug/18 15:57
Start Date: 24/Aug/18 15:57
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212675392

## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
##
@@ -148,30 +147,45 @@ public void addSideInputValue(PCollectionView sideInput, WindowedValue T get(PCollectionView view, BoundedWindow window) {
-    @SuppressWarnings("unchecked")
-    Coder windowCoder =
-        (Coder) view.getWindowingStrategyInternal().getWindowFn().windowCoder();
-
-    StateTag>> stateTag = sideInputContentsTags.get(view);
-
-    ValueState> state =
-        stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag);
-    // TODO: Add support for choosing which representation is contained based upon the
-    // side input materialization. We currently can assume that we always have a multimap
-    // materialization as that is the only supported type within the Java SDK.
-    @Nullable Iterable> elements = (Iterable>) state.read();
+    @Nullable Iterable elements = getIterable(view, window);
     if (elements == null) {
       elements = Collections.emptyList();
     }

Review comment:
Not needed since side input isn't supposed to be accessed before the window is ready. Added extra check.

Issue Time Tracking
-------------------

Worklog Id: (was: 137863)
Time Spent: 6h 20m (was: 6h 10m)
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137866=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137866 ]

ASF GitHub Bot logged work on BEAM-2930:

Author: ASF GitHub Bot
Created on: 24/Aug/18 15:57
Start Date: 24/Aug/18 15:57
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r21267

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##
@@ -108,27 +119,45 @@ public ExecutableStageDoFnOperator(
     this.jobInfo = jobInfo;
     this.contextFactory = contextFactory;
     this.outputMap = outputMap;
+    this.sideInputIds = sideInputIds;
   }
 
   @Override
   public void open() throws Exception {
     super.open();
-    ExecutableStage executableStage = ExecutableStage.fromPayload(payload);
+    executableStage = ExecutableStage.fromPayload(payload);
     // TODO: Wire this into the distributed cache and make it pluggable.
     // TODO: Do we really want this layer of indirection when accessing the stage bundle factory?
     // It's a little strange because this operator is responsible for the lifetime of the stage
     // bundle "factory" (manager?) but not the job or Flink bundle factories. How do we make
     // ownership of the higher level "factories" explicit? Do we care?
     stageContext = contextFactory.get(jobInfo);
-    // NOTE: It's safe to reuse the state handler between partitions because each partition uses the
-    // same backing runtime context and broadcast variables. We use checkState below to catch errors
-    // in backward-incompatible Flink changes.
-    stateRequestHandler = stageContext.getStateRequestHandler(executableStage, getRuntimeContext());
+
+    stateRequestHandler = getStateRequestHandler(executableStage);
     stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
     progressHandler = BundleProgressHandler.unsupported();
   }
 
+  private StateRequestHandler getStateRequestHandler(ExecutableStage executableStage) {
+
+    if (!executableStage.getSideInputs().isEmpty()) {

Review comment:
done

Issue Time Tracking
-------------------

Worklog Id: (was: 137866)
Time Spent: 6h 40m (was: 6.5h)
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137862=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137862 ]

ASF GitHub Bot logged work on BEAM-2930:

Author: ASF GitHub Bot
Created on: 24/Aug/18 15:56
Start Date: 24/Aug/18 15:56
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212675152

## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
##
@@ -148,30 +147,45 @@ public void addSideInputValue(PCollectionView sideInput, WindowedValue T get(PCollectionView view, BoundedWindow window) {
-    @SuppressWarnings("unchecked")
-    Coder windowCoder =
-        (Coder) view.getWindowingStrategyInternal().getWindowFn().windowCoder();
-
-    StateTag>> stateTag = sideInputContentsTags.get(view);
-
-    ValueState> state =
-        stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag);
-    // TODO: Add support for choosing which representation is contained based upon the
-    // side input materialization. We currently can assume that we always have a multimap
-    // materialization as that is the only supported type within the Java SDK.
-    @Nullable Iterable> elements = (Iterable>) state.read();
+    @Nullable Iterable elements = getIterable(view, window);
     if (elements == null) {
       elements = Collections.emptyList();
     }
+    // TODO: Add support for choosing which representation is contained based upon the
+    // side input materialization. We currently can assume that we always have a multimap
+    // materialization as that is the only supported type within the Java SDK.
     ViewFn viewFn = (ViewFn) view.getViewFn();

Review comment:
Left as is, this was just moved around. It is an SDK-specific assumption; the portable runner does not use this code path.

Issue Time Tracking
-------------------

Worklog Id: (was: 137862)
Time Spent: 6h 10m (was: 6h)
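The retained TODO notes that the Java SDK path assumes a multimap materialization. In that path, `InMemoryMultimapSideInputView.fromIterable(keyCoder, elements)` builds the view from key/value pairs. The sketch below is a simplified version of that grouping step. It uses plain `Map.Entry` pairs and relies on `equals`/`hashCode` to compare keys; the key coder that Beam's method receives is not modeled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical illustration of a multimap materialization: key/value pairs are grouped by key,
 * preserving first-encounter order of keys and arrival order of values.
 */
final class MultimapMaterialization {
  static <K, V> Map<K, List<V>> fromPairs(Iterable<Map.Entry<K, V>> pairs) {
    Map<K, List<V>> multimap = new LinkedHashMap<>();
    for (Map.Entry<K, V> pair : pairs) {
      multimap.computeIfAbsent(pair.getKey(), key -> new ArrayList<>()).add(pair.getValue());
    }
    // Only the outer map is made unmodifiable in this sketch.
    return Collections.unmodifiableMap(multimap);
  }
}
```

A portable runner does not need this step. It can return the raw iterable and leave the view to the SDK harness, which is why the reply treats the multimap assumption as SDK specific.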
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137861=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137861 ]

ASF GitHub Bot logged work on BEAM-2930:

Author: ASF GitHub Bot
Created on: 24/Aug/18 15:55
Start Date: 24/Aug/18 15:55
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212674746

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##
@@ -520,6 +567,158 @@ private void translateImpulse(
     }
   }
 
+  private static LinkedHashMap>
+      getSideInputIdToPCollectionViewMap(
+          RunnerApi.ExecutableStagePayload stagePayload, RunnerApi.Components components) {
+
+    RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents(components);
+
+    LinkedHashMap> sideInputs =
+        new LinkedHashMap<>();
+    for (RunnerApi.ExecutableStagePayload.SideInputId sideInputId :
+        stagePayload.getSideInputsList()) {
+
+      // TODO: local name is unique as long as only one transform with side input can be within a stage
+      String sideInputTag = sideInputId.getLocalName();
+      // for PCollectionView compatibility, not used to transform materialization
+      ViewFn>, ?> viewFn =
+          (ViewFn) new PCollectionViews.MultimapViewFn>, Void>();

Review comment:
done

Issue Time Tracking
-------------------

Worklog Id: (was: 137861)
Time Spent: 6h (was: 5h 50m)
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137860=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137860 ]

ASF GitHub Bot logged work on BEAM-2930:

Author: ASF GitHub Bot
Created on: 24/Aug/18 15:55
Start Date: 24/Aug/18 15:55
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212674697

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##
@@ -520,6 +567,158 @@ private void translateImpulse(
     }
   }
 
+  private static LinkedHashMap>
+      getSideInputIdToPCollectionViewMap(
+          RunnerApi.ExecutableStagePayload stagePayload, RunnerApi.Components components) {
+
+    RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents(components);
+
+    LinkedHashMap> sideInputs =
+        new LinkedHashMap<>();
+    for (RunnerApi.ExecutableStagePayload.SideInputId sideInputId :
+        stagePayload.getSideInputsList()) {
+
+      // TODO: local name is unique as long as only one transform with side input can be within a stage
+      String sideInputTag = sideInputId.getLocalName();
+      // for PCollectionView compatibility, not used to transform materialization
+      ViewFn>, ?> viewFn =
+          (ViewFn) new PCollectionViews.MultimapViewFn>, Void>();
+
+      String collectionId =
+          components
+              .getTransformsOrThrow(sideInputId.getTransformId())
+              .getInputsOrThrow(sideInputId.getLocalName());
+      RunnerApi.WindowingStrategy windowingStrategyProto =
+          components.getWindowingStrategiesOrThrow(
+              components.getPcollectionsOrThrow(collectionId).getWindowingStrategyId());
+
+      final WindowingStrategy windowingStrategy;
+      try {
+        windowingStrategy =
+            WindowingStrategyTranslation.fromProto(windowingStrategyProto, rehydratedComponents);
+      } catch (InvalidProtocolBufferException e) {
+        throw new IllegalStateException(
+            String.format(
+                "Unable to hydrate side input windowing strategy %s.", windowingStrategyProto),
+            e);
+      }
+
+      Coder> coder = instantiateCoder(collectionId, components);
+      // side input materialization via GBK (T -> Iterable)
+      WindowedValueCoder wvCoder = (WindowedValueCoder) coder;
+      coder = wvCoder.withValueCoder(IterableCoder.of(wvCoder.getValueCoder()));
+
+      sideInputs.put(
+          sideInputId,
+          new RunnerPCollectionView<>(
+              null,
+              new TupleTag<>(sideInputTag),
+              viewFn,
+              // TODO: support custom mapping fn
+              windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+              windowingStrategy,
+              coder));
+    }
+    return sideInputs;
+  }
+
+  private Tuple2>, DataStream> transformSideInputs(
+      RunnerApi.ExecutableStagePayload stagePayload,
+      RunnerApi.Components components,
+      StreamingTranslationContext context) {
+
+    LinkedHashMap> sideInputs =
+        getSideInputIdToPCollectionViewMap(stagePayload, components);
+
+    Map, Integer> tagToIntMapping = new HashMap<>();
+    Map> intToViewMapping = new HashMap<>();
+    List>> kvCoders = new ArrayList<>();
+    List> viewCoders = new ArrayList<>();
+
+    int count = 0;
+    for (Map.Entry> sideInput :
+        sideInputs.entrySet()) {
+      TupleTag tag = sideInput.getValue().getTagInternal();
+      intToViewMapping.put(count, sideInput.getValue());
+      tagToIntMapping.put(tag, count);
+      count++;
+      String collectionId =
+          components
+              .getTransformsOrThrow(sideInput.getKey().getTransformId())
+              .getInputsOrThrow(sideInput.getKey().getLocalName());
+      DataStream sideInputStream = context.getDataStreamOrThrow(collectionId);
+      TypeInformation tpe = sideInputStream.getType();
+      if (!(tpe instanceof CoderTypeInformation)) {
+        throw new IllegalStateException("Input Stream TypeInformation is no CoderTypeInformation.");
+      }
+
+      WindowedValueCoder coder =
+          (WindowedValueCoder) ((CoderTypeInformation) tpe).getCoder();
+      Coder> kvCoder = KvCoder.of(VoidCoder.of(), coder.getValueCoder());
+      kvCoders.add(coder.withValueCoder(kvCoder));
+      // coder for materialized view matching GBK below
+      WindowedValueCoder>> viewCoder =
+          coder.withValueCoder(KvCoder.of(VoidCoder.of(), IterableCoder.of(coder.getValueCoder())));
+      viewCoders.add(viewCoder);
+    }
+
+    // second pass, now that we gathered the input coders
+    UnionCoder unionCoder = UnionCoder.of(viewCoders);
+
+    CoderTypeInformation
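`transformSideInputs` gives each side input a dense integer index, assigned in the iteration order of the `LinkedHashMap`. The index is recorded in both directions (`tagToIntMapping` and `intToViewMapping`), and the per-input view coders are combined into a `UnionCoder`. The sketch below shows that index assignment and tagging, with tags and views modeled as strings. `Tagged` is a hypothetical wrapper that stands in for a union value; in Beam, `RawUnionValue` plays that role for `UnionCoder`.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch of union-index assignment: every side input gets a dense integer in
 * insertion order so that several side input streams can be merged into one union-typed stream.
 */
final class UnionTagAssignment {
  final Map<String, Integer> tagToInt = new HashMap<>();
  final Map<Integer, String> intToView = new HashMap<>();

  UnionTagAssignment(LinkedHashMap<String, String> tagToView) {
    int count = 0;
    for (Map.Entry<String, String> sideInput : tagToView.entrySet()) {
      tagToInt.put(sideInput.getKey(), count);
      intToView.put(count, sideInput.getValue());
      count++;
    }
  }

  /** A value from one side input stream, wrapped with its union index. */
  static final class Tagged {
    final int unionTag;
    final Object value;

    Tagged(int unionTag, Object value) {
      this.unionTag = unionTag;
      this.value = value;
    }
  }

  /** Wraps a side input element with the index of the side input it came from. */
  Tagged wrap(String tag, Object value) {
    Integer index = tagToInt.get(tag);
    if (index == null) {
      throw new IllegalArgumentException("Unknown side input tag " + tag);
    }
    return new Tagged(index, value);
  }
}
```

Iterating a `LinkedHashMap` keeps the assignment deterministic for a given stage payload. That presumably matters because the same indices must select the matching coder inside the union coder when values are decoded.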
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137859=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137859 ]

ASF GitHub Bot logged work on BEAM-2930:

Author: ASF GitHub Bot
Created on: 24/Aug/18 15:54
Start Date: 24/Aug/18 15:54
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212674635

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingExecutableStageContext.java
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.control.DockerJobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+
+/** Implementation of a {@link FlinkExecutableStageContext} for streaming. */
+public class FlinkStreamingExecutableStageContext

Review comment:
None, removed it. (It was necessary before, when it was entangled with the state request handler.)

Issue Time Tracking
-------------------

Worklog Id: (was: 137859)
Time Spent: 5h 40m (was: 5.5h)
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137805=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137805 ] ASF GitHub Bot logged work on BEAM-2930: Author: ASF GitHub Bot Created on: 24/Aug/18 13:21 Start Date: 24/Aug/18 13:21 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming. URL: https://github.com/apache/beam/pull/6208#discussion_r212340352 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ## @@ -108,27 +119,45 @@ public ExecutableStageDoFnOperator( this.jobInfo = jobInfo; this.contextFactory = contextFactory; this.outputMap = outputMap; +this.sideInputIds = sideInputIds; } @Override public void open() throws Exception { super.open(); -ExecutableStage executableStage = ExecutableStage.fromPayload(payload); +executableStage = ExecutableStage.fromPayload(payload); // TODO: Wire this into the distributed cache and make it pluggable. // TODO: Do we really want this layer of indirection when accessing the stage bundle factory? // It's a little strange because this operator is responsible for the lifetime of the stage // bundle "factory" (manager?) but not the job or Flink bundle factories. How do we make // ownership of the higher level "factories" explicit? Do we care? stageContext = contextFactory.get(jobInfo); -// NOTE: It's safe to reuse the state handler between partitions because each partition uses the -// same backing runtime context and broadcast variables. We use checkState below to catch errors -// in backward-incompatible Flink changes. 
-stateRequestHandler = stageContext.getStateRequestHandler(executableStage, getRuntimeContext()); + +stateRequestHandler = getStateRequestHandler(executableStage); stageBundleFactory = stageContext.getStageBundleFactory(executableStage); progressHandler = BundleProgressHandler.unsupported(); } + private StateRequestHandler getStateRequestHandler(ExecutableStage executableStage) { + +if (!executableStage.getSideInputs().isEmpty()) { Review comment: Not important but I prefer: `if (executableStage.getSideInputs() > 0) {` or switch around the if/else blocks. Issue Time Tracking --- Worklog Id: (was: 137805) Time Spent: 5h 20m (was: 5h 10m) > Flink support for portable side input > - > > Key: BEAM-2930 > URL: https://issues.apache.org/jira/browse/BEAM-2930 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Henning Rohde >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 5h 20m > Remaining Estimate: 0h
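The existing code calls `.isEmpty()` on `executableStage.getSideInputs()`, so that method returns a collection. The count form of the suggestion would therefore presumably be `getSideInputs().size() > 0`. The sketch below shows the positive-condition-first shape. It uses a hypothetical `StateHandlerSketch` interface and plain string side input ids instead of Beam's `StateRequestHandler` and `ExecutableStage`:

```java
import java.util.List;

// Hypothetical stand-in for a state request handler; not Beam's StateRequestHandler.
interface StateHandlerSketch {
  String describe();
}

final class StateHandlerSelection {
  private StateHandlerSelection() {}

  // Positive condition first: handle the side input case, otherwise fall back.
  static StateHandlerSketch forSideInputs(List<String> sideInputIds) {
    if (sideInputIds.size() > 0) {
      // Stage has side inputs: a real handler would serve state requests for them.
      return () -> "side inputs: " + sideInputIds;
    } else {
      // No side inputs: mirrors an "unsupported" handler that rejects state requests.
      return () -> "unsupported";
    }
  }
}
```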
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137808=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137808 ] ASF GitHub Bot logged work on BEAM-2930: Author: ASF GitHub Bot Created on: 24/Aug/18 13:21 Start Date: 24/Aug/18 13:21 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming. URL: https://github.com/apache/beam/pull/6208#discussion_r212585550 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ## @@ -520,6 +567,158 @@ private void translateImpulse( } } + private static LinkedHashMap> + getSideInputIdToPCollectionViewMap( + RunnerApi.ExecutableStagePayload stagePayload, RunnerApi.Components components) { + +RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents(components); + +LinkedHashMap> sideInputs = +new LinkedHashMap<>(); +for (RunnerApi.ExecutableStagePayload.SideInputId sideInputId : +stagePayload.getSideInputsList()) { + + // TODO: local name is unique as long as only one transform with side input can be within a stage + String sideInputTag = sideInputId.getLocalName(); + // for PCollectionView compatibility, not used to transform materialization + ViewFn>, ?> viewFn = + (ViewFn) new PCollectionViews.MultimapViewFn>, Void>(); + + String collectionId = + components + .getTransformsOrThrow(sideInputId.getTransformId()) + .getInputsOrThrow(sideInputId.getLocalName()); + RunnerApi.WindowingStrategy windowingStrategyProto = + components.getWindowingStrategiesOrThrow( + components.getPcollectionsOrThrow(collectionId).getWindowingStrategyId()); + + final WindowingStrategy windowingStrategy; + try { +windowingStrategy = +WindowingStrategyTranslation.fromProto(windowingStrategyProto, rehydratedComponents); + } catch (InvalidProtocolBufferException e) { +throw new IllegalStateException( +String.format( +"Unable to hydrate side 
input windowing strategy %s.", windowingStrategyProto), +e); + } + + Coder> coder = instantiateCoder(collectionId, components); + // side input materialization via GBK (T -> Iterable) + WindowedValueCoder wvCoder = (WindowedValueCoder) coder; + coder = wvCoder.withValueCoder(IterableCoder.of(wvCoder.getValueCoder())); + + sideInputs.put( + sideInputId, + new RunnerPCollectionView<>( + null, + new TupleTag<>(sideInputTag), + viewFn, + // TODO: support custom mapping fn + windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), + windowingStrategy, + coder)); +} +return sideInputs; + } + + private Tuple2>, DataStream> transformSideInputs( + RunnerApi.ExecutableStagePayload stagePayload, + RunnerApi.Components components, + StreamingTranslationContext context) { + +LinkedHashMap> sideInputs = +getSideInputIdToPCollectionViewMap(stagePayload, components); + +Map, Integer> tagToIntMapping = new HashMap<>(); +Map> intToViewMapping = new HashMap<>(); +List>> kvCoders = new ArrayList<>(); +List> viewCoders = new ArrayList<>(); + +int count = 0; +for (Map.Entry> sideInput : +sideInputs.entrySet()) { + TupleTag tag = sideInput.getValue().getTagInternal(); + intToViewMapping.put(count, sideInput.getValue()); + tagToIntMapping.put(tag, count); + count++; + String collectionId = + components + .getTransformsOrThrow(sideInput.getKey().getTransformId()) + .getInputsOrThrow(sideInput.getKey().getLocalName()); + DataStream sideInputStream = context.getDataStreamOrThrow(collectionId); + TypeInformation tpe = sideInputStream.getType(); + if (!(tpe instanceof CoderTypeInformation)) { +throw new IllegalStateException("Input Stream TypeInformation is no CoderTypeInformation."); + } + + WindowedValueCoder coder = + (WindowedValueCoder) ((CoderTypeInformation) tpe).getCoder(); + Coder> kvCoder = KvCoder.of(VoidCoder.of(), coder.getValueCoder()); + kvCoders.add(coder.withValueCoder(kvCoder)); + // coder for materialized view matching GBK below + WindowedValueCoder>> viewCoder = + 
coder.withValueCoder(KvCoder.of(VoidCoder.of(), IterableCoder.of(coder.getValueCoder(; + viewCoders.add(viewCoder); +} + +// second pass, now that we gathered the input coders +UnionCoder unionCoder = UnionCoder.of(viewCoders); + +CoderTypeInformation
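The loop in `transformSideInputs` gives each side input a consecutive integer. That same position holds the side input's coder in `viewCoders`, which is passed to `UnionCoder.of`. Iterating a `LinkedHashMap` keeps this numbering in the payload's side input order. The plain-Java sketch below covers only the numbering step. Strings stand in for `TupleTag` and `PCollectionView`, and the class name is hypothetical:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: assign each side input a stable integer index in insertion order, mirroring the
// tagToIntMapping / intToViewMapping pair. Strings are hypothetical stand-ins for tags and views.
final class SideInputIndexing {
  final Map<String, Integer> tagToInt = new HashMap<>();
  final Map<Integer, String> intToView = new HashMap<>();

  SideInputIndexing(LinkedHashMap<String, String> tagToView) {
    int count = 0;
    // LinkedHashMap iterates in insertion order, so the indices follow the payload order
    // and line up with the order in which per-view coders would be collected.
    for (Map.Entry<String, String> e : tagToView.entrySet()) {
      tagToInt.put(e.getKey(), count);
      intToView.put(count, e.getValue());
      count++;
    }
  }
}
```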
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137804=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137804 ] ASF GitHub Bot logged work on BEAM-2930: Author: ASF GitHub Bot Created on: 24/Aug/18 13:21 Start Date: 24/Aug/18 13:21 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming. URL: https://github.com/apache/beam/pull/6208#discussion_r212574656 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java ## @@ -148,30 +147,45 @@ public void addSideInputValue(PCollectionView sideInput, WindowedValue T get(PCollectionView view, BoundedWindow window) { -@SuppressWarnings("unchecked") -Coder windowCoder = -(Coder) view.getWindowingStrategyInternal().getWindowFn().windowCoder(); - -StateTag>> stateTag = sideInputContentsTags.get(view); - -ValueState> state = -stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag); -// TODO: Add support for choosing which representation is contained based upon the -// side input materialization. We currently can assume that we always have a multimap -// materialization as that is the only supported type within the Java SDK. -@Nullable Iterable> elements = (Iterable>) state.read(); +@Nullable Iterable elements = getIterable(view, window); if (elements == null) { elements = Collections.emptyList(); } Review comment: Shouldn't this go into the `getIterable` function?
Issue Time Tracking --- Worklog Id: (was: 137804) Time Spent: 5h 10m (was: 5h) > Flink support for portable side input > - > > Key: BEAM-2930 > URL: https://issues.apache.org/jira/browse/BEAM-2930 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Henning Rohde >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 5h 10m > Remaining Estimate: 0h
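The sketch below illustrates the reviewer's suggestion. It assumes a simple map keyed by window instead of the per-window `ValueState` that `SideInputHandler` reads. The helper substitutes an empty iterable itself, so `get` no longer needs its own null check. `SideInputStore` is a hypothetical name:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: the lookup helper substitutes an empty iterable for missing state, so callers
// never see null. The String-keyed map is a hypothetical stand-in for per-window state.
final class SideInputStore {
  private final Map<String, List<Object>> contentsByWindow = new HashMap<>();

  void put(String window, List<Object> elements) {
    contentsByWindow.put(window, elements);
  }

  Iterable<Object> getIterable(String window) {
    List<Object> elements = contentsByWindow.get(window);
    // Null handling lives here instead of in every caller.
    return elements == null ? Collections.<Object>emptyList() : elements;
  }
}
```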
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137803=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137803 ] ASF GitHub Bot logged work on BEAM-2930: Author: ASF GitHub Bot Created on: 24/Aug/18 13:21 Start Date: 24/Aug/18 13:21 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming. URL: https://github.com/apache/beam/pull/6208#discussion_r212575795 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java ## @@ -148,30 +147,45 @@ public void addSideInputValue(PCollectionView sideInput, WindowedValue T get(PCollectionView view, BoundedWindow window) { -@SuppressWarnings("unchecked") -Coder windowCoder = -(Coder) view.getWindowingStrategyInternal().getWindowFn().windowCoder(); - -StateTag>> stateTag = sideInputContentsTags.get(view); - -ValueState> state = -stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag); -// TODO: Add support for choosing which representation is contained based upon the -// side input materialization. We currently can assume that we always have a multimap -// materialization as that is the only supported type within the Java SDK. -@Nullable Iterable> elements = (Iterable>) state.read(); +@Nullable Iterable elements = getIterable(view, window); if (elements == null) { elements = Collections.emptyList(); } +// TODO: Add support for choosing which representation is contained based upon the +// side input materialization. We currently can assume that we always have a multimap +// materialization as that is the only supported type within the Java SDK. ViewFn viewFn = (ViewFn) view.getViewFn(); Review comment: Could we add an `instanceof` check and check whether this is `MultimapViewFn`?
Issue Time Tracking --- Worklog Id: (was: 137803) Time Spent: 5h (was: 4h 50m) > Flink support for portable side input > - > > Key: BEAM-2930 > URL: https://issues.apache.org/jira/browse/BEAM-2930 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Henning Rohde >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 5h > Remaining Estimate: 0h
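The TODO says multimap is currently the only supported materialization. An explicit check would therefore turn a silent unchecked cast into a clear error. In the real code this might take the form of a Guava `checkArgument(view.getViewFn() instanceof PCollectionViews.MultimapViewFn, ...)`. To stay self-contained, the sketch below uses hypothetical stand-in classes instead of Beam's `ViewFn` hierarchy:

```java
// Hypothetical stand-ins for ViewFn and MultimapViewFn; not the Beam classes.
abstract class ViewFnSketch<T> {
  abstract T apply(Iterable<Object> elements);
}

final class MultimapViewFnSketch extends ViewFnSketch<Iterable<Object>> {
  @Override
  Iterable<Object> apply(Iterable<Object> elements) {
    return elements;
  }
}

final class ViewFnCheck {
  private ViewFnCheck() {}

  // Fail fast with a descriptive message instead of relying on an unchecked cast.
  static MultimapViewFnSketch requireMultimap(ViewFnSketch<?> viewFn) {
    if (!(viewFn instanceof MultimapViewFnSketch)) {
      throw new IllegalArgumentException(
          "Only multimap materialization is supported, got " + viewFn.getClass().getName());
    }
    return (MultimapViewFnSketch) viewFn;
  }
}
```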
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137806=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137806 ] ASF GitHub Bot logged work on BEAM-2930: Author: ASF GitHub Bot Created on: 24/Aug/18 13:21 Start Date: 24/Aug/18 13:21 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming. URL: https://github.com/apache/beam/pull/6208#discussion_r212337222 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingSideInputHandlerFactory.java ## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.translation.functions; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.ImmutableMap; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Map; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId; +import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.runners.core.construction.graph.SideInputReference; +import org.apache.beam.runners.fnexecution.state.StateRequestHandler; +import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandler; +import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandlerFactory; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.Materializations; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * {@link StateRequestHandler} that uses {@link org.apache.beam.runners.core.SideInputHandler} to + * access the Flink broadcast state that represents side inputs. + */ +public class FlinkStreamingSideInputHandlerFactory implements SideInputHandlerFactory { + + // Map from side input id to global PCollection id. + private final Map> sideInputToCollection; + private final org.apache.beam.runners.core.SideInputHandler runnerHandler; + + /** + * Creates a new state handler for the given stage. Note that this requires a traversal of the + * stage itself, so this should only be called once per stage rather than once per bundle. 
+ */ + public static FlinkStreamingSideInputHandlerFactory forStage( + ExecutableStage stage, + Map> viewMapping, + org.apache.beam.runners.core.SideInputHandler runnerHandler) { +ImmutableMap.Builder> sideInputBuilder = ImmutableMap.builder(); +for (SideInputReference sideInput : stage.getSideInputs()) { + SideInputId sideInputId = + SideInputId.newBuilder() + .setTransformId(sideInput.transform().getId()) + .setLocalName(sideInput.localName()) + .build(); + sideInputBuilder.put( + sideInputId, + checkNotNull( + viewMapping.get(sideInputId), + "No side input for %s/%s", + sideInputId.getTransformId(), + sideInputId.getLocalName())); +} + +FlinkStreamingSideInputHandlerFactory factory = +new FlinkStreamingSideInputHandlerFactory(sideInputBuilder.build(), runnerHandler); +return factory; + } + + private FlinkStreamingSideInputHandlerFactory( + Map> sideInputToCollection, + org.apache.beam.runners.core.SideInputHandler runnerHandler) { +this.sideInputToCollection = sideInputToCollection; +this.runnerHandler = runnerHandler; + } + + @Override + public SideInputHandler forSideInput( + String transformId, + String sideInputId, + RunnerApi.FunctionSpec accessPattern, + Coder elementCoder, +
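`forStage` uses a fail-fast pattern. Every side input the stage references must have a registered view; otherwise `checkNotNull` throws a `NullPointerException` naming the missing transform and local name. The same pattern in plain Java is sketched below. A hypothetical `SideInputKey` value class stands in for `SideInputId`, and strings stand in for views:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for RunnerApi.ExecutableStagePayload.SideInputId.
final class SideInputKey {
  final String transformId;
  final String localName;

  SideInputKey(String transformId, String localName) {
    this.transformId = transformId;
    this.localName = localName;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof SideInputKey)) {
      return false;
    }
    SideInputKey k = (SideInputKey) o;
    return transformId.equals(k.transformId) && localName.equals(k.localName);
  }

  @Override
  public int hashCode() {
    return Objects.hash(transformId, localName);
  }
}

final class SideInputMappings {
  private SideInputMappings() {}

  // For each side input the stage references, look up its view or fail with a clear message.
  static Map<SideInputKey, String> forStage(
      List<SideInputKey> stageSideInputs, Map<SideInputKey, String> viewMapping) {
    Map<SideInputKey, String> result = new LinkedHashMap<>();
    for (SideInputKey key : stageSideInputs) {
      String view = viewMapping.get(key);
      if (view == null) {
        throw new NullPointerException(
            String.format("No side input for %s/%s", key.transformId, key.localName));
      }
      result.put(key, view);
    }
    return Collections.unmodifiableMap(result);
  }
}
```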
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137802=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137802 ] ASF GitHub Bot logged work on BEAM-2930: Author: ASF GitHub Bot Created on: 24/Aug/18 13:21 Start Date: 24/Aug/18 13:21 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming. URL: https://github.com/apache/beam/pull/6208#discussion_r212581542 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ## @@ -520,6 +567,158 @@ private void translateImpulse( } } + private static LinkedHashMap> + getSideInputIdToPCollectionViewMap( + RunnerApi.ExecutableStagePayload stagePayload, RunnerApi.Components components) { + +RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents(components); + +LinkedHashMap> sideInputs = +new LinkedHashMap<>(); +for (RunnerApi.ExecutableStagePayload.SideInputId sideInputId : +stagePayload.getSideInputsList()) { + + // TODO: local name is unique as long as only one transform with side input can be within a stage + String sideInputTag = sideInputId.getLocalName(); + // for PCollectionView compatibility, not used to transform materialization + ViewFn>, ?> viewFn = + (ViewFn) new PCollectionViews.MultimapViewFn>, Void>(); Review comment: This can be moved below to the constructor or even outside the loop.
Issue Time Tracking --- Worklog Id: (was: 137802) Time Spent: 4h 50m (was: 4h 40m) > Flink support for portable side input > - > > Key: BEAM-2930 > URL: https://issues.apache.org/jira/browse/BEAM-2930 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Henning Rohde >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 4h 50m > Remaining Estimate: 0h
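According to the code comment, the view fn exists only for `PCollectionView` compatibility and does not drive materialization. A single instance can therefore presumably be shared by every view, which is what hoisting it out of the loop amounts to. The sketch below uses hypothetical stand-in classes. Its instance counter exists only to show that a single object is created:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the placeholder view fn; counts how often it is constructed.
final class PlaceholderViewFn {
  static int instances = 0;

  PlaceholderViewFn() {
    instances++;
  }
}

// Hypothetical stand-in for RunnerPCollectionView.
final class ViewSketch {
  final String tag;
  final PlaceholderViewFn viewFn;

  ViewSketch(String tag, PlaceholderViewFn viewFn) {
    this.tag = tag;
    this.viewFn = viewFn;
  }
}

final class HoistedViewFn {
  private HoistedViewFn() {}

  static List<ViewSketch> buildViews(List<String> tags) {
    // Created once, before the loop, instead of once per side input.
    PlaceholderViewFn viewFn = new PlaceholderViewFn();
    List<ViewSketch> views = new ArrayList<>();
    for (String tag : tags) {
      views.add(new ViewSketch(tag, viewFn));
    }
    return views;
  }
}
```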
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137801=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137801 ] ASF GitHub Bot logged work on BEAM-2930: Author: ASF GitHub Bot Created on: 24/Aug/18 13:21 Start Date: 24/Aug/18 13:21 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming. URL: https://github.com/apache/beam/pull/6208#discussion_r212317984 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java ## @@ -149,6 +149,7 @@ public static FlinkBatchPortablePipelineTranslator createTranslator() { translatorMap.put( PTransformTranslation.RESHUFFLE_URN, FlinkBatchPortablePipelineTranslator::translateReshuffle); +// TODO: this does not seem required translatorMap.put( PTransformTranslation.CREATE_VIEW_TRANSFORM_URN, FlinkBatchPortablePipelineTranslator::translateView); Review comment: Not sure that always works, e.g. when the input stream of the view has not already been added by another transform? Issue Time Tracking --- Worklog Id: (was: 137801) Time Spent: 4h 40m (was: 4.5h) > Flink support for portable side input > - > > Key: BEAM-2930 > URL: https://issues.apache.org/jira/browse/BEAM-2930 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Henning Rohde >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 4h 40m > Remaining Estimate: 0h
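The concern is about ordering. A translator that reuses its input's translated result only works if an earlier transform has already registered that input. The sketch below shows one hypothetical way to make that failure explicit: a lookup that throws a descriptive error. It is not the Flink runner's translation context. Its `translateView` simply forwards its input, which is itself an assumption:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical translation context: maps PCollection ids to already-translated results
// (plain strings here). A missing input fails with a clear message instead of a later NPE.
final class TranslationContextSketch {
  private final Map<String, String> translated = new HashMap<>();

  void put(String pCollectionId, String result) {
    translated.put(pCollectionId, result);
  }

  String getOrThrow(String pCollectionId) {
    String result = translated.get(pCollectionId);
    if (result == null) {
      throw new IllegalStateException(
          "PCollection " + pCollectionId + " has not been translated by an earlier transform");
    }
    return result;
  }

  // Assumed pass-through view translation: the view output reuses the translated input.
  void translateView(String inputId, String outputId) {
    put(outputId, getOrThrow(inputId));
  }
}
```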
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137807=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137807 ] ASF GitHub Bot logged work on BEAM-2930: Author: ASF GitHub Bot Created on: 24/Aug/18 13:21 Start Date: 24/Aug/18 13:21 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming. URL: https://github.com/apache/beam/pull/6208#discussion_r212589676 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingExecutableStageContext.java ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.runners.fnexecution.control.DockerJobBundleFactory; +import org.apache.beam.runners.fnexecution.control.JobBundleFactory; +import org.apache.beam.runners.fnexecution.control.StageBundleFactory; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; + +/** Implementation of a {@link FlinkExecutableStageContext} for streaming. 
*/ +public class FlinkStreamingExecutableStageContext Review comment: Does this class add any additional functionality to the `FlinkBatchExecutableStageContext`? Issue Time Tracking --- Worklog Id: (was: 137807) Time Spent: 5.5h (was: 5h 20m) > Flink support for portable side input > - > > Key: BEAM-2930 > URL: https://issues.apache.org/jira/browse/BEAM-2930 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Henning Rohde >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 5.5h > Remaining Estimate: 0h
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137396=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137396 ] ASF GitHub Bot logged work on BEAM-2930: Author: ASF GitHub Bot Created on: 23/Aug/18 14:22 Start Date: 23/Aug/18 14:22 Worklog Time Spent: 10m Work Description: tweise commented on issue #6208: [BEAM-2930] Side input support for Flink portable streaming. URL: https://github.com/apache/beam/pull/6208#issuecomment-415432797 @mxm that would be great, thanks! Issue Time Tracking --- Worklog Id: (was: 137396) Time Spent: 4.5h (was: 4h 20m) > Flink support for portable side input > - > > Key: BEAM-2930 > URL: https://issues.apache.org/jira/browse/BEAM-2930 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Henning Rohde >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 4.5h > Remaining Estimate: 0h
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137389=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137389 ] ASF GitHub Bot logged work on BEAM-2930: Author: ASF GitHub Bot Created on: 23/Aug/18 13:57 Start Date: 23/Aug/18 13:57 Worklog Time Spent: 10m Work Description: mxm commented on issue #6208: [BEAM-2930] Side input support for Flink portable streaming. URL: https://github.com/apache/beam/pull/6208#issuecomment-415423307 Thank you for your work @tweise, I will also have a look. Issue Time Tracking --- Worklog Id: (was: 137389) Time Spent: 4h 20m (was: 4h 10m) > Flink support for portable side input > - > > Key: BEAM-2930 > URL: https://issues.apache.org/jira/browse/BEAM-2930 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Henning Rohde >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 4h 20m > Remaining Estimate: 0h
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137110=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137110 ] ASF GitHub Bot logged work on BEAM-2930: Author: ASF GitHub Bot Created on: 22/Aug/18 18:40 Start Date: 22/Aug/18 18:40 Worklog Time Spent: 10m Work Description: tweise commented on issue #6208: [BEAM-2930] Side input support for Flink portable streaming. URL: https://github.com/apache/beam/pull/6208#issuecomment-415136625 The PR is ready for review. I made the changes to use GBK to materialize the side input, which adheres to the windowing strategy and is equivalent to what the old runner does. @lukecwik @angoenka Issue Time Tracking --- Worklog Id: (was: 137110) Time Spent: 4h 10m (was: 4h) > Flink support for portable side input > - > > Key: BEAM-2930 > URL: https://issues.apache.org/jira/browse/BEAM-2930 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Henning Rohde >Assignee: Thomas Weise >Priority: Major > Labels: portability > Time Spent: 4h 10m > Remaining Estimate: 0h
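Materializing through a GBK means every side input element gets the same `Void` key. Grouping then yields one `Iterable<T>` per window, which is the `T -> Iterable<T>` shape the translator uses for the side input coder. The sketch below is simplified: it models only a single firing per window and represents windows as strings. All names are hypothetical:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: with a single (Void) key, grouping per window turns T into Iterable<T>.
// Triggers and multiple panes are deliberately not modeled.
final class GbkMaterialization {
  private GbkMaterialization() {}

  static final class WindowedElement<T> {
    final String window;
    final T value;

    WindowedElement(String window, T value) {
      this.window = window;
      this.value = value;
    }
  }

  static <T> Map<String, List<T>> materialize(List<WindowedElement<T>> input) {
    Map<String, List<T>> perWindow = new LinkedHashMap<>();
    for (WindowedElement<T> e : input) {
      // All elements share one key, so the window alone determines the group.
      perWindow.computeIfAbsent(e.window, w -> new ArrayList<>()).add(e.value);
    }
    return perWindow;
  }
}
```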
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=135917=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135917 ] ASF GitHub Bot logged work on BEAM-2930: Author: ASF GitHub Bot Created on: 17/Aug/18 23:21 Start Date: 17/Aug/18 23:21 Worklog Time Spent: 10m Work Description: lukecwik commented on issue #6208: [WIP] [BEAM-2930] Side input support for Flink portable streaming. URL: https://github.com/apache/beam/pull/6208#issuecomment-414010931 I believe having a different result is OK, since it is a possible answer that could have happened had the order of the original events been different and had the checkpoint revert not happened. Issue Time Tracking --- Worklog Id: (was: 135917) Time Spent: 4h (was: 3h 50m) > Flink support for portable side input > - > > Key: BEAM-2930 > URL: https://issues.apache.org/jira/browse/BEAM-2930 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Henning Rohde >Priority: Major > Labels: portability > Time Spent: 4h > Remaining Estimate: 0h
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=135910=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135910 ] ASF GitHub Bot logged work on BEAM-2930: Author: ASF GitHub Bot Created on: 17/Aug/18 23:02 Start Date: 17/Aug/18 23:02 Worklog Time Spent: 10m Work Description: tweise commented on issue #6208: [WIP] [BEAM-2930] Side input support for Flink portable streaming. URL: https://github.com/apache/beam/pull/6208#issuecomment-414008492 OK, then the idea of building up state as an Iterable, as shown in this PR, definitely does not work. Repeated firing isn't idempotent, at least with Flink. If the pipeline reverts to a checkpoint, then a change in the order of arrival between the (updating) side input and the main input can lead to a different result, even for the same bundle. Issue Time Tracking --- Worklog Id: (was: 135910) Time Spent: 3h 50m (was: 3h 40m) > Flink support for portable side input > - > > Key: BEAM-2930 > URL: https://issues.apache.org/jira/browse/BEAM-2930 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Henning Rohde >Priority: Major > Labels: portability > Time Spent: 3h 50m > Remaining Estimate: 0h
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=135903=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135903 ] ASF GitHub Bot logged work on BEAM-2930: Author: ASF GitHub Bot Created on: 17/Aug/18 22:53 Start Date: 17/Aug/18 22:53 Worklog Time Spent: 10m Work Description: lukecwik commented on issue #6208: [WIP] [BEAM-2930] Side input support for Flink portable streaming. URL: https://github.com/apache/beam/pull/6208#issuecomment-414007244 Early/repeated firings provide inconsistent side input values between bundles. Side input access is meant to provide the latest firing at the time when the bundle starts processing, and it is up to the runner to try to minimize the latency between a side input firing being materialized and it becoming visible to the consumer, but there is no latency requirement other than best effort. Only side inputs that have a single firing can be reasoned about concretely. Issue Time Tracking --- Worklog Id: (was: 135903) Time Spent: 3h 40m (was: 3.5h) > Flink support for portable side input > - > > Key: BEAM-2930 > URL: https://issues.apache.org/jira/browse/BEAM-2930 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Henning Rohde >Priority: Major > Labels: portability > Time Spent: 3h 40m > Remaining Estimate: 0h
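Under the semantics described above, a bundle observes the latest side input firing that was materialized when the bundle started. Firings that arrive later become visible only to subsequent bundles. The sketch below shows one hypothetical way to provide this, by snapshotting at bundle start. It is not the actual Beam or Flink implementation:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: side input reads within a bundle use a snapshot taken at bundle start.
final class BundleSideInputView {
  private final Map<String, List<String>> latestFiring = new HashMap<>();
  private Map<String, List<String>> bundleSnapshot = Collections.emptyMap();

  // A new firing replaces the stored contents for its window.
  void onSideInputFiring(String window, List<String> contents) {
    latestFiring.put(window, contents);
  }

  // Freeze what the upcoming bundle will see.
  void startBundle() {
    bundleSnapshot = new HashMap<>(latestFiring);
  }

  List<String> get(String window) {
    return bundleSnapshot.getOrDefault(window, Collections.emptyList());
  }
}
```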
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=135737=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135737 ]

ASF GitHub Bot logged work on BEAM-2930:
Author: ASF GitHub Bot
Created on: 17/Aug/18 16:18
Start Date: 17/Aug/18 16:18
Worklog Time Spent: 10m
Work Description: tweise commented on issue #6208: [WIP] [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-413916534

What is supposed to happen when the windowing strategy allows early/repeated firing? Won't that lead to nondeterministic behavior? If repeated firing is possible, then my current approach of just accumulating records into an Iterable won't work. With the non-portable runner, views are materialized via combine. While that would ensure that the Iterable is constructed correctly, repeated firing would still lead to nondeterministic results. Can anyone with a better understanding of the old Flink runner offer some insight here?
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java#L52

Issue Time Tracking
---
Worklog Id: (was: 135737)
Time Spent: 3.5h (was: 3h 20m)
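To make the concern concrete, here is a sketch comparing two hypothetical state strategies for a single window (`AppendingSideInputState` and `ReplacingSideInputState` are illustrative names, not Beam classes). It assumes an accumulating trigger, so the second pane already contains the elements of the first: appending every firing duplicates `a`, while keeping only the latest firing yields exactly the pane's contents and is unaffected by replaying a firing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical state that appends every firing, i.e. the accumulate-into-an-Iterable approach. */
final class AppendingSideInputState {
  private final List<String> elements = new ArrayList<>();

  void onFiring(List<String> pane) {
    elements.addAll(pane); // repeated or replayed firings are appended again
  }

  List<String> view() {
    return Collections.unmodifiableList(elements);
  }
}

/** Hypothetical alternative that keeps only the most recent firing. */
final class ReplacingSideInputState {
  private List<String> elements = new ArrayList<>();

  void onFiring(List<String> pane) {
    elements = new ArrayList<>(pane); // replaying the same firing leaves the state unchanged
  }

  List<String> view() {
    return Collections.unmodifiableList(elements);
  }
}
```

Replacing is only correct for accumulating panes; with discarding panes each firing holds just the new elements, so a replacing strategy would drop earlier data.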
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=135101=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135101 ]

ASF GitHub Bot logged work on BEAM-2930:
Author: ASF GitHub Bot
Created on: 15/Aug/18 18:06
Start Date: 15/Aug/18 18:06
Worklog Time Spent: 10m
Work Description: tweise commented on issue #6208: [WIP] [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-413284242

@lukecwik thanks, and correct: pushback is already handled. What I was missing is the allowed lateness, and then returning an empty side input iterable even when no element was previously received.

Issue Time Tracking
---
Worklog Id: (was: 135101)
Time Spent: 3h 20m (was: 3h 10m)
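The readiness rule discussed in this thread (a window's side input is available once it has fired at least once, and is empty once its input is droppably late without any firing) can be sketched as a lookup with three outcomes. This is a hypothetical plain-Java illustration: `SideInputLookup` is not a Beam class, windows are identified by their max timestamp in epoch millis, and "droppably late" is approximated as the side input watermark being past the window's max timestamp plus the allowed lateness.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical per-window side input lookup keyed by window max timestamp (millis). */
final class SideInputLookup {
  private final Map<Long, List<String>> firedValues = new HashMap<>();
  private final long allowedLatenessMillis;

  SideInputLookup(long allowedLatenessMillis) {
    this.allowedLatenessMillis = allowedLatenessMillis;
  }

  void onFiring(long windowMaxTimestamp, List<String> pane) {
    firedValues.put(windowMaxTimestamp, new ArrayList<>(pane));
  }

  /**
   * Returns the fired value, an empty list if the window expired without any firing,
   * or Optional.empty() if the side input is not ready and the element must be pushed back.
   */
  Optional<List<String>> get(long windowMaxTimestamp, long sideInputWatermark) {
    List<String> fired = firedValues.get(windowMaxTimestamp);
    if (fired != null) {
      return Optional.of(fired); // at least one firing: ready
    }
    if (sideInputWatermark > windowMaxTimestamp + allowedLatenessMillis) {
      return Optional.of(Collections.<String>emptyList()); // droppably late: empty side input
    }
    return Optional.empty(); // not ready yet
  }
}
```

The caller would push the main input element back whenever `get` returns `Optional.empty()` and retry later.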
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=135091=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135091 ]

ASF GitHub Bot logged work on BEAM-2930:
Author: ASF GitHub Bot
Created on: 15/Aug/18 17:49
Start Date: 15/Aug/18 17:49
Worklog Time Spent: 10m
Work Description: lukecwik edited a comment on issue #6208: [WIP] [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-413277807

@tweise You need to use the WindowMappingFn to figure out which side input window is being asked for. If the side input isn't ready, you should store the elements in state and set a timer. The simplest pushback solution would be to set a processing-time timer that then reads those elements and checks whether they are ready; if so, process them, otherwise keep them in state. You could do better with map state or timers with payloads, and you could also do better if you could use a watermark-based timer driven by the side input PCollection.

The windowing strategy on the side input PCollection tells you the [allowed lateness](https://github.com/apache/beam/blob/e2583f5e73de50f8af128ecaa331a2e1046d2b08/model/pipeline/src/main/proto/beam_runner_api.proto#L688). Once the side input PCollection's watermark passes the end of the window plus the allowed lateness, the window becomes ready. This is how the existing implementation worked, using [PushbackSideInputDoFnRunner](https://github.com/apache/beam/blob/b87aa6037b37aab95702fd1ace40b835f0f66f55/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java) and the [ReadyCheckingSideInputReader](https://github.com/apache/beam/blob/b87aa6037b37aab95702fd1ace40b835f0f66f55/runners/core-java/src/main/java/org/apache/beam/runners/core/ReadyCheckingSideInputReader.java).

There seems to be some logic within the Flink [DoFnOperator](https://github.com/apache/beam/blob/b87aa6037b37aab95702fd1ace40b835f0f66f55/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L465) which handles pushback.

Issue Time Tracking
---
Worklog Id: (was: 135091)
Time Spent: 3h 10m (was: 3h)
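The simplest pushback scheme described above (park elements in state, then re-check them when a processing-time timer fires) might look roughly like this. The sketch is hypothetical: `PushbackBuffer` is not a Beam or Flink class, a plain list stands in for keyed state, the readiness check is passed in as a `Predicate`, and actual timer registration is only indicated by a comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical pushback buffer; a plain list stands in for the operator's keyed state. */
final class PushbackBuffer<T> {
  private final List<T> pushedBack = new ArrayList<>();
  private final Predicate<T> sideInputReady;
  private final Consumer<T> processor;

  PushbackBuffer(Predicate<T> sideInputReady, Consumer<T> processor) {
    this.sideInputReady = sideInputReady;
    this.processor = processor;
  }

  /** Processes the element now if its side input is ready, otherwise parks it. */
  void onElement(T element) {
    if (sideInputReady.test(element)) {
      processor.accept(element);
    } else {
      pushedBack.add(element);
      // A real operator would (re)register a processing-time timer at this point.
    }
  }

  /** Timer callback: process the elements that became ready, keep the rest in state. */
  void onTimer() {
    List<T> stillWaiting = new ArrayList<>();
    for (T element : pushedBack) {
      if (sideInputReady.test(element)) {
        processor.accept(element);
      } else {
        stillWaiting.add(element);
      }
    }
    pushedBack.clear();
    pushedBack.addAll(stillWaiting);
  }

  int pendingCount() {
    return pushedBack.size();
  }
}
```

An element parked before its side input is ready is emitted on the first timer callback after the predicate becomes true; map state or timers with payloads, as suggested in the comment, would avoid rescanning every parked element.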
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=135089=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135089 ]

ASF GitHub Bot logged work on BEAM-2930:
Author: ASF GitHub Bot
Created on: 15/Aug/18 17:46
Start Date: 15/Aug/18 17:46
Worklog Time Spent: 10m
Work Description: lukecwik commented on issue #6208: [WIP] [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-413277807

@tweise You need to use the WindowMappingFn to figure out which side input window is being asked for. If the side input isn't ready, you should store the elements in state and set a timer. The simplest pushback solution would be to set a processing-time timer that then reads those elements and checks whether they are ready; if so, process them, otherwise keep them in state. You could do better with map state or timers with payloads, and you could also do better if you could use a watermark-based timer driven by the side input PCollection.

The windowing strategy on the side input PCollection tells you the [allowed lateness](https://github.com/apache/beam/blob/e2583f5e73de50f8af128ecaa331a2e1046d2b08/model/pipeline/src/main/proto/beam_runner_api.proto#L688). Once the side input PCollection's watermark passes the end of the window plus the allowed lateness, the window becomes ready. This is how the existing implementation worked, using [PushbackSideInputDoFnRunner](https://github.com/apache/beam/blob/b87aa6037b37aab95702fd1ace40b835f0f66f55/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java) and the [ReadyCheckingSideInputReader](https://github.com/apache/beam/blob/b87aa6037b37aab95702fd1ace40b835f0f66f55/runners/core-java/src/main/java/org/apache/beam/runners/core/ReadyCheckingSideInputReader.java).

Issue Time Tracking
---
Worklog Id: (was: 135089)
Time Spent: 3h (was: 2h 50m)
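The first step in the comment, using the WindowMappingFn to find out which side input window is being asked for, is sketched below for fixed (tumbling) windows. It assumes a mapping that picks the side input window containing the main input window's max timestamp; this is believed to match Beam's default mapping for partitioning window functions such as `FixedWindows`, but that is an assumption to verify, and the `FixedWindow` and `FixedWindowMapping` classes are hypothetical stand-ins, not Beam's `IntervalWindow` or `WindowMappingFn`.

```java
/** Hypothetical fixed window covering [start, end) in epoch millis. */
final class FixedWindow {
  final long start;
  final long end;

  FixedWindow(long start, long end) {
    this.start = start;
    this.end = end;
  }

  long maxTimestamp() {
    return end - 1; // last timestamp inside the half-open interval
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof FixedWindow
        && ((FixedWindow) o).start == start
        && ((FixedWindow) o).end == end;
  }

  @Override
  public int hashCode() {
    return Long.hashCode(start) * 31 + Long.hashCode(end);
  }
}

/** Hypothetical mapping from a main input window to the side input window it should read. */
final class FixedWindowMapping {
  private final long sideInputWindowSizeMillis;

  FixedWindowMapping(long sideInputWindowSizeMillis) {
    this.sideInputWindowSizeMillis = sideInputWindowSizeMillis;
  }

  /** Returns the side input window that contains the main window's max timestamp. */
  FixedWindow map(FixedWindow mainWindow) {
    long ts = mainWindow.maxTimestamp();
    long start = ts - Math.floorMod(ts, sideInputWindowSizeMillis);
    return new FixedWindow(start, start + sideInputWindowSizeMillis);
  }
}
```

For example, a one-minute main window `[60000, 120000)` maps to the hourly side input window `[0, 3600000)`; readiness is then checked for that side input window, not the main one.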
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=134783=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-134783 ]

ASF GitHub Bot logged work on BEAM-2930:
Author: ASF GitHub Bot
Created on: 14/Aug/18 23:14
Start Date: 14/Aug/18 23:14
Worklog Time Spent: 10m
Work Description: tweise commented on issue #6208: [WIP] [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-413045243

@lukecwik thanks for taking a look. I wonder how "droppably late" can be evaluated in this case: the side input handler won't have received any input, and the window is "ready" based on the watermark? In the old runner we have a separate combine operator; here I'm trying to accumulate the iterable in state. Regarding the refactoring, my goal is to get this functionally correct in the first pass. I hope to then discuss/work with @aljoscha on how to extract a common base operator for both runners.

Issue Time Tracking
---
Worklog Id: (was: 134783)
Time Spent: 2h 50m (was: 2h 40m)
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=134708=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-134708 ]

ASF GitHub Bot logged work on BEAM-2930:
Author: ASF GitHub Bot
Created on: 14/Aug/18 20:23
Start Date: 14/Aug/18 20:23
Worklog Time Spent: 10m
Work Description: lukecwik edited a comment on issue #6208: [WIP] [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-413002348

+1 on decoupling from pre-portability implementations because of the baggage they bring. Side inputs become ready for a window as soon as there has been at least one firing for them, or if there have been zero firings and the input to the side input is now droppably late (meaning that the side input is empty).

Issue Time Tracking
---
Worklog Id: (was: 134708)
Time Spent: 2h 40m (was: 2.5h)
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=134698=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-134698 ]

ASF GitHub Bot logged work on BEAM-2930:
Author: ASF GitHub Bot
Created on: 14/Aug/18 20:19
Start Date: 14/Aug/18 20:19
Worklog Time Spent: 10m
Work Description: lukecwik commented on issue #6208: [WIP] [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-413002348

+1 on decoupling from pre-portability implementations because of the baggage they bring. Side inputs become ready for a window as soon as there has been at least one firing for them, or if there have been zero firings and the input to the side input is now droppably late (meaning that the side input is empty or has some default value).

Issue Time Tracking
---
Worklog Id: (was: 134698)
Time Spent: 2.5h (was: 2h 20m)
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=133959=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-133959 ]

ASF GitHub Bot logged work on BEAM-2930:
Author: ASF GitHub Bot
Created on: 11/Aug/18 20:29
Start Date: 11/Aug/18 20:29
Worklog Time Spent: 10m
Work Description: tweise commented on issue #6208: [WIP] [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-412299687

R: @bsidhom @angoenka

Issue Time Tracking
---
Worklog Id: (was: 133959)
Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=133957=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-133957 ]

ASF GitHub Bot logged work on BEAM-2930:
Author: ASF GitHub Bot
Created on: 11/Aug/18 19:32
Start Date: 11/Aug/18 19:32
Worklog Time Spent: 10m
Work Description: tweise opened a new pull request #6208: [WIP] [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208

Support for streaming side inputs.

Follow this checklist to help us incorporate your contribution quickly and easily:

- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it.
Issue Time Tracking
---
Worklog Id: (was: 133957)
Time Spent: 2h 10m (was: 2h)
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=129086=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-129086 ]

ASF GitHub Bot logged work on BEAM-2930:
Author: ASF GitHub Bot
Created on: 31/Jul/18 02:02
Start Date: 31/Jul/18 02:02
Worklog Time Spent: 10m
Work Description: tweise closed pull request #6082: [BEAM-2930] Side inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index d376e58ec4b..0a48edb8d6b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -433,17 +433,18 @@ private void translateImpulse(
       throw new RuntimeException(e);
     }
-    String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
+    String inputPCollectionId = stagePayload.getInput();
+    // TODO: https://issues.apache.org/jira/browse/BEAM-2930
+    if (stagePayload.getSideInputsCount() > 0) {
+      throw new UnsupportedOperationException(
+          "[BEAM-2930] streaming translator does not support side inputs: " + transform);
+    }
     Map, OutputTag>> tagsToOutputTags = Maps.newLinkedHashMap();
     Map, Coder>> tagsToCoders = Maps.newLinkedHashMap();
     // TODO: does it matter which output we designate as "main"
-    TupleTag mainOutputTag;
-    if (!outputs.isEmpty()) {
-      mainOutputTag = new TupleTag(outputs.keySet().iterator().next());
-    } else {
-      mainOutputTag = null;
-    }
+    final TupleTag mainOutputTag =
+        outputs.isEmpty() ? null : new TupleTag(outputs.keySet().iterator().next());
     // associate output tags with ids, output manager uses these Integer ids to serialize state
     BiMap outputIndexMap =

Issue Time Tracking
---
Worklog Id: (was: 129086)
Time Spent: 2h (was: 1h 50m)
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=129082=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-129082 ]

ASF GitHub Bot logged work on BEAM-2930:
Author: ASF GitHub Bot
Created on: 31/Jul/18 01:35
Start Date: 31/Jul/18 01:35
Worklog Time Spent: 10m
Work Description: tweise commented on issue #6082: [BEAM-2930] Side inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082#issuecomment-409065086

Run Java Precommit

Issue Time Tracking
---
Worklog Id: (was: 129082)
Time Spent: 1h 50m (was: 1h 40m)
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=129058=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-129058 ]

ASF GitHub Bot logged work on BEAM-2930:
Author: ASF GitHub Bot
Created on: 31/Jul/18 00:57
Start Date: 31/Jul/18 00:57
Worklog Time Spent: 10m
Work Description: tweise commented on issue #6082: [BEAM-2930] Side inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082#issuecomment-409059095

Run Java Precommit

Issue Time Tracking
---
Worklog Id: (was: 129058)
Time Spent: 1h 40m (was: 1.5h)
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=129038=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-129038 ]

ASF GitHub Bot logged work on BEAM-2930:
Author: ASF GitHub Bot
Created on: 31/Jul/18 00:31
Start Date: 31/Jul/18 00:31
Worklog Time Spent: 10m
Work Description: tweise commented on issue #6082: [BEAM-2930] Side inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082#issuecomment-409055117

test this please

Issue Time Tracking
---
Worklog Id: (was: 129038)
Time Spent: 1.5h (was: 1h 20m)
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=129037=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-129037 ]

ASF GitHub Bot logged work on BEAM-2930:
Author: ASF GitHub Bot
Created on: 31/Jul/18 00:29
Start Date: 31/Jul/18 00:29
Worklog Time Spent: 10m
Work Description: tweise commented on issue #6082: [BEAM-2930] Side inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082#issuecomment-409054911

Run Java PreCommit

Issue Time Tracking
---
Worklog Id: (was: 129037)
Time Spent: 1h 20m (was: 1h 10m)
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=128952=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-128952 ]

ASF GitHub Bot logged work on BEAM-2930:
Author: ASF GitHub Bot
Created on: 30/Jul/18 21:24
Start Date: 30/Jul/18 21:24
Worklog Time Spent: 10m
Work Description: tweise commented on issue #6082: [BEAM-2930] Side inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082#issuecomment-409016094

Run Java PreCommit

Issue Time Tracking
---
Worklog Id: (was: 128952)
Time Spent: 1h 10m (was: 1h)
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=128945=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-128945 ]

ASF GitHub Bot logged work on BEAM-2930:
Author: ASF GitHub Bot
Created on: 30/Jul/18 21:02
Start Date: 30/Jul/18 21:02
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6082: [BEAM-2930] Side inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082#discussion_r206320308

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
## @@ -433,12 +433,17 @@ private void translateImpulse(
       throw new RuntimeException(e);
     }
-    String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
+    String inputPCollectionId = stagePayload.getInput();
+    // TODO: BEAM-2930
+    if (stagePayload.getSideInputsCount() > 0) {
+      throw new UnsupportedOperationException(
+          "[BEAM-2930] streaming translator does not support side inputs: " + transform);
+    }
     Map, OutputTag>> tagsToOutputTags = Maps.newLinkedHashMap();
     Map, Coder>> tagsToCoders = Maps.newLinkedHashMap();
     // TODO: does it matter which output we designate as "main"
-    TupleTag mainOutputTag;
+    final TupleTag mainOutputTag;

Review comment: fixed

Issue Time Tracking
---
Worklog Id: (was: 128945)
Time Spent: 50m (was: 40m)
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=128946=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-128946 ]

ASF GitHub Bot logged work on BEAM-2930:
Author: ASF GitHub Bot
Created on: 30/Jul/18 21:02
Start Date: 30/Jul/18 21:02
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6082: [BEAM-2930] Side inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082#discussion_r206320374

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
## @@ -433,12 +433,17 @@ private void translateImpulse(
       throw new RuntimeException(e);
     }
-    String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
+    String inputPCollectionId = stagePayload.getInput();
+    // TODO: BEAM-2930

Review comment: fixed

Issue Time Tracking
---
Worklog Id: (was: 128946)
Time Spent: 1h (was: 50m)
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=128918=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-128918 ]

ASF GitHub Bot logged work on BEAM-2930:
Author: ASF GitHub Bot
Created on: 30/Jul/18 20:03
Start Date: 30/Jul/18 20:03
Worklog Time Spent: 10m
Work Description: bsidhom commented on a change in pull request #6082: [BEAM-2930] Side inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082#discussion_r206302099

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ##
@@ -433,12 +433,17 @@ private void translateImpulse(
 throw new RuntimeException(e);
 }
-String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
+String inputPCollectionId = stagePayload.getInput();
+// TODO: BEAM-2930

Review comment: This is easier to reference if you include a full link rather than just the issue number.

Issue Time Tracking
---
Worklog Id: (was: 128918)
Time Spent: 40m (was: 0.5h)
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=128908=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-128908 ]

ASF GitHub Bot logged work on BEAM-2930:
Author: ASF GitHub Bot
Created on: 30/Jul/18 19:43
Start Date: 30/Jul/18 19:43
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #6082: [BEAM-2930] Side inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082#discussion_r206296850

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java ##
@@ -433,12 +433,17 @@ private void translateImpulse(
 throw new RuntimeException(e);
 }
-String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
+String inputPCollectionId = stagePayload.getInput();
+// TODO: BEAM-2930
+if (stagePayload.getSideInputsCount() > 0) {
+  throw new UnsupportedOperationException(
+      "[BEAM-2930] streaming translator does not support side inputs: " + transform);
+}
 Map, OutputTag>> tagsToOutputTags = Maps.newLinkedHashMap();
 Map, Coder>> tagsToCoders = Maps.newLinkedHashMap();
 // TODO: does it matter which output we designate as "main"
-TupleTag mainOutputTag;
+final TupleTag mainOutputTag;

Review comment: We can single line this assignment for better readability.

Issue Time Tracking
---
Worklog Id: (was: 128908)
Time Spent: 0.5h (was: 20m)
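The review suggestion here targets `final TupleTag mainOutputTag;`, which the diff declares without an initializer, so Java's definite-assignment rules require it to be assigned exactly once on every path further down. The diff context does not show how the main output tag is actually chosen, so the sketch below assumes a hypothetical rule (first declared output, otherwise a default) and uses plain `String`s instead of Beam's `TupleTag`. The names `outputs` and `DEFAULT_MAIN_OUTPUT` are illustrative only, not Beam code. It contrasts the declare-then-assign form with the single-line conditional form.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch contrasting declare-then-assign with a single-line final assignment. */
public class SingleLineAssignmentSketch {

  // Hypothetical fallback used when there are no declared outputs.
  static final String DEFAULT_MAIN_OUTPUT = "main";

  // Before: the final local is declared first, then assigned once in each branch.
  static String mainOutputTagBranches(Map<String, String> outputs) {
    final String mainOutputTag;
    if (outputs.isEmpty()) {
      mainOutputTag = DEFAULT_MAIN_OUTPUT;
    } else {
      mainOutputTag = outputs.keySet().iterator().next();
    }
    return mainOutputTag;
  }

  // After: the same choice written as one conditional expression at the declaration.
  static String mainOutputTagSingleLine(Map<String, String> outputs) {
    final String mainOutputTag =
        outputs.isEmpty() ? DEFAULT_MAIN_OUTPUT : outputs.keySet().iterator().next();
    return mainOutputTag;
  }

  public static void main(String[] args) {
    // LinkedHashMap keeps insertion order, so "first declared output" is well defined.
    Map<String, String> outputs = new LinkedHashMap<>();
    System.out.println(mainOutputTagSingleLine(outputs));
    outputs.put("out0", "pc0");
    outputs.put("out1", "pc1");
    System.out.println(mainOutputTagSingleLine(outputs));
  }
}
```

Both methods return the same value; the conditional form keeps the variable `final` while putting its only assignment at the declaration, which is the readability gain the reviewer points to.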
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=128025=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-128025 ]

ASF GitHub Bot logged work on BEAM-2930:
Author: ASF GitHub Bot
Created on: 27/Jul/18 04:13
Start Date: 27/Jul/18 04:13
Worklog Time Spent: 10m
Work Description: tweise commented on issue #6082: [BEAM-2930] Side inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082#issuecomment-408306168

R: @angoenka @bsidhom

Issue Time Tracking
---
Worklog Id: (was: 128025)
Time Spent: 20m (was: 10m)
[jira] [Work logged] (BEAM-2930) Flink support for portable side input
[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=127998=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127998 ]

ASF GitHub Bot logged work on BEAM-2930:
Author: ASF GitHub Bot
Created on: 27/Jul/18 02:57
Start Date: 27/Jul/18 02:57
Worklog Time Spent: 10m
Work Description: tweise opened a new pull request #6082: [BEAM-2930] Side inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082

Provide a helpful error message to understand why wordcount with file output fails in streaming mode.
Issue Time Tracking
---
Worklog Id: (was: 127998)
Time Spent: 10m
Remaining Estimate: 0h

-- This message was sent