[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137935=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137935 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 24/Aug/18 18:58
Start Date: 24/Aug/18 18:58
Worklog Time Spent: 10m 
  Work Description: tweise closed pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
index c488522aa64..c453329051d 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
@@ -34,22 +34,22 @@
 import org.apache.beam.sdk.values.WindowingStrategy;
 
 /** A {@link PCollectionView} created from the components of a {@link SideInput}. */
-class RunnerPCollectionView extends PValueBase implements PCollectionView {
+public class RunnerPCollectionView extends PValueBase implements PCollectionView {
   private final TupleTag>> tag;
   private final ViewFn>, T> viewFn;
   private final WindowMappingFn windowMappingFn;
   private final @Nullable WindowingStrategy windowingStrategy;
-  private final @Nullable Coder>> coder;
-  private final transient PCollection pCollection;
+  private final @Nullable Coder coder;
+  private final transient @Nullable PCollection pCollection;
 
   /** Create a new {@link RunnerPCollectionView} from the provided components. */
-  RunnerPCollectionView(
-  PCollection pCollection,
+  public RunnerPCollectionView(
+  @Nullable PCollection pCollection,
   TupleTag>> tag,
   ViewFn>, T> viewFn,
   WindowMappingFn windowMappingFn,
   @Nullable WindowingStrategy windowingStrategy,
-  @Nullable Coder>> coder) {
+  @Nullable Coder coder) {
 this.pCollection = pCollection;
 this.tag = tag;
 this.viewFn = viewFn;
@@ -84,7 +84,7 @@
   }
 
   @Override
-  public Coder>> getCoderInternal() {
+  public Coder getCoderInternal() {
 return coder;
   }
 
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index e8c2b66ba42..1a6826d94b1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -38,7 +38,6 @@
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 
 /**
@@ -148,30 +147,46 @@ public void addSideInputValue(PCollectionView sideInput, WindowedValue T get(PCollectionView view, BoundedWindow window) {
-@SuppressWarnings("unchecked")
-Coder windowCoder =
-(Coder) view.getWindowingStrategyInternal().getWindowFn().windowCoder();
-
-StateTag>> stateTag = sideInputContentsTags.get(view);
-
-ValueState> state =
-stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag);
 
-// TODO: Add support for choosing which representation is contained based upon the
-// side input materialization. We currently can assume that we always have a multimap
-// materialization as that is the only supported type within the Java SDK.
-@Nullable Iterable> elements = (Iterable>) state.read();
+@Nullable Iterable elements = getIterable(view, window);
 
 if (elements == null) {
   elements = Collections.emptyList();
 }
 
+// TODO: Add support for choosing which representation is contained based upon the
+// side input materialization. We currently can assume that we always have a multimap
+// materialization as that is the only supported type within the Java SDK.
 ViewFn viewFn = (ViewFn) view.getViewFn();
 Coder keyCoder = ((KvCoder) view.getCoderInternal()).getKeyCoder();
 return (T)
 viewFn.apply(InMemoryMultimapSideInputView.fromIterable(keyCoder, (Iterable) elements));
   }
 
+  /**
+   * Retrieve the value as written by {@link #addSideInputValue(PCollectionView, WindowedValue)},
+   * without applying the SDK specific 

[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137922=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137922 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 24/Aug/18 18:24
Start Date: 24/Aug/18 18:24
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-415842617
 
 
   Run Java PreCommit


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 137922)
Time Spent: 7h 10m  (was: 7h)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 7h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137868=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137868 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 24/Aug/18 15:58
Start Date: 24/Aug/18 15:58
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212675673
 
 

 ##
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -149,6 +149,7 @@ public static FlinkBatchPortablePipelineTranslator createTranslator() {
 translatorMap.put(
 PTransformTranslation.RESHUFFLE_URN,
 FlinkBatchPortablePipelineTranslator::translateReshuffle);
+// TODO: this does not seem required
 translatorMap.put(
 PTransformTranslation.CREATE_VIEW_TRANSFORM_URN,
 FlinkBatchPortablePipelineTranslator::translateView);
 
 Review comment:
   Dead code, removed it.
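
The hunk registers translators in a map keyed by transform URN, so an entry is only ever exercised if the pipeline being translated contains a transform with that URN; removing an entry that is never looked up does not change behavior, which is what "dead code" means here. A minimal sketch of such a URN-keyed registry, assuming simplified stand-in types (a `String` transform ID, `Consumer` translators) and made-up URN strings; it is not the Flink runner's translator interface, and how the real translator treats unknown URNs is not shown in this thread (the sketch simply throws):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical URN-keyed translator registry; not the Flink runner's translator interface. */
class TranslatorRegistrySketch {
  // URN -> translator; the String argument stands in for the transform being translated.
  private final Map<String, Consumer<String>> translatorMap = new HashMap<>();

  void register(String urn, Consumer<String> translator) {
    translatorMap.put(urn, translator);
  }

  boolean isRegistered(String urn) {
    return translatorMap.containsKey(urn);
  }

  /** Looks up the translator by URN; an entry whose URN never appears in a pipeline is never used. */
  void translate(String urn, String transformId) {
    Consumer<String> translator = translatorMap.get(urn);
    if (translator == null) {
      throw new IllegalArgumentException("No translator registered for urn " + urn);
    }
    translator.accept(transformId);
  }
}
```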




Issue Time Tracking
---

Worklog Id: (was: 137868)
Time Spent: 7h  (was: 6h 50m)



[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137867=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137867 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 24/Aug/18 15:58
Start Date: 24/Aug/18 15:58
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212675593
 
 

 ##
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingSideInputHandlerFactory.java
 ##
 @@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.SideInputReference;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandler;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandlerFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * {@link StateRequestHandler} that uses {@link org.apache.beam.runners.core.SideInputHandler} to
+ * access the Flink broadcast state that represents side inputs.
+ */
+public class FlinkStreamingSideInputHandlerFactory implements SideInputHandlerFactory {
+
+  // Map from side input id to global PCollection id.
+  private final Map> sideInputToCollection;
+  private final org.apache.beam.runners.core.SideInputHandler runnerHandler;
+
+  /**
+   * Creates a new state handler for the given stage. Note that this requires a traversal of the
+   * stage itself, so this should only be called once per stage rather than once per bundle.
+   */
+  public static FlinkStreamingSideInputHandlerFactory forStage(
+  ExecutableStage stage,
+  Map> viewMapping,
+  org.apache.beam.runners.core.SideInputHandler runnerHandler) {
+ImmutableMap.Builder> sideInputBuilder = ImmutableMap.builder();
+for (SideInputReference sideInput : stage.getSideInputs()) {
+  SideInputId sideInputId =
+  SideInputId.newBuilder()
+  .setTransformId(sideInput.transform().getId())
+  .setLocalName(sideInput.localName())
+  .build();
+  sideInputBuilder.put(
+  sideInputId,
+  checkNotNull(
+  viewMapping.get(sideInputId),
+  "No side input for %s/%s",
+  sideInputId.getTransformId(),
+  sideInputId.getLocalName()));
+}
+
+FlinkStreamingSideInputHandlerFactory factory =
+new FlinkStreamingSideInputHandlerFactory(sideInputBuilder.build(), runnerHandler);
+return factory;
+  }
+
+  private FlinkStreamingSideInputHandlerFactory(
+  Map> sideInputToCollection,
+  org.apache.beam.runners.core.SideInputHandler runnerHandler) {
+this.sideInputToCollection = sideInputToCollection;
+this.runnerHandler = runnerHandler;
+  }
+
+  @Override
+  public  SideInputHandler forSideInput(
+  String transformId,
+  String sideInputId,
+  RunnerApi.FunctionSpec accessPattern,
+  Coder elementCoder,
+  

[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137865=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137865 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 24/Aug/18 15:57
Start Date: 24/Aug/18 15:57
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212675392
 
 

 ##
 File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
 ##
 @@ -148,30 +147,45 @@ public void addSideInputValue(PCollectionView sideInput, WindowedValue T get(PCollectionView view, BoundedWindow window) {
-@SuppressWarnings("unchecked")
-Coder windowCoder =
-(Coder) view.getWindowingStrategyInternal().getWindowFn().windowCoder();
-
-StateTag>> stateTag = sideInputContentsTags.get(view);
-
-ValueState> state =
-stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag);
 
-// TODO: Add support for choosing which representation is contained based upon the
-// side input materialization. We currently can assume that we always have a multimap
-// materialization as that is the only supported type within the Java SDK.
-@Nullable Iterable> elements = (Iterable>) state.read();
+@Nullable Iterable elements = getIterable(view, window);
 
 if (elements == null) {
   elements = Collections.emptyList();
 }
 
 Review comment:
   Not needed since side input isn't supposed to be accessed before the window is ready. Added extra check in the handler and comment.
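
Rather than silently defaulting a missing side input to an empty list, the reply relies on the invariant that a window's side input is read only after it is ready, and adds an explicit check. A minimal sketch of that fail-fast idea, assuming a hypothetical in-memory store (`SideInputStoreSketch`) in which a window counts as ready once any value has been written for it; the runner's real readiness tracking is more involved and is not shown in this thread:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for per-window side input state; not the Beam SideInputHandler API. */
class SideInputStoreSketch<W, T> {
  // Contents written per window; a missing entry means the window is not ready (assumption).
  private final Map<W, List<T>> contents = new HashMap<>();

  /** Appends a value for the given window, which also marks that window as ready. */
  void addValue(W window, T value) {
    contents.computeIfAbsent(window, w -> new ArrayList<>()).add(value);
  }

  boolean isReady(W window) {
    return contents.containsKey(window);
  }

  /** Fails fast instead of returning an empty list for a window that is not ready yet. */
  List<T> get(W window) {
    List<T> elements = contents.get(window);
    if (elements == null) {
      throw new IllegalStateException("Side input accessed before window " + window + " is ready");
    }
    return Collections.unmodifiableList(elements);
  }
}
```

The design choice is that an empty result and a not-yet-available result are no longer indistinguishable, so an ordering bug surfaces as an exception instead of as silently wrong output.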




Issue Time Tracking
---

Worklog Id: (was: 137865)
Time Spent: 6.5h  (was: 6h 20m)



[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137863=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137863 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 24/Aug/18 15:57
Start Date: 24/Aug/18 15:57
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212675392
 
 

 ##
 File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
 ##
 @@ -148,30 +147,45 @@ public void addSideInputValue(PCollectionView sideInput, WindowedValue T get(PCollectionView view, BoundedWindow window) {
-@SuppressWarnings("unchecked")
-Coder windowCoder =
-(Coder) view.getWindowingStrategyInternal().getWindowFn().windowCoder();
-
-StateTag>> stateTag = sideInputContentsTags.get(view);
-
-ValueState> state =
-stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag);
 
-// TODO: Add support for choosing which representation is contained based upon the
-// side input materialization. We currently can assume that we always have a multimap
-// materialization as that is the only supported type within the Java SDK.
-@Nullable Iterable> elements = (Iterable>) state.read();
+@Nullable Iterable elements = getIterable(view, window);
 
 if (elements == null) {
   elements = Collections.emptyList();
 }
 
 Review comment:
   Not needed since side input isn't supposed to be accessed before the window is ready. Added extra check.




Issue Time Tracking
---

Worklog Id: (was: 137863)
Time Spent: 6h 20m  (was: 6h 10m)



[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137866=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137866 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 24/Aug/18 15:57
Start Date: 24/Aug/18 15:57
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r21267
 
 

 ##
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -108,27 +119,45 @@ public ExecutableStageDoFnOperator(
 this.jobInfo = jobInfo;
 this.contextFactory = contextFactory;
 this.outputMap = outputMap;
+this.sideInputIds = sideInputIds;
   }
 
   @Override
   public void open() throws Exception {
 super.open();
 
-ExecutableStage executableStage = ExecutableStage.fromPayload(payload);
+executableStage = ExecutableStage.fromPayload(payload);
 // TODO: Wire this into the distributed cache and make it pluggable.
 // TODO: Do we really want this layer of indirection when accessing the stage bundle factory?
 // It's a little strange because this operator is responsible for the lifetime of the stage
 // bundle "factory" (manager?) but not the job or Flink bundle factories. How do we make
 // ownership of the higher level "factories" explicit? Do we care?
 stageContext = contextFactory.get(jobInfo);
-// NOTE: It's safe to reuse the state handler between partitions because each partition uses the
-// same backing runtime context and broadcast variables. We use checkState below to catch errors
-// in backward-incompatible Flink changes.
-stateRequestHandler = stageContext.getStateRequestHandler(executableStage, getRuntimeContext());
+
+stateRequestHandler = getStateRequestHandler(executableStage);
 stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
 progressHandler = BundleProgressHandler.unsupported();
   }
 
+  private StateRequestHandler getStateRequestHandler(ExecutableStage executableStage) {
+
+if (!executableStage.getSideInputs().isEmpty()) {
 
 Review comment:
   done
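
The new `getStateRequestHandler(executableStage)` only sets up side input state access when `executableStage.getSideInputs()` is non-empty. A sketch of that selection logic, assuming hypothetical stand-ins (`StateHandlerSketch` and a factory `Function`) for the fnexecution state handler types; what the real method returns for a stage without side inputs is cut off in this diff, so the "unsupported" handler here is an assumption:

```java
import java.util.List;
import java.util.function.Function;

/** Hypothetical, simplified stand-in for a state request handler; not the Beam fnexecution API. */
interface StateHandlerSketch {
  String handle(String request);
}

class HandlerSelectionSketch {
  /** Assumed handler for stages without side inputs: any state request is an error. */
  static final StateHandlerSketch UNSUPPORTED =
      request -> {
        throw new UnsupportedOperationException("No state handler for request: " + request);
      };

  /** Builds a side-input-aware handler only when the stage actually declares side inputs. */
  static StateHandlerSketch forStage(
      List<String> sideInputIds,
      Function<List<String>, StateHandlerSketch> sideInputHandlerFactory) {
    if (sideInputIds.isEmpty()) {
      return UNSUPPORTED;
    }
    return sideInputHandlerFactory.apply(sideInputIds);
  }
}
```

Deferring the factory call means stages without side inputs never pay for the traversal that builds the side input mapping.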




Issue Time Tracking
---

Worklog Id: (was: 137866)
Time Spent: 6h 40m  (was: 6.5h)



[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137862=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137862 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 24/Aug/18 15:56
Start Date: 24/Aug/18 15:56
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212675152
 
 

 ##
 File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
 ##
 @@ -148,30 +147,45 @@ public void addSideInputValue(PCollectionView sideInput, WindowedValue T get(PCollectionView view, BoundedWindow window) {
-@SuppressWarnings("unchecked")
-Coder windowCoder =
-(Coder) view.getWindowingStrategyInternal().getWindowFn().windowCoder();
-
-StateTag>> stateTag = sideInputContentsTags.get(view);
-
-ValueState> state =
-stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag);
 
-// TODO: Add support for choosing which representation is contained based upon the
-// side input materialization. We currently can assume that we always have a multimap
-// materialization as that is the only supported type within the Java SDK.
-@Nullable Iterable> elements = (Iterable>) state.read();
+@Nullable Iterable elements = getIterable(view, window);
 
 if (elements == null) {
   elements = Collections.emptyList();
 }
 
+// TODO: Add support for choosing which representation is contained based upon the
+// side input materialization. We currently can assume that we always have a multimap
+// materialization as that is the only supported type within the Java SDK.
 ViewFn viewFn = (ViewFn) view.getViewFn();
 
 Review comment:
   Left as is; this was just moved around. It is an SDK-specific assumption, and the portable runner does not use this code path.
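
The `get` method in `SideInputHandler` assumes a multimap materialization: it turns the stored `Iterable` of key/value pairs into an in-memory multimap view and then applies the view's `ViewFn`. As the reply notes, the portable runner does not take this path. A rough sketch of the grouping step, assuming `Map.Entry` stands in for Beam's `KV` and that keys can be compared with `equals`/`hashCode`; the real `InMemoryMultimapSideInputView.fromIterable` is also handed a key coder, presumably so keys can be compared by their encoded form:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a multimap side input view; not Beam's InMemoryMultimapSideInputView. */
class MultimapViewSketch {

  /**
   * Groups key/value pairs by key. Keys keep first-seen order and values keep arrival order.
   * Keys are compared with equals/hashCode here, unlike the real view, which is given a key coder.
   */
  static <K, V> Map<K, List<V>> fromIterable(Iterable<Map.Entry<K, V>> elements) {
    Map<K, List<V>> multimap = new LinkedHashMap<>();
    for (Map.Entry<K, V> kv : elements) {
      multimap.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
    }
    return multimap;
  }
}
```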




Issue Time Tracking
---

Worklog Id: (was: 137862)
Time Spent: 6h 10m  (was: 6h)



[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137861=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137861 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 24/Aug/18 15:55
Start Date: 24/Aug/18 15:55
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212674746
 
 

 ##
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -520,6 +567,158 @@ private void translateImpulse(
 }
   }
 
+  private static LinkedHashMap>
+  getSideInputIdToPCollectionViewMap(
+  RunnerApi.ExecutableStagePayload stagePayload, RunnerApi.Components components) {
+
+RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents(components);
+
+LinkedHashMap> sideInputs =
+new LinkedHashMap<>();
+for (RunnerApi.ExecutableStagePayload.SideInputId sideInputId :
+stagePayload.getSideInputsList()) {
+
+  // TODO: local name is unique as long as only one transform with side input can be within a stage
+  String sideInputTag = sideInputId.getLocalName();
+  // for PCollectionView compatibility, not used to transform materialization
+  ViewFn>, ?> viewFn =
+  (ViewFn) new PCollectionViews.MultimapViewFn>, Void>();
 
 Review comment:
   done




Issue Time Tracking
---

Worklog Id: (was: 137861)
Time Spent: 6h  (was: 5h 50m)



[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137860=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137860 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 24/Aug/18 15:55
Start Date: 24/Aug/18 15:55
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212674697
 
 

 ##
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -520,6 +567,158 @@ private void translateImpulse(
 }
   }
 
+  private static LinkedHashMap>
+  getSideInputIdToPCollectionViewMap(
+  RunnerApi.ExecutableStagePayload stagePayload, RunnerApi.Components components) {
+
+RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents(components);
+
+LinkedHashMap> sideInputs =
+new LinkedHashMap<>();
+for (RunnerApi.ExecutableStagePayload.SideInputId sideInputId :
+stagePayload.getSideInputsList()) {
+
+  // TODO: local name is unique as long as only one transform with side input can be within a stage
+  String sideInputTag = sideInputId.getLocalName();
+  // for PCollectionView compatibility, not used to transform materialization
+  ViewFn>, ?> viewFn =
+  (ViewFn) new PCollectionViews.MultimapViewFn>, Void>();
+
+  String collectionId =
+  components
+  .getTransformsOrThrow(sideInputId.getTransformId())
+  .getInputsOrThrow(sideInputId.getLocalName());
+  RunnerApi.WindowingStrategy windowingStrategyProto =
+  components.getWindowingStrategiesOrThrow(
+  components.getPcollectionsOrThrow(collectionId).getWindowingStrategyId());
+
+  final WindowingStrategy windowingStrategy;
+  try {
+windowingStrategy =
+WindowingStrategyTranslation.fromProto(windowingStrategyProto, rehydratedComponents);
+  } catch (InvalidProtocolBufferException e) {
+throw new IllegalStateException(
+String.format(
+"Unable to hydrate side input windowing strategy %s.", windowingStrategyProto),
+e);
+  }
+
+  Coder> coder = instantiateCoder(collectionId, components);
+  // side input materialization via GBK (T -> Iterable)
+  WindowedValueCoder wvCoder = (WindowedValueCoder) coder;
+  coder = wvCoder.withValueCoder(IterableCoder.of(wvCoder.getValueCoder()));
+
+  sideInputs.put(
+  sideInputId,
+  new RunnerPCollectionView<>(
+  null,
+  new TupleTag<>(sideInputTag),
+  viewFn,
+  // TODO: support custom mapping fn
+  windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+  windowingStrategy,
+  coder));
+}
+return sideInputs;
+  }
+
+  private Tuple2>, DataStream> transformSideInputs(
+  RunnerApi.ExecutableStagePayload stagePayload,
+  RunnerApi.Components components,
+  StreamingTranslationContext context) {
+
+LinkedHashMap> sideInputs =
+getSideInputIdToPCollectionViewMap(stagePayload, components);
+
+Map, Integer> tagToIntMapping = new HashMap<>();
+Map> intToViewMapping = new HashMap<>();
+List>> kvCoders = new ArrayList<>();
+List> viewCoders = new ArrayList<>();
+
+int count = 0;
+for (Map.Entry> sideInput :
+sideInputs.entrySet()) {
+  TupleTag tag = sideInput.getValue().getTagInternal();
+  intToViewMapping.put(count, sideInput.getValue());
+  tagToIntMapping.put(tag, count);
+  count++;
+  String collectionId =
+  components
+  .getTransformsOrThrow(sideInput.getKey().getTransformId())
+  .getInputsOrThrow(sideInput.getKey().getLocalName());
+  DataStream sideInputStream = context.getDataStreamOrThrow(collectionId);
+  TypeInformation tpe = sideInputStream.getType();
+  if (!(tpe instanceof CoderTypeInformation)) {
+throw new IllegalStateException("Input Stream TypeInformation is no CoderTypeInformation.");
+  }
+
+  WindowedValueCoder coder =
+  (WindowedValueCoder) ((CoderTypeInformation) tpe).getCoder();
+  Coder> kvCoder = KvCoder.of(VoidCoder.of(), coder.getValueCoder());
+  kvCoders.add(coder.withValueCoder(kvCoder));
+  // coder for materialized view matching GBK below
+  WindowedValueCoder>> viewCoder =
+  coder.withValueCoder(KvCoder.of(VoidCoder.of(), IterableCoder.of(coder.getValueCoder())));
+  viewCoders.add(viewCoder);
+}
+
+// second pass, now that we gathered the input coders
+UnionCoder unionCoder = UnionCoder.of(viewCoders);
+
+CoderTypeInformation 
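
The coders in `transformSideInputs` describe how side inputs are materialized: each element `T` is wrapped as `KV<Void, T>` (`KvCoder.of(VoidCoder.of(), ...)`), and a GroupByKey then yields `KV<Void, Iterable<T>>`. Because every element carries the same Void key, grouping by key within a window collects that window's entire contents into one `Iterable`. A plain-Java model of that idea, assuming a hypothetical `Windowed` pair in place of `WindowedValue` and using no Beam or Flink types:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of Void-keyed GroupByKey materialization; no Beam or Flink types are used. */
class VoidKeyMaterializationSketch {

  /** Hypothetical stand-in for WindowedValue: a value tagged with the window it belongs to. */
  static final class Windowed<T> {
    final String window;
    final T value;

    Windowed(String window, T value) {
      this.window = window;
      this.value = value;
    }
  }

  /** Models T -> KV<Void, T> -> KV<Void, Iterable<T>> per window, returned as window -> values. */
  static <T> Map<String, List<T>> materialize(List<Windowed<T>> elements) {
    // Step 1: give every element the same Void key (the only Void value is null).
    List<Windowed<Map.Entry<Void, T>>> keyed = new ArrayList<>();
    for (Windowed<T> e : elements) {
      keyed.add(
          new Windowed<Map.Entry<Void, T>>(
              e.window, new AbstractMap.SimpleEntry<Void, T>(null, e.value)));
    }
    // Step 2: group by (window, key). The key is always null, so the window alone decides the
    // group, and each window ends up with exactly one list holding all of its values.
    Map<String, List<T>> perWindow = new LinkedHashMap<>();
    for (Windowed<Map.Entry<Void, T>> kv : keyed) {
      perWindow.computeIfAbsent(kv.window, w -> new ArrayList<>()).add(kv.value.getValue());
    }
    return perWindow;
  }
}
```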

[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137859=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137859 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 24/Aug/18 15:54
Start Date: 24/Aug/18 15:54
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212674635
 
 

 ##
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingExecutableStageContext.java
 ##
 @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.control.DockerJobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+
+/** Implementation of a {@link FlinkExecutableStageContext} for streaming. */
+public class FlinkStreamingExecutableStageContext
 
 Review comment:
   None, removed it. (It was necessary before when it was entangled with the state request handler.)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 137859)
Time Spent: 5h 40m  (was: 5.5h)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137805=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137805
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 24/Aug/18 13:21
Start Date: 24/Aug/18 13:21
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212340352
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -108,27 +119,45 @@ public ExecutableStageDoFnOperator(
 this.jobInfo = jobInfo;
 this.contextFactory = contextFactory;
 this.outputMap = outputMap;
+this.sideInputIds = sideInputIds;
   }
 
   @Override
   public void open() throws Exception {
 super.open();
 
-ExecutableStage executableStage = ExecutableStage.fromPayload(payload);
+executableStage = ExecutableStage.fromPayload(payload);
 // TODO: Wire this into the distributed cache and make it pluggable.
 // TODO: Do we really want this layer of indirection when accessing the stage bundle factory?
 // It's a little strange because this operator is responsible for the lifetime of the stage
 // bundle "factory" (manager?) but not the job or Flink bundle factories. How do we make
 // ownership of the higher level "factories" explicit? Do we care?
 stageContext = contextFactory.get(jobInfo);
-// NOTE: It's safe to reuse the state handler between partitions because each partition uses the
-// same backing runtime context and broadcast variables. We use checkState below to catch errors
-// in backward-incompatible Flink changes.
-stateRequestHandler = stageContext.getStateRequestHandler(executableStage, getRuntimeContext());
+
+stateRequestHandler = getStateRequestHandler(executableStage);
 stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
 progressHandler = BundleProgressHandler.unsupported();
   }
 
+  private StateRequestHandler getStateRequestHandler(ExecutableStage executableStage) {
+
+if (!executableStage.getSideInputs().isEmpty()) {
 
 Review comment:
   Not important but I prefer: `if (executableStage.getSideInputs() > 0) {` or switch around the if/else blocks.

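The reviewer's point is stylistic. Either state the positive condition first or swap the branches, so that the method does not open with a negated `isEmpty()`. `getSideInputs()` appears to return a collection (the diff calls `.isEmpty()` on it and iterates it elsewhere), so the comparison would need `.size() > 0`. The sketch below uses no dependencies. Plain strings stand in for the real `StateRequestHandler` objects, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

/** Hypothetical sketch of the branch order in getStateRequestHandler; not Beam code. */
final class StateHandlerChoice {

  static String chooseHandler(Collection<String> sideInputIds) {
    // Positive condition first, so the method reads without a negated isEmpty().
    // On a collection the check needs size(); the collection itself cannot be compared with 0.
    if (sideInputIds.size() > 0) {
      return "side-input-handler" + sideInputIds;
    } else {
      return "unsupported";
    }
  }

  public static void main(String[] args) {
    System.out.println(chooseHandler(Collections.<String>emptyList())); // unsupported
    System.out.println(chooseHandler(Arrays.asList("side0", "side1")));
  }
}
```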



Issue Time Tracking
---

Worklog Id: (was: 137805)
Time Spent: 5h 20m  (was: 5h 10m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 5h 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137808=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137808
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 24/Aug/18 13:21
Start Date: 24/Aug/18 13:21
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212585550
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -520,6 +567,158 @@ private void translateImpulse(
 }
   }
 
+  private static LinkedHashMap>
+  getSideInputIdToPCollectionViewMap(
+  RunnerApi.ExecutableStagePayload stagePayload, RunnerApi.Components components) {
+
+RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents(components);
+
+LinkedHashMap> sideInputs =
+new LinkedHashMap<>();
+for (RunnerApi.ExecutableStagePayload.SideInputId sideInputId :
+stagePayload.getSideInputsList()) {
+
+  // TODO: local name is unique as long as only one transform with side input can be within a stage
+  String sideInputTag = sideInputId.getLocalName();
+  // for PCollectionView compatibility, not used to transform materialization
+  ViewFn>, ?> viewFn =
+  (ViewFn) new PCollectionViews.MultimapViewFn>, Void>();
+
+  String collectionId =
+  components
+  .getTransformsOrThrow(sideInputId.getTransformId())
+  .getInputsOrThrow(sideInputId.getLocalName());
+  RunnerApi.WindowingStrategy windowingStrategyProto =
+  components.getWindowingStrategiesOrThrow(
+  components.getPcollectionsOrThrow(collectionId).getWindowingStrategyId());
+
+  final WindowingStrategy windowingStrategy;
+  try {
+windowingStrategy =
+WindowingStrategyTranslation.fromProto(windowingStrategyProto, rehydratedComponents);
+  } catch (InvalidProtocolBufferException e) {
+throw new IllegalStateException(
+String.format(
+"Unable to hydrate side input windowing strategy %s.", windowingStrategyProto),
+e);
+  }
+
+  Coder> coder = instantiateCoder(collectionId, components);
+  // side input materialization via GBK (T -> Iterable)
+  WindowedValueCoder wvCoder = (WindowedValueCoder) coder;
+  coder = wvCoder.withValueCoder(IterableCoder.of(wvCoder.getValueCoder()));
+
+  sideInputs.put(
+  sideInputId,
+  new RunnerPCollectionView<>(
+  null,
+  new TupleTag<>(sideInputTag),
+  viewFn,
+  // TODO: support custom mapping fn
+  windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+  windowingStrategy,
+  coder));
+}
+return sideInputs;
+  }
+
+  private Tuple2>, DataStream> transformSideInputs(
+  RunnerApi.ExecutableStagePayload stagePayload,
+  RunnerApi.Components components,
+  StreamingTranslationContext context) {
+
+LinkedHashMap> sideInputs =
+getSideInputIdToPCollectionViewMap(stagePayload, components);
+
+Map, Integer> tagToIntMapping = new HashMap<>();
+Map> intToViewMapping = new HashMap<>();
+List>> kvCoders = new ArrayList<>();
+List> viewCoders = new ArrayList<>();
+
+int count = 0;
+for (Map.Entry> sideInput :
+sideInputs.entrySet()) {
+  TupleTag tag = sideInput.getValue().getTagInternal();
+  intToViewMapping.put(count, sideInput.getValue());
+  tagToIntMapping.put(tag, count);
+  count++;
+  String collectionId =
+  components
+  .getTransformsOrThrow(sideInput.getKey().getTransformId())
+  .getInputsOrThrow(sideInput.getKey().getLocalName());
+  DataStream sideInputStream = context.getDataStreamOrThrow(collectionId);
+  TypeInformation tpe = sideInputStream.getType();
+  if (!(tpe instanceof CoderTypeInformation)) {
+throw new IllegalStateException("Input Stream TypeInformation is no CoderTypeInformation.");
+  }
+
+  WindowedValueCoder coder =
+  (WindowedValueCoder) ((CoderTypeInformation) tpe).getCoder();
+  Coder> kvCoder = KvCoder.of(VoidCoder.of(), coder.getValueCoder());
+  kvCoders.add(coder.withValueCoder(kvCoder));
+  // coder for materialized view matching GBK below
+  WindowedValueCoder>> viewCoder =
+  coder.withValueCoder(KvCoder.of(VoidCoder.of(), IterableCoder.of(coder.getValueCoder())));
+  viewCoders.add(viewCoder);
+}
+
+// second pass, now that we gathered the input coders
+UnionCoder unionCoder = UnionCoder.of(viewCoders);
+
+CoderTypeInformation 

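In `transformSideInputs`, each side input gets an integer index through `tagToIntMapping` and `intToViewMapping`. Presumably this lets several side input streams travel together under the single `UnionCoder` built from `viewCoders`. Below is a JDK-only sketch of that bookkeeping. It assumes string tags and uses `Map.Entry<Integer, Object>` as the union element. `SideInputUnion` is hypothetical, not a Beam or Flink class.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of tagging side input elements with a union index; not Beam code. */
final class SideInputUnion {
  final Map<String, Integer> tagToInt = new HashMap<>();
  final Map<Integer, String> intToTag = new HashMap<>();

  // Indexes follow the LinkedHashMap's insertion order, so they are stable
  // for a given set of side inputs.
  SideInputUnion(LinkedHashMap<String, ?> sideInputsByTag) {
    int count = 0;
    for (String tag : sideInputsByTag.keySet()) {
      tagToInt.put(tag, count);
      intToTag.put(count, tag);
      count++;
    }
  }

  // Tags a value with the index of the side input it came from.
  Map.Entry<Integer, Object> wrap(String tag, Object value) {
    Integer index = tagToInt.get(tag);
    if (index == null) {
      throw new IllegalArgumentException("Unknown side input tag: " + tag);
    }
    return new AbstractMap.SimpleImmutableEntry<>(index, value);
  }

  // Routes a union element back to the tag of its side input.
  String tagOf(Map.Entry<Integer, Object> unionElement) {
    return intToTag.get(unionElement.getKey());
  }

  public static void main(String[] args) {
    LinkedHashMap<String, String> views = new LinkedHashMap<>();
    views.put("sideA", "viewA");
    views.put("sideB", "viewB");
    SideInputUnion union = new SideInputUnion(views);
    Map.Entry<Integer, Object> element = union.wrap("sideB", 42);
    System.out.println(element.getKey() + " -> " + union.tagOf(element)); // 1 -> sideB
  }
}
```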
[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137804=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137804
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 24/Aug/18 13:21
Start Date: 24/Aug/18 13:21
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212574656
 
 

 ##
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
 ##
 @@ -148,30 +147,45 @@ public void addSideInputValue(PCollectionView sideInput, WindowedValue T get(PCollectionView view, BoundedWindow window) {
-@SuppressWarnings("unchecked")
-Coder windowCoder =
-(Coder) view.getWindowingStrategyInternal().getWindowFn().windowCoder();
-
-StateTag>> stateTag = sideInputContentsTags.get(view);
-
-ValueState> state =
-stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag);
 
-// TODO: Add support for choosing which representation is contained based upon the
-// side input materialization. We currently can assume that we always have a multimap
-// materialization as that is the only supported type within the Java SDK.
-@Nullable Iterable> elements = (Iterable>) state.read();
+@Nullable Iterable elements = getIterable(view, window);
 
 if (elements == null) {
   elements = Collections.emptyList();
 }
 
 Review comment:
   Shouldn't this go into the `getIterable` function?
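One way to act on this is to let the accessor itself substitute an empty list for missing state, so that `get()` never sees `null`. The sketch uses plain JDK types, with windows as `long` ids and views as strings. `SideInputStore` and its methods are hypothetical and are not the Beam `SideInputHandler` API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical side input store; illustrates moving the null fallback into getIterable. */
final class SideInputStore<T> {
  // Contents per (view, window); windows are plain long ids in this sketch.
  private final Map<String, List<T>> contents = new HashMap<>();

  private static String key(String view, long window) {
    return view + "@" + window;
  }

  void put(String view, long window, List<T> values) {
    contents.put(key(view, window), values);
  }

  // The "elements == null -> emptyList()" fallback lives here, so callers such
  // as get() can never observe null.
  Iterable<T> getIterable(String view, long window) {
    List<T> elements = contents.get(key(view, window));
    return elements == null ? Collections.<T>emptyList() : elements;
  }

  public static void main(String[] args) {
    SideInputStore<String> store = new SideInputStore<>();
    store.put("view", 1L, Arrays.asList("a", "b"));
    System.out.println(store.getIterable("view", 1L)); // [a, b]
    System.out.println(store.getIterable("view", 2L)); // []
  }
}
```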




Issue Time Tracking
---

Worklog Id: (was: 137804)
Time Spent: 5h 10m  (was: 5h)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137803=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137803
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 24/Aug/18 13:21
Start Date: 24/Aug/18 13:21
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212575795
 
 

 ##
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
 ##
 @@ -148,30 +147,45 @@ public void addSideInputValue(PCollectionView sideInput, WindowedValue T get(PCollectionView view, BoundedWindow window) {
-@SuppressWarnings("unchecked")
-Coder windowCoder =
-(Coder) view.getWindowingStrategyInternal().getWindowFn().windowCoder();
-
-StateTag>> stateTag = sideInputContentsTags.get(view);
-
-ValueState> state =
-stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag);
 
-// TODO: Add support for choosing which representation is contained based upon the
-// side input materialization. We currently can assume that we always have a multimap
-// materialization as that is the only supported type within the Java SDK.
-@Nullable Iterable> elements = (Iterable>) state.read();
+@Nullable Iterable elements = getIterable(view, window);
 
 if (elements == null) {
   elements = Collections.emptyList();
 }
 
+// TODO: Add support for choosing which representation is contained based upon the
+// side input materialization. We currently can assume that we always have a multimap
+// materialization as that is the only supported type within the Java SDK.
 ViewFn viewFn = (ViewFn) view.getViewFn();
 
 Review comment:
   Could we add an `instanceof` check and check whether this is `MultimapViewFn`?

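The sketch below shows the suggested guard: an explicit `instanceof` check before the cast, so that an unsupported materialization fails fast with a readable message. `SketchViewFn` and `SketchMultimapViewFn` are hypothetical stand-ins for Beam's `ViewFn` and `PCollectionViews.MultimapViewFn`, not the real classes.

```java
/** Hypothetical stand-in for a view function: turns a materialized primitive into a view. */
interface SketchViewFn<PrimitiveT, ViewT> {
  ViewT apply(PrimitiveT primitive);
}

/** Hypothetical stand-in for the multimap materialization's view function. */
final class SketchMultimapViewFn implements SketchViewFn<Iterable<?>, Iterable<?>> {
  @Override
  public Iterable<?> apply(Iterable<?> primitive) {
    return primitive;
  }
}

/** Hypothetical helper showing the instanceof guard suggested in the review. */
final class ViewFnCheck {
  // Explicit check instead of relying on an unchecked cast: anything other
  // than the multimap view function fails fast with a descriptive message.
  static SketchMultimapViewFn requireMultimap(SketchViewFn<?, ?> viewFn) {
    if (!(viewFn instanceof SketchMultimapViewFn)) {
      throw new IllegalStateException(
          "Expected a multimap view function but got " + viewFn.getClass().getName());
    }
    return (SketchMultimapViewFn) viewFn;
  }

  public static void main(String[] args) {
    SketchViewFn<?, ?> fn = new SketchMultimapViewFn();
    System.out.println(requireMultimap(fn) == fn); // true
  }
}
```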



Issue Time Tracking
---

Worklog Id: (was: 137803)
Time Spent: 5h  (was: 4h 50m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137806=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137806
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 24/Aug/18 13:21
Start Date: 24/Aug/18 13:21
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212337222
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingSideInputHandlerFactory.java
 ##
 @@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.SideInputReference;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandler;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandlerFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * {@link StateRequestHandler} that uses {@link org.apache.beam.runners.core.SideInputHandler} to
+ * access the Flink broadcast state that represents side inputs.
+ */
+public class FlinkStreamingSideInputHandlerFactory implements SideInputHandlerFactory {
+
+  // Map from side input id to global PCollection id.
+  private final Map> sideInputToCollection;
+  private final org.apache.beam.runners.core.SideInputHandler runnerHandler;
+
+  /**
+   * Creates a new state handler for the given stage. Note that this requires a traversal of the
+   * stage itself, so this should only be called once per stage rather than once per bundle.
+   */
+  public static FlinkStreamingSideInputHandlerFactory forStage(
+  ExecutableStage stage,
+  Map> viewMapping,
+  org.apache.beam.runners.core.SideInputHandler runnerHandler) {
+ImmutableMap.Builder> sideInputBuilder = ImmutableMap.builder();
+for (SideInputReference sideInput : stage.getSideInputs()) {
+  SideInputId sideInputId =
+  SideInputId.newBuilder()
+  .setTransformId(sideInput.transform().getId())
+  .setLocalName(sideInput.localName())
+  .build();
+  sideInputBuilder.put(
+  sideInputId,
+  checkNotNull(
+  viewMapping.get(sideInputId),
+  "No side input for %s/%s",
+  sideInputId.getTransformId(),
+  sideInputId.getLocalName()));
+}
+
+FlinkStreamingSideInputHandlerFactory factory =
+new FlinkStreamingSideInputHandlerFactory(sideInputBuilder.build(), runnerHandler);
+return factory;
+  }
+
+  private FlinkStreamingSideInputHandlerFactory(
+  Map> sideInputToCollection,
+  org.apache.beam.runners.core.SideInputHandler runnerHandler) {
+this.sideInputToCollection = sideInputToCollection;
+this.runnerHandler = runnerHandler;
+  }
+
+  @Override
+  public  SideInputHandler forSideInput(
+  String transformId,
+  String sideInputId,
+  RunnerApi.FunctionSpec accessPattern,
+  Coder elementCoder,
+ 

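The `forStage` javadoc in the diff says that creating the factory traverses the stage, so it should happen once per stage rather than once per bundle. Below is a JDK-only sketch of that pattern with hypothetical names, where side input ids are modelled as `transformId/localName` strings. It resolves every reference up front and fails fast when a view is missing, as the `checkNotNull` in the diff does. The result is kept in an unmodifiable map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch of resolving a stage's side inputs once; not the Beam factory. */
final class SideInputIndex {
  private final Map<String, String> sideInputToView;

  private SideInputIndex(Map<String, String> sideInputToView) {
    this.sideInputToView = sideInputToView;
  }

  // Each reference is a {transformId, localName} pair; viewMapping is keyed
  // by "transformId/localName" in this sketch.
  static SideInputIndex forStage(List<String[]> references, Map<String, String> viewMapping) {
    Map<String, String> resolved = new LinkedHashMap<>();
    for (String[] ref : references) {
      String id = ref[0] + "/" + ref[1];
      // Fail fast with a descriptive message (throws NullPointerException, like checkNotNull).
      resolved.put(id, Objects.requireNonNull(viewMapping.get(id), "No side input for " + id));
    }
    return new SideInputIndex(Collections.unmodifiableMap(resolved));
  }

  String viewFor(String transformId, String localName) {
    return sideInputToView.get(transformId + "/" + localName);
  }

  public static void main(String[] args) {
    Map<String, String> viewMapping = new HashMap<>();
    viewMapping.put("transform1/side", "view1");
    SideInputIndex index =
        forStage(Collections.singletonList(new String[] {"transform1", "side"}), viewMapping);
    System.out.println(index.viewFor("transform1", "side")); // view1
  }
}
```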
[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137802=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137802
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 24/Aug/18 13:21
Start Date: 24/Aug/18 13:21
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212581542
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -520,6 +567,158 @@ private void translateImpulse(
 }
   }
 
+  private static LinkedHashMap>
+  getSideInputIdToPCollectionViewMap(
+  RunnerApi.ExecutableStagePayload stagePayload, RunnerApi.Components components) {
+
+RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents(components);
+
+LinkedHashMap> sideInputs =
+new LinkedHashMap<>();
+for (RunnerApi.ExecutableStagePayload.SideInputId sideInputId :
+stagePayload.getSideInputsList()) {
+
+  // TODO: local name is unique as long as only one transform with side input can be within a stage
+  String sideInputTag = sideInputId.getLocalName();
+  // for PCollectionView compatibility, not used to transform materialization
+  ViewFn>, ?> viewFn =
+  (ViewFn) new PCollectionViews.MultimapViewFn>, Void>();
 
 Review comment:
   This can be moved below to the constructor or even outside the loop.
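The suggestion amounts to hoisting a loop-invariant allocation out of the loop. The sketch uses hypothetical stand-ins: `View` for `RunnerPCollectionView` and `Function.identity()` for the view function. Sharing one instance is only safe if the view function is stateless. The argument-less `MultimapViewFn` constructor in the diff suggests this, but that is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch of hoisting the view function out of the per-side-input loop. */
final class HoistedViewFn {
  /** Hypothetical stand-in for RunnerPCollectionView: a tag plus its view function. */
  static final class View {
    final String tag;
    final Function<Iterable<?>, Iterable<?>> viewFn;

    View(String tag, Function<Iterable<?>, Iterable<?>> viewFn) {
      this.tag = tag;
      this.viewFn = viewFn;
    }
  }

  static List<View> buildViews(List<String> sideInputTags) {
    // Loop-invariant: the view function does not depend on the tag, so it is
    // created once, before the loop, and shared by every view.
    Function<Iterable<?>, Iterable<?>> viewFn = Function.identity();
    List<View> views = new ArrayList<>();
    for (String tag : sideInputTags) {
      views.add(new View(tag, viewFn));
    }
    return views;
  }

  public static void main(String[] args) {
    List<View> views = buildViews(Arrays.asList("sideA", "sideB"));
    System.out.println(views.get(0).viewFn == views.get(1).viewFn); // true
  }
}
```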




Issue Time Tracking
---

Worklog Id: (was: 137802)
Time Spent: 4h 50m  (was: 4h 40m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137801=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137801
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 24/Aug/18 13:21
Start Date: 24/Aug/18 13:21
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212317984
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 ##
 @@ -149,6 +149,7 @@ public static FlinkBatchPortablePipelineTranslator createTranslator() {
 translatorMap.put(
 PTransformTranslation.RESHUFFLE_URN,
 FlinkBatchPortablePipelineTranslator::translateReshuffle);
+// TODO: this does not seem required
 translatorMap.put(
 PTransformTranslation.CREATE_VIEW_TRANSFORM_URN,
 FlinkBatchPortablePipelineTranslator::translateView);
 
 Review comment:
   Not sure if that always works, e.g. when the input stream of the view is not already added by another transform?

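The streaming translator in this PR looks up its inputs with `context.getDataStreamOrThrow(collectionId)`. As its name suggests, that call is meant to turn this ordering problem into an explicit error rather than a silent `null`. Below is a hypothetical, JDK-only sketch of such a fail-fast lookup. `TranslationContextSketch` is not a Beam class, and streams are plain `Object`s.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical translation context illustrating a fail-fast input lookup. */
final class TranslationContextSketch {
  // PCollection id -> translated stream (plain Objects in this sketch).
  private final Map<String, Object> dataStreams = new HashMap<>();

  void addDataStream(String pCollectionId, Object stream) {
    dataStreams.put(pCollectionId, stream);
  }

  // Fails with a descriptive error when no earlier transform has registered
  // the requested input, instead of handing back null.
  Object getDataStreamOrThrow(String pCollectionId) {
    Object stream = dataStreams.get(pCollectionId);
    if (stream == null) {
      throw new IllegalStateException(
          "No stream for PCollection " + pCollectionId + "; its producer has not been translated yet");
    }
    return stream;
  }

  public static void main(String[] args) {
    TranslationContextSketch context = new TranslationContextSketch();
    context.addDataStream("pc1", "stream1");
    System.out.println(context.getDataStreamOrThrow("pc1")); // stream1
    try {
      context.getDataStreamOrThrow("viewInput");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```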



Issue Time Tracking
---

Worklog Id: (was: 137801)
Time Spent: 4h 40m  (was: 4.5h)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137807=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137807
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 24/Aug/18 13:21
Start Date: 24/Aug/18 13:21
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212589676
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingExecutableStageContext.java
 ##
 @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.control.DockerJobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+
+/** Implementation of a {@link FlinkExecutableStageContext} for streaming. */
+public class FlinkStreamingExecutableStageContext
 
 Review comment:
   Does this class add any additional functionality to the `FlinkBatchExecutableStageContext`?




Issue Time Tracking
---

Worklog Id: (was: 137807)
Time Spent: 5.5h  (was: 5h 20m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 5.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137396=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137396
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 23/Aug/18 14:22
Start Date: 23/Aug/18 14:22
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-415432797
 
 
   @mxm that would be great, thanks!




Issue Time Tracking
---

Worklog Id: (was: 137396)
Time Spent: 4.5h  (was: 4h 20m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137389=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137389
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 23/Aug/18 13:57
Start Date: 23/Aug/18 13:57
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-415423307
 
 
   Thank you for your work @tweise, I will also have a look.




Issue Time Tracking
---

Worklog Id: (was: 137389)
Time Spent: 4h 20m  (was: 4h 10m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137110=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137110
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 22/Aug/18 18:40
Start Date: 22/Aug/18 18:40
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-415136625
 
 
   The PR is ready for review. I made the changes to use GBK to materialize the side input, which adheres to the windowing strategy and is equivalent to what the old runner does. @lukecwik @angoenka

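The diff materializes each side input by keying every element with a `Void` key and grouping it (`KvCoder.of(VoidCoder.of(), IterableCoder.of(...))`). A GroupByKey then yields one `Iterable` per window. Below is an in-memory sketch of that idea. It assumes windows are plain `long` ids and ignores triggers, timestamps and coders. `SideInputMaterializer` is hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory analogue of materializing a side input via GroupByKey. */
final class SideInputMaterializer {
  // Input: (window, element) pairs. Output: window -> all elements of that window.
  // Every element implicitly shares one key (the role Void plays in the diff),
  // so grouping by (key, window) reduces to grouping by window.
  static <T> Map<Long, List<T>> materialize(List<Map.Entry<Long, T>> windowedElements) {
    Map<Long, List<T>> byWindow = new LinkedHashMap<>();
    for (Map.Entry<Long, T> element : windowedElements) {
      byWindow.computeIfAbsent(element.getKey(), window -> new ArrayList<>()).add(element.getValue());
    }
    return byWindow;
  }

  public static void main(String[] args) {
    List<Map.Entry<Long, String>> input = new ArrayList<>();
    input.add(new AbstractMap.SimpleEntry<>(1L, "a"));
    input.add(new AbstractMap.SimpleEntry<>(2L, "b"));
    input.add(new AbstractMap.SimpleEntry<>(1L, "c"));
    System.out.println(materialize(input)); // {1=[a, c], 2=[b]}
  }
}
```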



Issue Time Tracking
---

Worklog Id: (was: 137110)
Time Spent: 4h 10m  (was: 4h)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Assignee: Thomas Weise
>Priority: Major
>  Labels: portability
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=135917=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135917
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 17/Aug/18 23:21
Start Date: 17/Aug/18 23:21
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #6208: [WIP] [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-414010931
 
 
   I believe having a different result is ok since it is a possible answer that could have happened had the order of the original events been different and the checkpoint revert didn't happen.




Issue Time Tracking
---

Worklog Id: (was: 135917)
Time Spent: 4h  (was: 3h 50m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 4h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=135910=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135910
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 17/Aug/18 23:02
Start Date: 17/Aug/18 23:02
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6208: [WIP] [BEAM-2930] Side 
input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-414008492
 
 
   OK, then the idea of building up state as an Iterable, as shown in this PR, 
is definitely not working.
   
   Repeated firing isn't idempotent, at least with Flink. If the pipeline 
reverts to a checkpoint, then a change in the order of arrival between the 
(updating) side input and the main input can lead to a different result, even 
for the same bundle.
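The sketch below is a minimal illustration of this ordering effect. It uses a toy event model, not Flink or Beam code. `ReplayOrderSketch` and its "S:"/"M:" event encoding are hypothetical. Each main input element reads the latest side input firing seen so far. Replaying the same events in a different arrival order, as can happen after a checkpoint revert, therefore produces a different output.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model, not Flink/Beam code. Events are either a side input
 * firing ("S:<value>") or a main input element ("M:<name>"). Each main input
 * element observes the latest side input firing that arrived before it.
 */
class ReplayOrderSketch {
  static List<String> run(List<String> events) {
    String latestSide = "<none>"; // a real runner would push the element back instead
    List<String> outputs = new ArrayList<>();
    for (String event : events) {
      if (event.startsWith("S:")) {
        latestSide = event.substring(2); // a newer firing replaces the older one
      } else if (event.startsWith("M:")) {
        outputs.add(event.substring(2) + "=" + latestSide);
      }
    }
    return outputs;
  }
}
```

For example, `run(Arrays.asList("S:v1", "M:x", "S:v2"))` yields `[x=v1]`, while the replayed order `("S:v1", "S:v2", "M:x")` yields `[x=v2]`. Both orders are ones in which the events could have arrived.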


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 135910)
Time Spent: 3h 50m  (was: 3h 40m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=135903=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135903
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 17/Aug/18 22:53
Start Date: 17/Aug/18 22:53
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #6208: [WIP] [BEAM-2930] 
Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-414007244
 
 
   Early/repeated firings provide inconsistent side input values between 
bundles. Side input access is meant to provide the latest firing at the time 
the bundle starts processing. It is up to the runner to keep the latency 
between a side input firing being materialized and it becoming visible to the 
consumer minimal, but there is no latency requirement other than best effort.
   
   Only side inputs that have a single firing can be reasoned about concretely.
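The bundle-start rule can be illustrated with a small Java sketch. `BundleSnapshotSideInput` is a hypothetical class, not a Beam or Flink API. It makes two assumptions:

- Each firing simply replaces the previous value, and materialization is immediate.
- Each bundle reads a snapshot taken when the bundle starts.

```java
/**
 * Hypothetical holder, not a Beam/Flink API: firings replace the side input
 * value, and each bundle reads the value that was latest when it started.
 */
class BundleSnapshotSideInput<T> {
  private T latestFiring; // most recent materialized firing, or null
  private T bundleView;   // value visible to the bundle in progress

  void onFiring(T value) {
    latestFiring = value; // later firings replace earlier ones
  }

  void startBundle() {
    bundleView = latestFiring; // snapshot stays fixed for the rest of this bundle
  }

  T read() {
    return bundleView;
  }
}
```

A firing that arrives mid-bundle only becomes visible to the next bundle. This is why values can differ between bundles when a side input fires more than once.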


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 135903)
Time Spent: 3h 40m  (was: 3.5h)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=135737=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135737
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 17/Aug/18 16:18
Start Date: 17/Aug/18 16:18
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6208: [WIP] [BEAM-2930] Side 
input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-413916534
 
 
   What is supposed to happen when the windowing strategy allows early/repeated 
firing? Won't that lead to nondeterministic behavior? 
   
   If repeated firing is possible, then my current approach of just 
accumulating records into an Iterable won't work. 
   
   With the non-portable runner, views are materialized via combine. While that 
would ensure that the Iterable is constructed correctly, repeated firing would 
still lead to nondeterministic results. Can anyone with a better understanding 
of the old Flink runner offer some insight here?
   
   
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java#L52
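The sketch below shows why appending every firing to one Iterable goes wrong. It rests on two assumptions:

- Panes are in accumulating mode, so each firing contains all elements seen so far.
- As the Beam programming guide describes, a side input with multiple trigger firings uses the value from the latest firing.

The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical comparison of two ways to build a side input view from panes. */
class IterableAccumulationSketch {
  /** Appends every pane to one buffer, like an Iterable accumulated in state. */
  static <T> List<T> appendAllPanes(List<List<T>> panes) {
    List<T> buffer = new ArrayList<>();
    for (List<T> pane : panes) {
      buffer.addAll(pane);
    }
    return buffer;
  }

  /** Keeps only the most recent pane, i.e. the latest firing. */
  static <T> List<T> latestPane(List<List<T>> panes) {
    return panes.isEmpty() ? Collections.<T>emptyList() : panes.get(panes.size() - 1);
  }
}
```

With accumulating panes `[a]` then `[a, b]`, appending produces `[a, a, b]`, while the latest-firing view is `[a, b]`.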
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 135737)
Time Spent: 3.5h  (was: 3h 20m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-15 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=135101=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135101
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 15/Aug/18 18:06
Start Date: 15/Aug/18 18:06
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6208: [WIP] [BEAM-2930] Side 
input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-413284242
 
 
   @lukecwik thanks, and correct: pushback is already handled. What I was 
missing is the allowed lateness, and returning an empty side input iterable 
when no element was previously received.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 135101)
Time Spent: 3h 20m  (was: 3h 10m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-15 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=135091=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135091
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 15/Aug/18 17:49
Start Date: 15/Aug/18 17:49
Worklog Time Spent: 10m 
  Work Description: lukecwik edited a comment on issue #6208: [WIP] 
[BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-413277807
 
 
   @tweise You need to use the WindowMappingFn to figure out which side input 
window is being asked for. If the side input isn't ready, you should store the 
elements in state and set a timer. The simplest pushback solution would be to 
set a processing-time timer that then reads those elements and checks whether 
they are ready; if so, it processes them, otherwise it keeps them in state. You 
could do better with map state or timers with payloads, and you could also do 
better if you could use a watermark-based timer driven by the side input 
PCollection.
   
   The windowing strategy on the side input PCollection tells you the [allowed 
lateness](https://github.com/apache/beam/blob/e2583f5e73de50f8af128ecaa331a2e1046d2b08/model/pipeline/src/main/proto/beam_runner_api.proto#L688).
 Once the watermark of the side input PCollection passes the end of the window 
plus the allowed lateness, the window becomes ready.
   
   This is how the existing implementation worked using 
[PushbackSideInputDoFnRunner](https://github.com/apache/beam/blob/b87aa6037b37aab95702fd1ace40b835f0f66f55/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java)
 and the 
[ReadyCheckingSideInputReader](https://github.com/apache/beam/blob/b87aa6037b37aab95702fd1ace40b835f0f66f55/runners/core-java/src/main/java/org/apache/beam/runners/core/ReadyCheckingSideInputReader.java).
   
   There seems to be some logic within the Flink 
[DoFnOperator](https://github.com/apache/beam/blob/b87aa6037b37aab95702fd1ace40b835f0f66f55/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L465)
 which handles pushback.
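The timer-based pushback can be sketched in Java as a hypothetical, simplified model. `PushbackSketch` and its methods are illustrative names. They are not the API of Beam's `PushbackSideInputDoFnRunner` or Flink's `DoFnOperator`. The model makes these substitutions:

- Elements are represented by their timestamps.
- A `LongUnaryOperator` stands in for the WindowMappingFn.
- A list stands in for keyed state.
- Readiness is assumed to mean that the side input watermark has passed the mapped window's end plus the allowed lateness.

It does not model the "at least one firing" condition.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongUnaryOperator;

/** Hypothetical, simplified pushback model; elements are event timestamps in millis. */
class PushbackSketch {
  private final LongUnaryOperator windowMappingFn; // element timestamp -> side input window end
  private final long allowedLatenessMillis;
  private long sideInputWatermark = Long.MIN_VALUE;
  private final List<Long> pushedBack = new ArrayList<>(); // stands in for keyed state
  final List<Long> processed = new ArrayList<>();

  PushbackSketch(LongUnaryOperator windowMappingFn, long allowedLatenessMillis) {
    this.windowMappingFn = windowMappingFn;
    this.allowedLatenessMillis = allowedLatenessMillis;
  }

  /** Assumed readiness: side input watermark is past window end + allowed lateness. */
  private boolean isReady(long elementTimestamp) {
    long sideWindowEnd = windowMappingFn.applyAsLong(elementTimestamp);
    return sideInputWatermark > sideWindowEnd + allowedLatenessMillis;
  }

  void processElement(long elementTimestamp) {
    if (isReady(elementTimestamp)) {
      processed.add(elementTimestamp);
    } else {
      pushedBack.add(elementTimestamp); // a real runner would also register a timer here
    }
  }

  void advanceSideInputWatermark(long watermark) {
    sideInputWatermark = Math.max(sideInputWatermark, watermark);
  }

  /** Processing-time timer callback: process ready elements, keep the rest buffered. */
  void onProcessingTimeTimer() {
    Iterator<Long> it = pushedBack.iterator();
    while (it.hasNext()) {
      long ts = it.next();
      if (isReady(ts)) {
        processed.add(ts);
        it.remove();
      }
    }
  }

  int pushedBackCount() {
    return pushedBack.size();
  }
}
```

This version rescans the whole buffer on every timer. Map state or timers with payloads would avoid that rescan.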


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 135091)
Time Spent: 3h 10m  (was: 3h)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-15 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=135089=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135089
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 15/Aug/18 17:46
Start Date: 15/Aug/18 17:46
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #6208: [WIP] [BEAM-2930] 
Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-413277807
 
 
   @tweise You need to use the WindowMappingFn to figure out which side input 
window is being asked for. If the side input isn't ready, you should store the 
elements in state and set a timer. The simplest pushback solution would be to 
set a processing-time timer that then reads those elements and checks whether 
they are ready; if so, it processes them, otherwise it keeps them in state. You 
could do better with map state or timers with payloads, and you could also do 
better if you could use a watermark-based timer driven by the side input 
PCollection.
   
   The windowing strategy on the side input PCollection tells you the [allowed 
lateness](https://github.com/apache/beam/blob/e2583f5e73de50f8af128ecaa331a2e1046d2b08/model/pipeline/src/main/proto/beam_runner_api.proto#L688).
 Once the watermark of the side input PCollection passes the end of the window 
plus the allowed lateness, the window becomes ready.
   
   This is how the existing implementation worked using 
[PushbackSideInputDoFnRunner](https://github.com/apache/beam/blob/b87aa6037b37aab95702fd1ace40b835f0f66f55/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java)
 and the 
[ReadyCheckingSideInputReader](https://github.com/apache/beam/blob/b87aa6037b37aab95702fd1ace40b835f0f66f55/runners/core-java/src/main/java/org/apache/beam/runners/core/ReadyCheckingSideInputReader.java).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 135089)
Time Spent: 3h  (was: 2h 50m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 3h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=134783=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-134783
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 14/Aug/18 23:14
Start Date: 14/Aug/18 23:14
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6208: [WIP] [BEAM-2930] Side 
input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-413045243
 
 
   @lukecwik thanks for taking a look. I wonder how "droppably late" can be 
evaluated in this case: the side input handler won't have received any input, 
and the window is "ready" based on the watermark? In the old runner we have a 
separate combine operator; here I'm trying to accumulate the iterable in state.
   
   Regarding the refactor, my goal is to get this functionally correct in the 
first pass. I hope to then discuss/work with @aljoscha on how to extract a 
common base operator for both runners.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 134783)
Time Spent: 2h 50m  (was: 2h 40m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=134708=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-134708
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 14/Aug/18 20:23
Start Date: 14/Aug/18 20:23
Worklog Time Spent: 10m 
  Work Description: lukecwik edited a comment on issue #6208: [WIP] 
[BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-413002348
 
 
   +1 on decoupling from pre-portability implementations because of the baggage 
they bring.
   
   Side inputs become ready for a window as soon as there has been at least one 
firing for them or if there have been zero firings and the input to the side 
input is now droppably late (meaning that the side input is empty).
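The readiness rule can be sketched as a small per-window state holder. The sketch also covers the empty iterable returned for windows that never fired. `SideInputWindowState` is hypothetical. The `droppablyLate` flag is assumed to be computed by the caller from the side input watermark and the allowed lateness.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Hypothetical per-window side input state; not a Beam/Flink API. */
class SideInputWindowState<T> {
  private List<T> latestFiring; // null until the first firing arrives

  void onFiring(List<T> pane) {
    latestFiring = pane; // keep only the most recent firing
  }

  /** Ready after at least one firing, or after zero firings once droppably late. */
  boolean isReady(boolean droppablyLate) {
    return latestFiring != null || droppablyLate;
  }

  /** An empty Optional means "not ready": the caller should push the element back. */
  Optional<List<T>> read(boolean droppablyLate) {
    if (latestFiring != null) {
      return Optional.of(latestFiring);
    }
    if (droppablyLate) {
      return Optional.of(Collections.<T>emptyList()); // zero firings: the view is empty
    }
    return Optional.empty();
  }
}
```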


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 134708)
Time Spent: 2h 40m  (was: 2.5h)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=134698=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-134698
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 14/Aug/18 20:19
Start Date: 14/Aug/18 20:19
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #6208: [WIP] [BEAM-2930] 
Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-413002348
 
 
   +1 on decoupling from pre-portability implementations because of the baggage 
they bring.
   
   Side inputs become ready for a window as soon as there has been at least one 
firing for them or if there have been zero firings and the input to the side 
input is now droppably late (meaning that the side input is empty or has some 
default value).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 134698)
Time Spent: 2.5h  (was: 2h 20m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=133959=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-133959
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 11/Aug/18 20:29
Start Date: 11/Aug/18 20:29
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6208: [WIP] [BEAM-2930] Side 
input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#issuecomment-412299687
 
 
   R: @bsidhom @angoenka 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 133959)
Time Spent: 2h 20m  (was: 2h 10m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-08-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=133957=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-133957
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 11/Aug/18 19:32
Start Date: 11/Aug/18 19:32
Worklog Time Spent: 10m 
  Work Description: tweise opened a new pull request #6208: [WIP] 
[BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208
 
 
   Support for streaming side inputs.
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   It will help us expedite review of your Pull Request if you tag someone 
(e.g. `@username`) to look at it.
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/)
 | --- | --- | --- | --- | --- | ---
   Java | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/)
   Python | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/)
 | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)
  [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/)
 | --- | --- | --- | ---
   
   
   
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 133957)
Time Spent: 2h 10m  (was: 2h)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-07-30 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=129086=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-129086
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 31/Jul/18 02:02
Start Date: 31/Jul/18 02:02
Worklog Time Spent: 10m 
  Work Description: tweise closed pull request #6082: [BEAM-2930] Side 
inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index d376e58ec4b..0a48edb8d6b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -433,17 +433,18 @@ private void translateImpulse(
   throw new RuntimeException(e);
 }
 
-String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
+String inputPCollectionId = stagePayload.getInput();
+// TODO: https://issues.apache.org/jira/browse/BEAM-2930
+if (stagePayload.getSideInputsCount() > 0) {
+  throw new UnsupportedOperationException(
+  "[BEAM-2930] streaming translator does not support side inputs: " + transform);
+}
 
 Map, OutputTag>> tagsToOutputTags = Maps.newLinkedHashMap();
 Map, Coder>> tagsToCoders = Maps.newLinkedHashMap();
 // TODO: does it matter which output we designate as "main"
-TupleTag mainOutputTag;
-if (!outputs.isEmpty()) {
-  mainOutputTag = new TupleTag(outputs.keySet().iterator().next());
-} else {
-  mainOutputTag = null;
-}
+final TupleTag mainOutputTag =
+outputs.isEmpty() ? null : new TupleTag(outputs.keySet().iterator().next());
 
 // associate output tags with ids, output manager uses these Integer ids to serialize state
 BiMap outputIndexMap =


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 129086)
Time Spent: 2h  (was: 1h 50m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 2h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-07-30 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=129082=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-129082
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 31/Jul/18 01:35
Start Date: 31/Jul/18 01:35
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6082: [BEAM-2930] Side 
inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082#issuecomment-409065086
 
 
   Run Java Precommit


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 129082)
Time Spent: 1h 50m  (was: 1h 40m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-07-30 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=129058=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-129058
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 31/Jul/18 00:57
Start Date: 31/Jul/18 00:57
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6082: [BEAM-2930] Side 
inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082#issuecomment-409059095
 
 
   Run Java Precommit


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 129058)
Time Spent: 1h 40m  (was: 1.5h)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-07-30 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=129038=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-129038
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 31/Jul/18 00:31
Start Date: 31/Jul/18 00:31
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6082: [BEAM-2930] Side 
inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082#issuecomment-409055117
 
 
   test this please


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 129038)
Time Spent: 1.5h  (was: 1h 20m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-07-30 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=129037=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-129037
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 31/Jul/18 00:29
Start Date: 31/Jul/18 00:29
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6082: [BEAM-2930] Side 
inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082#issuecomment-409054911
 
 
   Run Java PreCommit
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 129037)
Time Spent: 1h 20m  (was: 1h 10m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-07-30 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=128952=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-128952
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 30/Jul/18 21:24
Start Date: 30/Jul/18 21:24
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6082: [BEAM-2930] Side 
inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082#issuecomment-409016094
 
 
   Run Java PreCommit
   




Issue Time Tracking
---

Worklog Id: (was: 128952)
Time Spent: 1h 10m  (was: 1h)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-07-30 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=128945=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-128945
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 30/Jul/18 21:02
Start Date: 30/Jul/18 21:02
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6082: 
[BEAM-2930] Side inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082#discussion_r206320308
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -433,12 +433,17 @@ private void translateImpulse(
   throw new RuntimeException(e);
 }
 
-String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
+String inputPCollectionId = stagePayload.getInput();
+// TODO: BEAM-2930
+if (stagePayload.getSideInputsCount() > 0) {
+  throw new UnsupportedOperationException(
+  "[BEAM-2930] streaming translator does not support side inputs: " + transform);
+}
 
 Map, OutputTag>> tagsToOutputTags = Maps.newLinkedHashMap();
 Map, Coder>> tagsToCoders = Maps.newLinkedHashMap();
 // TODO: does it matter which output we designate as "main"
-TupleTag mainOutputTag;
+final TupleTag mainOutputTag;
 
 Review comment:
   fixed
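The fail-fast guard introduced by this change can be shown in isolation with the following minimal sketch. It is not the translator code. `StagePayload` is a hypothetical stand-in for the real stage payload and models only the `getInput()` and `getSideInputsCount()` accessors that appear in the diff. `mainInputOrFail` is likewise a hypothetical helper name. As suggested in review, the TODO carries the full JIRA link rather than the bare issue number.

```java
import java.util.List;

public class SideInputGuard {
  // Hypothetical stand-in for the executable stage payload; it models only
  // the two accessors used by the guard in the diff.
  static final class StagePayload {
    private final String input;
    private final List<String> sideInputs;

    StagePayload(String input, List<String> sideInputs) {
      this.input = input;
      this.sideInputs = List.copyOf(sideInputs);
    }

    String getInput() {
      return input;
    }

    int getSideInputsCount() {
      return sideInputs.size();
    }
  }

  // Returns the main input PCollection id. Fails fast with a descriptive
  // message if the stage declares any side inputs.
  static String mainInputOrFail(StagePayload stagePayload, String transformName) {
    // TODO: https://issues.apache.org/jira/browse/BEAM-2930
    if (stagePayload.getSideInputsCount() > 0) {
      throw new UnsupportedOperationException(
          "[BEAM-2930] streaming translator does not support side inputs: " + transformName);
    }
    return stagePayload.getInput();
  }

  public static void main(String[] args) {
    // A stage without side inputs passes through and yields its main input id.
    StagePayload plain = new StagePayload("pc-main", List.of());
    System.out.println(mainInputOrFail(plain, "ParDo(Format)"));

    // A stage with a side input is rejected with the explanatory message.
    StagePayload withSide = new StagePayload("pc-main", List.of("pc-side"));
    try {
      mainInputOrFail(withSide, "WriteFiles");
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Failing during translation gives the user an explicit reason, such as the wordcount-with-file-output case mentioned in the PR description, instead of a less obvious failure later in the streaming pipeline.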




Issue Time Tracking
---

Worklog Id: (was: 128945)
Time Spent: 50m  (was: 40m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-07-30 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=128946=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-128946
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 30/Jul/18 21:02
Start Date: 30/Jul/18 21:02
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6082: 
[BEAM-2930] Side inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082#discussion_r206320374
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -433,12 +433,17 @@ private void translateImpulse(
   throw new RuntimeException(e);
 }
 
-String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
+String inputPCollectionId = stagePayload.getInput();
+// TODO: BEAM-2930
 
 Review comment:
   fixed




Issue Time Tracking
---

Worklog Id: (was: 128946)
Time Spent: 1h  (was: 50m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 1h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-07-30 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=128918=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-128918
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 30/Jul/18 20:03
Start Date: 30/Jul/18 20:03
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #6082: 
[BEAM-2930] Side inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082#discussion_r206302099
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -433,12 +433,17 @@ private void translateImpulse(
   throw new RuntimeException(e);
 }
 
-String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
+String inputPCollectionId = stagePayload.getInput();
+// TODO: BEAM-2930
 
 Review comment:
   This is easier to reference if you include a full link rather than just the 
issue number.




Issue Time Tracking
---

Worklog Id: (was: 128918)
Time Spent: 40m  (was: 0.5h)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-07-30 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=128908=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-128908
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 30/Jul/18 19:43
Start Date: 30/Jul/18 19:43
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #6082: 
[BEAM-2930] Side inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082#discussion_r206296850
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -433,12 +433,17 @@ private void translateImpulse(
   throw new RuntimeException(e);
 }
 
-String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
+String inputPCollectionId = stagePayload.getInput();
+// TODO: BEAM-2930
+if (stagePayload.getSideInputsCount() > 0) {
+  throw new UnsupportedOperationException(
+  "[BEAM-2930] streaming translator does not support side inputs: " + transform);
+}
 
 Map, OutputTag>> tagsToOutputTags = Maps.newLinkedHashMap();
 Map, Coder>> tagsToCoders = Maps.newLinkedHashMap();
 // TODO: does it matter which output we designate as "main"
-TupleTag mainOutputTag;
+final TupleTag mainOutputTag;
 
 Review comment:
   We can single line this assignment for better readability.




Issue Time Tracking
---

Worklog Id: (was: 128908)
Time Spent: 0.5h  (was: 20m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=128025=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-128025
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 27/Jul/18 04:13
Start Date: 27/Jul/18 04:13
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6082: [BEAM-2930] Side 
inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082#issuecomment-408306168
 
 
   R: @angoenka @bsidhom 




Issue Time Tracking
---

Worklog Id: (was: 128025)
Time Spent: 20m  (was: 10m)

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-2930) Flink support for portable side input

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=127998=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127998
 ]

ASF GitHub Bot logged work on BEAM-2930:


Author: ASF GitHub Bot
Created on: 27/Jul/18 02:57
Start Date: 27/Jul/18 02:57
Worklog Time Spent: 10m 
  Work Description: tweise opened a new pull request #6082: [BEAM-2930] 
Side inputs are not yet supported in streaming mode.
URL: https://github.com/apache/beam/pull/6082
 
 
   Provide a helpful error message to understand why wordcount with file output 
fails in streaming mode.
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   It will help us expedite review of your Pull Request if you tag someone 
(e.g. `@username`) to look at it.
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/) | --- | --- | --- | --- | --- | ---
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/)
   Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/) [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | --- | --- | --- | ---
   
   
   
   
   




Issue Time Tracking
---

Worklog Id: (was: 127998)
Time Spent: 10m
Remaining Estimate: 0h

> Flink support for portable side input
> -
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Henning Rohde
>Priority: Major
>  Labels: portability
>  Time Spent: 10m
>  Remaining Estimate: 0h
>



