[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-08-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=133312&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-133312
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 09/Aug/18 21:44
Start Date: 09/Aug/18 21:44
Worklog Time Spent: 10m 
  Work Description: lukecwik closed pull request #6187: [BEAM-4658] Follow 
up on PR comments from #6050
URL: https://github.com/apache/beam/pull/6187
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
index 3215691363c..2ddd80afae8 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
@@ -81,6 +81,7 @@
   private static final CompatibilityChecker DEFAULT_COMPATIBILITY_CHECKER =
       GreedyPCollectionFusers::unknownTransformCompatibility;
 
+  /** Returns true if the PTransform node for the given input PCollection can be fused across. */
   public static boolean canFuse(
       PTransformNode transformNode,
       Environment environment,
@@ -92,6 +93,10 @@ public static boolean canFuse(
         .canFuse(transformNode, environment, candidate, stagePCollections, pipeline);
   }
 
+  /**
+   * Returns true if the two PTransforms are compatible such that they can be executed in the same
+   * environment.
+   */
   public static boolean isCompatible(
       PTransformNode left, PTransformNode right, QueryablePipeline pipeline) {
     CompatibilityChecker leftChecker =
@@ -184,7 +189,8 @@ private static boolean canFuseParDo(
 
   private static boolean parDoCompatibility(
       PTransformNode parDo, PTransformNode other, QueryablePipeline pipeline) {
-    // Implicitly true if we are attempting to fuse against oneself. This is for timer PCollection which create a loop.
+    // Implicitly true if we are attempting to fuse against oneself. This case comes up for
+    // PCollections representing timers since they create a self-loop in the graph.
     return parDo.equals(other)
         // This is a convenience rather than a strict requirement. In general, a ParDo that consumes
         // side inputs can be fused with other transforms in the same environment which are not
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TimerReference.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TimerReference.java
index 36dd3284a6a..0c6bf3a36bc 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TimerReference.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TimerReference.java
@@ -21,8 +21,9 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 
 /**
- * A reference to a timer. This includes the PTransform that references the timer as well as the
- * PCollection referenced. Both are necessary in order to fully resolve a timer.
+ * Contains references to components relevant for runners during execution for timers. The
+ * referenced PTransform specifies the timer specification while the PCollection specifies the
+ * encoding representation.
  */
 @AutoValue
 public abstract class TimerReference {
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java
index 4920e17cba9..41b1e6c1d4a 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java
@@ -49,9 +49,9 @@ public void ofFullComponentsOnlyHasStagePTransforms() throws Exception {
 PTransform.newBuilder()
 .putInputs("input", "input.out")
 .putInputs("side_input", "sideInput.in")
-.putInputs("timer", "timer.out")
+.putInputs("timer", "timer.pc")
 .putOutputs("output", "output.out")
-.putOutputs("timer", "timer.out")
+.putOutputs("timer", 

[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-08-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=133026&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-133026
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 09/Aug/18 16:10
Start Date: 09/Aug/18 16:10
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #6187: [BEAM-4658] Follow 
up on PR comments from #6050
URL: https://github.com/apache/beam/pull/6187#issuecomment-411812017
 
 
   Run Python PreCommit


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 133026)
Time Spent: 4.5h  (was: 4h 20m)

> Update pipeline representation in runner support libraries to handle timers
> ---
>
> Key: BEAM-4658
> URL: https://issues.apache.org/jira/browse/BEAM-4658
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-core
>Reporter: Luke Cwik
>Assignee: Luke Cwik
>Priority: Major
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> Expose relevant timer information on *ExecutableStage* to runners
> Update fusion logic to handle timers.
> Provide execution time interface to fire timers into *RemoteBundle*s and also 
> to receive new timers that are being set.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-08-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=133027&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-133027
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 09/Aug/18 16:10
Start Date: 09/Aug/18 16:10
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #6187: [BEAM-4658] Follow 
up on PR comments from #6050
URL: https://github.com/apache/beam/pull/6187#issuecomment-411812096
 
 
   Run Java PreCommit




Issue Time Tracking
---

Worklog Id: (was: 133027)
Time Spent: 4h 40m  (was: 4.5h)



[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=132735&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-132735
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 08/Aug/18 22:44
Start Date: 08/Aug/18 22:44
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #6187: [BEAM-4658] Follow 
up on PR comments from #6050
URL: https://github.com/apache/beam/pull/6187#issuecomment-411577167
 
 
   R: @youngoli @tweise 




Issue Time Tracking
---

Worklog Id: (was: 132735)
Time Spent: 4h 20m  (was: 4h 10m)



[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=132734&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-132734
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 08/Aug/18 22:43
Start Date: 08/Aug/18 22:43
Worklog Time Spent: 10m 
  Work Description: lukecwik opened a new pull request #6187: [BEAM-4658] 
Follow up on PR comments from #6050
URL: https://github.com/apache/beam/pull/6187
 
 
   https://github.com/apache/beam/pull/6050
   
   
   




Issue Time Tracking
---

Worklog Id: (was: 132734)
Time Spent: 4h 10m  (was: 4h)


[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-08-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=132727&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-132727
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 08/Aug/18 22:22
Start Date: 08/Aug/18 22:22
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #6050: [BEAM-4658] Update 
pipeline representation in runner support libraries to handle timers.
URL: https://github.com/apache/beam/pull/6050#issuecomment-411572212
 
 
   Yes, the Java SDK could be used to test timers in Flink. This would make 
Flink the first runner to support timers using portability.




Issue Time Tracking
---

Worklog Id: (was: 132727)
Time Spent: 4h  (was: 3h 50m)



[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127911&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127911
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 26/Jul/18 20:29
Start Date: 26/Jul/18 20:29
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6050: [BEAM-4658] Update 
pipeline representation in runner support libraries to handle timers.
URL: https://github.com/apache/beam/pull/6050#issuecomment-408225259
 
 
   LGTM. The timer support in the Python SDK isn't ready, but any 
implementation in the Flink runner could be tested using the Java SDK at this 
point?




Issue Time Tracking
---

Worklog Id: (was: 127911)
Time Spent: 3h 50m  (was: 3h 40m)



[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127909&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127909
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 26/Jul/18 20:07
Start Date: 26/Jul/18 20:07
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6050: 
[BEAM-4658] Update pipeline representation in runner support libraries to 
handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205586171
 
 

 ##
 File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java
 ##
 @@ -58,11 +60,13 @@ public void ofFullComponentsOnlyHasStagePTransforms() throws Exception {
 .setDoFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("foo"))
 .putSideInputs("side_input", RunnerApi.SideInput.getDefaultInstance())
 .putStateSpecs("user_state", RunnerApi.StateSpec.getDefaultInstance())
+.putTimerSpecs("timer", RunnerApi.TimerSpec.getDefaultInstance())
 .build()
 .toByteString()))
 .build();
 PCollection input = PCollection.newBuilder().setUniqueName("input.out").build();
 PCollection sideInput = PCollection.newBuilder().setUniqueName("sideInput.in").build();
+PCollection timer = PCollection.newBuilder().setUniqueName("timer.out").build();
 
 Review comment:
   It is confusing to reference the single timer collection as "timer.out" when 
it is used for both input and output.
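
   To illustrate the point of this comment, here is a minimal sketch of how a
   single timer PCollection shows up under both the inputs and the outputs of
   the same transform. It uses plain Java maps as a stand-in for the
   PTransform's input and output maps; the class name, the helper method and
   the neutral id "timer.pc" are assumptions made for illustration and are not
   Beam APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical stand-in for a PTransform's input/output maps; not Beam's RunnerApi types. */
final class TimerSelfLoopSketch {

  /** Returns the PCollection ids that the same local name maps to on both the input and output side. */
  static Set<String> selfLoopCollections(Map<String, String> inputs, Map<String, String> outputs) {
    Set<String> loops = new TreeSet<>();
    for (Map.Entry<String, String> in : inputs.entrySet()) {
      // A local name that maps to the same PCollection id on both sides forms a self-loop.
      if (in.getValue().equals(outputs.get(in.getKey()))) {
        loops.add(in.getValue());
      }
    }
    return loops;
  }

  public static void main(String[] args) {
    Map<String, String> inputs = new LinkedHashMap<>();
    inputs.put("input", "input.out");
    inputs.put("side_input", "sideInput.in");
    inputs.put("timer", "timer.pc"); // assumed neutral name for the timer PCollection

    Map<String, String> outputs = new LinkedHashMap<>();
    outputs.put("output", "output.out");
    outputs.put("timer", "timer.pc"); // same id as the input: consumed and produced by one ParDo

    System.out.println(selfLoopCollections(inputs, outputs)); // prints [timer.pc]
  }
}
```

   A direction-neutral id avoids implying that the collection is only an
   output, which is the confusion the comment describes.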




Issue Time Tracking
---

Worklog Id: (was: 127909)
Time Spent: 3h 40m  (was: 3.5h)



[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127906&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127906
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 26/Jul/18 19:58
Start Date: 26/Jul/18 19:58
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6050: 
[BEAM-4658] Update pipeline representation in runner support libraries to 
handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205583228
 
 

 ##
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TimerReference.java
 ##
 @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+/**
+ * A reference to a timer. This includes the PTransform that references the timer as well as the
 
 Review comment:
   For the PTransform it is more an "owns" relationship rather than 
"references"? Why is the PCollection required to fully resolve a timer?
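
   One way to picture the answer to this question is the sketch below. It
   assumes, as the reworded Javadoc in the follow-up PR #6187 puts it, that the
   PTransform supplies the timer specification while the PCollection supplies
   the encoding. The class, its fields, the lookup maps and the placeholder
   strings are hypothetical plain-Java stand-ins, not the real TimerReference
   or RunnerApi types.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical plain-Java stand-in for a timer reference; not the AutoValue class in Beam. */
final class TimerReferenceSketch {
  private final String transformId;  // the ParDo that declares (owns) the timer
  private final String localName;    // the timer's name within that ParDo
  private final String collectionId; // the PCollection that carries the timer's elements

  TimerReferenceSketch(String transformId, String localName, String collectionId) {
    this.transformId = transformId;
    this.localName = localName;
    this.collectionId = collectionId;
  }

  /** Resolves both halves: the spec comes from the transform, the encoding from the collection. */
  String resolve(
      Map<String, Map<String, String>> timerSpecsByTransform,
      Map<String, String> coderByCollection) {
    Map<String, String> specs = timerSpecsByTransform.get(transformId);
    String spec = specs == null ? null : specs.get(localName);
    String coder = coderByCollection.get(collectionId);
    if (spec == null || coder == null) {
      throw new IllegalArgumentException("Unresolvable timer " + transformId + "/" + localName);
    }
    return spec + " encoded with " + coder;
  }

  public static void main(String[] args) {
    Map<String, String> parDoTimers = new HashMap<>();
    parDoTimers.put("timer", "EVENT_TIME"); // placeholder for a timer specification
    Map<String, Map<String, String>> timerSpecsByTransform = new HashMap<>();
    timerSpecsByTransform.put("parDo", parDoTimers);

    Map<String, String> coderByCollection = new HashMap<>();
    coderByCollection.put("timer.pc", "timerCoder"); // placeholder for a coder id

    TimerReferenceSketch ref = new TimerReferenceSketch("parDo", "timer", "timer.pc");
    System.out.println(ref.resolve(timerSpecsByTransform, coderByCollection));
  }
}
```

   Under these assumptions neither lookup alone is enough: dropping the
   collection id would leave a runner without the encoding of the timer
   elements.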




Issue Time Tracking
---

Worklog Id: (was: 127906)
Time Spent: 3.5h  (was: 3h 20m)



[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127905&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127905
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 26/Jul/18 19:57
Start Date: 26/Jul/18 19:57
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6050: 
[BEAM-4658] Update pipeline representation in runner support libraries to 
handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205583228
 
 

 ##
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TimerReference.java
 ##
 @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+/**
+ * A reference to a timer. This includes the PTransform that references the timer as well as the
 
 Review comment:
   For the PTransform it is more an "owns" relationship rather than 
"references"?




Issue Time Tracking
---

Worklog Id: (was: 127905)
Time Spent: 3h 20m  (was: 3h 10m)



[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127903&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127903
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 26/Jul/18 19:48
Start Date: 26/Jul/18 19:48
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6050: 
[BEAM-4658] Update pipeline representation in runner support libraries to 
handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205581055
 
 

 ##
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
 ##
 @@ -150,41 +154,51 @@ private static boolean canFuseParDo(
       // is never possible.
       return false;
     }
-    if (!pipeline.getSideInputs(parDo).isEmpty()) {
-      // At execution time, a Runner is required to only provide inputs to a PTransform that, at
-      // the time the PTransform processes them, the associated window is ready in all side inputs
-      // that the PTransform consumes. For an arbitrary stage, it is significantly complex for the
-      // runner to determine this for each input. As a result, we break fusion to simplify this
-      // inspection. In general, a ParDo which consumes side inputs cannot be fused into an
-      // executable stage alongside any transforms which are upstream of any of its side inputs.
-      return false;
-    } else {
-      try {
-        ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
-        if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
-          // Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for
-          // a key must execute serially. To avoid checking if the rest of the stage is
-          // key-partitioned and preserves keys, these ParDos do not fuse into an existing stage.
-          return false;
-        }
-      } catch (InvalidProtocolBufferException e) {
-        throw new IllegalArgumentException(e);
+    try {
+      ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
+      if (Maps.filterKeys(
+              parDo.getTransform().getInputsMap(), s -> payload.getTimerSpecsMap().containsKey(s))
+          .values()
+          .contains(candidate.getId())) {
+        // Allow fusion across timer PCollections because they are a self loop.
+        return true;
+      } else if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
+        // Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for
+        // a key must execute serially. To avoid checking if the rest of the stage is
+        // key-partitioned and preserves keys, these ParDos do not fuse into an existing stage.
+        return false;
+      } else if (!pipeline.getSideInputs(parDo).isEmpty()) {
+        // At execution time, a Runner is required to only provide inputs to a PTransform that, at
+        // the time the PTransform processes them, the associated window is ready in all side inputs
+        // that the PTransform consumes. For an arbitrary stage, it is significantly complex for the
+        // runner to determine this for each input. As a result, we break fusion to simplify this
+        // inspection. In general, a ParDo which consumes side inputs cannot be fused into an
+        // executable stage alongside any transforms which are upstream of any of its side inputs.
+        return false;
       }
+    } catch (InvalidProtocolBufferException e) {
+      throw new IllegalArgumentException(e);
     }
     return true;
   }
 
   private static boolean parDoCompatibility(
 
 Review comment:
   nit: the name of this method sounds odd, should it be canFuse or something?
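
   For readers following the hunk above, the order of checks in the new
   canFuseParDo body can be sketched in isolation as shown below. This is a
   simplified stand-in under stated assumptions: the parameters replace the
   ParDoPayload, QueryablePipeline and candidate PCollection used in the real
   method, the class and parameter names are hypothetical, and the earlier
   checks that the hunk only shows the tail of are omitted.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Simplified sketch of the check order in the diff; all types here are stand-ins, not Beam's. */
final class ParDoFusionSketch {

  static boolean canFuseParDo(
      Map<String, String> inputs,  // local input name -> PCollection id
      Set<String> timerSpecNames,  // local names that have a timer spec
      int stateSpecCount,
      boolean hasSideInputs,
      String candidateId) {        // PCollection being considered for fusion
    // 1. A timer PCollection feeding back into its own ParDo is a self-loop: fuse across it.
    for (Map.Entry<String, String> input : inputs.entrySet()) {
      if (timerSpecNames.contains(input.getKey()) && input.getValue().equals(candidateId)) {
        return true;
      }
    }
    // 2. Otherwise, stateful or timer-using ParDos need key-partitioned input: do not fuse.
    if (stateSpecCount > 0 || !timerSpecNames.isEmpty()) {
      return false;
    }
    // 3. Otherwise, ParDos with side inputs break fusion to simplify window-readiness checks.
    if (hasSideInputs) {
      return false;
    }
    return true;
  }

  public static void main(String[] args) {
    Map<String, String> inputs = new HashMap<>();
    inputs.put("input", "input.out");
    inputs.put("timer", "timer.pc"); // assumed id for the timer PCollection
    Set<String> timers = Collections.singleton("timer");

    System.out.println(canFuseParDo(inputs, timers, 0, false, "timer.pc"));  // prints true
    System.out.println(canFuseParDo(inputs, timers, 0, false, "input.out")); // prints false
  }
}
```

   The timer check has to come first: if the state-or-timers check ran before
   it, a ParDo with timers could never fuse with its own timer PCollection.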




Issue Time Tracking
---

Worklog Id: (was: 127903)
Time Spent: 3h 10m  (was: 3h)


[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127902&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127902
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 26/Jul/18 19:38
Start Date: 26/Jul/18 19:38
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6050: 
[BEAM-4658] Update pipeline representation in runner support libraries to 
handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205578326
 
 

 ##
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
 ##
 @@ -150,41 +154,51 @@ private static boolean canFuseParDo(
       // is never possible.
       return false;
     }
-    if (!pipeline.getSideInputs(parDo).isEmpty()) {
-      // At execution time, a Runner is required to only provide inputs to a PTransform that, at
-      // the time the PTransform processes them, the associated window is ready in all side inputs
-      // that the PTransform consumes. For an arbitrary stage, it is significantly complex for the
-      // runner to determine this for each input. As a result, we break fusion to simplify this
-      // inspection. In general, a ParDo which consumes side inputs cannot be fused into an
-      // executable stage alongside any transforms which are upstream of any of its side inputs.
-      return false;
-    } else {
-      try {
-        ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
-        if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
-          // Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for
-          // a key must execute serially. To avoid checking if the rest of the stage is
-          // key-partitioned and preserves keys, these ParDos do not fuse into an existing stage.
-          return false;
-        }
-      } catch (InvalidProtocolBufferException e) {
-        throw new IllegalArgumentException(e);
+    try {
+      ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
+      if (Maps.filterKeys(
+              parDo.getTransform().getInputsMap(), s -> payload.getTimerSpecsMap().containsKey(s))
+          .values()
+          .contains(candidate.getId())) {
+        // Allow fusion across timer PCollections because they are a self loop.
+        return true;
+      } else if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
+        // Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for
 
 Review comment:
   yup, timers are really a special kind of state
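
The order of checks in the new `canFuseParDo` body quoted above can be illustrated with a minimal, self-contained sketch. It does not use Beam types: `TimerFusionSketch`, its parameters, and the PCollection ids are hypothetical stand-ins for `ParDoPayload`, `PTransformNode`, and `QueryablePipeline`, and it only models the three branches inside the `try` block, not the earlier checks in the method.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the ParDo fusion check; none of these names are Beam APIs. */
final class TimerFusionSketch {
  /**
   * @param inputs local input name to consumed PCollection id (models getInputsMap())
   * @param timerNames names of the ParDo's timer specs (models getTimerSpecsMap().keySet())
   * @param hasState whether the ParDo declares any state specs
   * @param hasSideInputs whether the ParDo consumes any side inputs
   * @param candidateId id of the PCollection we are trying to fuse across
   */
  static boolean canFuse(
      Map<String, String> inputs,
      Set<String> timerNames,
      boolean hasState,
      boolean hasSideInputs,
      String candidateId) {
    // 1) A timer PCollection is a self loop on the ParDo, so fusing across it is allowed.
    boolean candidateIsTimerInput =
        inputs.entrySet().stream()
            .anyMatch(e -> timerNames.contains(e.getKey()) && e.getValue().equals(candidateId));
    if (candidateIsTimerInput) {
      return true;
    }
    // 2) State or timers require key-partitioned input, so fusion is broken here.
    if (hasState || !timerNames.isEmpty()) {
      return false;
    }
    // 3) Side inputs also break fusion; anything else may fuse.
    return !hasSideInputs;
  }

  public static void main(String[] args) {
    Map<String, String> inputs = Map.of("main", "pc.main", "myTimer", "pc.timer");
    Set<String> timers = Set.of("myTimer");
    // Fusing across the timer PCollection versus across the main input.
    System.out.println(canFuse(inputs, timers, false, false, "pc.timer"));
    System.out.println(canFuse(inputs, timers, false, false, "pc.main"));
  }
}
```

The timer branch has to come first: a ParDo with timers would otherwise always hit the state/timer branch and never fuse across its own timer PCollection.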


Issue Time Tracking
---

Worklog Id: (was: 127902)
Time Spent: 3h  (was: 2h 50m)

> Update pipeline representation in runner support libraries to handle timers
> ---
>
> Key: BEAM-4658
> URL: https://issues.apache.org/jira/browse/BEAM-4658
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-core
>Reporter: Luke Cwik
>Assignee: Luke Cwik
>Priority: Major
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> Expose relevant timer information on *ExecutableStage* to runners
> Update fusion logic to handle timers.
> Provide execution time interface to fire timers into *RemoteBundle*s and also 
> to receive new timers that are being set.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127877=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127877
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 26/Jul/18 18:19
Start Date: 26/Jul/18 18:19
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #6050: 
[BEAM-4658] Update pipeline representation in runner support libraries to 
handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r20852
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
 ##
 @@ -104,7 +118,7 @@
*   The {@link PTransform#getSubtransformsList()} is empty. This 
ensures that executable
*   stages are treated as primitive transforms.
*   The only {@link PCollection} in the {@link 
PTransform#getInputsMap()} is the result of
-   *   {@link #getInputPCollection()}.
+   *   {@link #getInputPCollection()} and {@link #getSideInputs()}.
 
 Review comment:
   I have another pass to perform a few fixes to list all inputs and to 
consider timers as materialized PCollections so I can address this comment 
then. The timers are usable during execution now though, and the rest is 
clean-up/simplification.


Issue Time Tracking
---

Worklog Id: (was: 127877)
Time Spent: 2h 50m  (was: 2h 40m)



[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127867=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127867
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 26/Jul/18 17:50
Start Date: 26/Jul/18 17:50
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6050: 
[BEAM-4658] Update pipeline representation in runner support libraries to 
handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r20554
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
 ##
 @@ -104,7 +118,7 @@
*   The {@link PTransform#getSubtransformsList()} is empty. This 
ensures that executable
*   stages are treated as primitive transforms.
*   The only {@link PCollection} in the {@link 
PTransform#getInputsMap()} is the result of
-   *   {@link #getInputPCollection()}.
+   *   {@link #getInputPCollection()} and {@link #getSideInputs()}.
 
 Review comment:
   Perhaps `The {@link PTransform#getInputsMap()} is the result of` would be 
clearer.


Issue Time Tracking
---

Worklog Id: (was: 127867)
Time Spent: 2.5h  (was: 2h 20m)



[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127866=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127866
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 26/Jul/18 17:31
Start Date: 26/Jul/18 17:31
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #6050: [BEAM-4658] Update 
pipeline representation in runner support libraries to handle timers.
URL: https://github.com/apache/beam/pull/6050#issuecomment-408174187
 
 
   Run Java PreCommit


Issue Time Tracking
---

Worklog Id: (was: 127866)
Time Spent: 2h 20m  (was: 2h 10m)



[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127860=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127860
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 26/Jul/18 17:13
Start Date: 26/Jul/18 17:13
Worklog Time Spent: 10m 
  Work Description: youngoli commented on a change in pull request #6050: 
[BEAM-4658] Update pipeline representation in runner support libraries to 
handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205534735
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/Networks.java
 ##
 @@ -193,29 +201,55 @@ public final NodeT apply(NodeT input) {
 "Only networks without self loops are supported, given %s",
 network);
 
-// Linked hashset will prevent duplicates from appearing and will maintain 
insertion order.
-LinkedHashSet nodes = new LinkedHashSet<>(network.nodes().size());
-Queue processingOrder = new ArrayDeque<>();
-// Add all the roots
-for (NodeT node : network.nodes()) {
-  if (network.inDegree(node) == 0) {
-processingOrder.add(node);
-  }
-}
+// Uses the following algorithm:
 
 Review comment:
   Ah, the detail I missed was that the algorithm would gather any newly 
created sinks/sources (I thought that after removing Sink, node B wouldn't be 
considered a sink). It makes sense now and I believe it does work for 
topological ordering.


Issue Time Tracking
---

Worklog Id: (was: 127860)
Time Spent: 2h 10m  (was: 2h)



[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127828=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127828
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 26/Jul/18 16:15
Start Date: 26/Jul/18 16:15
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #6050: 
[BEAM-4658] Update pipeline representation in runner support libraries to 
handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205517383
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/Networks.java
 ##
 @@ -193,29 +201,55 @@ public final NodeT apply(NodeT input) {
 "Only networks without self loops are supported, given %s",
 network);
 
-// Linked hashset will prevent duplicates from appearing and will maintain 
insertion order.
-LinkedHashSet nodes = new LinkedHashSet<>(network.nodes().size());
-Queue processingOrder = new ArrayDeque<>();
-// Add all the roots
-for (NodeT node : network.nodes()) {
-  if (network.inDegree(node) == 0) {
-processingOrder.add(node);
-  }
-}
+// Uses the following algorithm:
 
 Review comment:
   Only nodes that are part of cycles are ever removed as part of the "delta" 
calculation.
   
   The algorithm:
   1) Removes all sinks from the graph (including newly created sinks once the 
graph is updated) until there are none.
   2) Removes all sources from the graph (including newly created sources once 
the graph is updated) until there are none.
   3) Removes a single back edge with the highest delta. This may break a cycle. 
   
   Concretely, in your example, the algorithm would:
   * add Sink to the beginning of S2, removing it from the graph.
   * add B to the beginning of S2, removing it from the graph.
   * add A to the beginning of S2, removing it from the graph.
   * add Source1 or Source2 (ambiguous, so let's say Source1) to the beginning of 
S2, removing it from the graph.
   * add Source2 to the beginning of S2, removing it from the graph.
   
   
   I expanded the comment within the code about the algorithm to clarify some 
parts of it.
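
Steps 1 and 2 and the walk-through above can be sketched in a few lines of plain Java. This is a minimal sketch under stated assumptions, not the code in `Networks.java`: `SinkSourcePeelingSketch`, `order`, and `exampleGraph` are hypothetical names, the graph is a plain adjacency map rather than a Guava `Network`, and step 3 (cycle breaking) is left out, so the sketch rejects any graph that still has nodes after peeling.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the sink/source peeling steps; not the Beam implementation. */
final class SinkSourcePeelingSketch {

  /** The example graph from the review thread: Source1, Source2 -> A -> B -> Sink. */
  static Map<String, List<String>> exampleGraph() {
    Map<String, List<String>> graph = new LinkedHashMap<>();
    graph.put("Source1", List.of("A"));
    graph.put("Source2", List.of("A"));
    graph.put("A", List.of("B"));
    graph.put("B", List.of("Sink"));
    graph.put("Sink", List.of());
    return graph;
  }

  /** Returns S1 followed by S2 for an acyclic graph given as node -> successors. */
  static List<String> order(Map<String, List<String>> successors) {
    // Mutable copies of every node's outgoing and incoming neighbours.
    Map<String, Set<String>> out = new LinkedHashMap<>();
    Map<String, Set<String>> in = new LinkedHashMap<>();
    successors.forEach((node, targets) -> {
      out.computeIfAbsent(node, k -> new LinkedHashSet<>());
      in.computeIfAbsent(node, k -> new LinkedHashSet<>());
      for (String target : targets) {
        out.computeIfAbsent(target, k -> new LinkedHashSet<>());
        in.computeIfAbsent(target, k -> new LinkedHashSet<>());
        out.get(node).add(target);
        in.get(target).add(node);
      }
    });

    List<String> s1 = new ArrayList<>();
    Deque<String> s2 = new ArrayDeque<>();
    String node;
    // Step 1: remove sinks (including newly created ones), prepending each to S2.
    while ((node = firstWithNoEdges(out)) != null) {
      remove(node, out, in);
      s2.addFirst(node);
    }
    // Step 2: remove sources (including newly created ones), appending each to S1.
    while ((node = firstWithNoEdges(in)) != null) {
      remove(node, out, in);
      s1.add(node);
    }
    // Step 3 (cycle breaking) is intentionally omitted from this sketch.
    if (!out.isEmpty()) {
      throw new IllegalArgumentException("Graph contains a cycle: " + out.keySet());
    }
    List<String> result = new ArrayList<>(s1);
    result.addAll(s2);
    return result;
  }

  private static String firstWithNoEdges(Map<String, Set<String>> edges) {
    for (Map.Entry<String, Set<String>> entry : edges.entrySet()) {
      if (entry.getValue().isEmpty()) {
        return entry.getKey();
      }
    }
    return null;
  }

  private static void remove(
      String node, Map<String, Set<String>> out, Map<String, Set<String>> in) {
    for (String successor : out.remove(node)) {
      in.get(successor).remove(node);
    }
    for (String predecessor : in.remove(node)) {
      out.get(predecessor).remove(node);
    }
  }

  public static void main(String[] args) {
    System.out.println(order(exampleGraph()));
  }
}
```

On the example graph, step 1 alone empties the graph, because each removal exposes a new sink. The resulting sequence places both sources before A, A before B, and B before Sink, which is a valid topological order.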


Issue Time Tracking
---

Worklog Id: (was: 127828)
Time Spent: 2h  (was: 1h 50m)



[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127820=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127820
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 26/Jul/18 15:58
Start Date: 26/Jul/18 15:58
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #6050: 
[BEAM-4658] Update pipeline representation in runner support libraries to 
handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205512052
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 ##
 @@ -337,11 +337,22 @@ public static TupleTagList 
getAdditionalOutputTags(AppliedPTransform ap
 ptransform.getSpec().getUrn());
 ParDoPayload payload = 
ParDoPayload.parseFrom(ptransform.getSpec().getPayload());
 return components.getPcollectionsOrThrow(
-ptransform.getInputsOrThrow(getMainInputId(ptransform, payload)));
+ptransform.getInputsOrThrow(getMainInputName(ptransform, payload)));
 
 Review comment:
   Done


Issue Time Tracking
---

Worklog Id: (was: 127820)
Time Spent: 1h 50m  (was: 1h 40m)



[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127818=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127818
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 26/Jul/18 15:58
Start Date: 26/Jul/18 15:58
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #6050: 
[BEAM-4658] Update pipeline representation in runner support libraries to 
handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205512015
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
 ##
 @@ -150,41 +154,51 @@ private static boolean canFuseParDo(
   // is never possible.
   return false;
 }
-if (!pipeline.getSideInputs(parDo).isEmpty()) {
-  // At execution time, a Runner is required to only provide inputs to a 
PTransform that, at
-  // the time the PTransform processes them, the associated window is 
ready in all side inputs
-  // that the PTransform consumes. For an arbitrary stage, it is 
significantly complex for the
-  // runner to determine this for each input. As a result, we break fusion 
to simplify this
-  // inspection. In general, a ParDo which consumes side inputs cannot be 
fused into an
-  // executable stage alongside any transforms which are upstream of any 
of its side inputs.
-  return false;
-} else {
-  try {
-ParDoPayload payload = 
ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
-if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 
0) {
-  // Inputs to a ParDo that uses State or Timers must be 
key-partitioned, and elements for
-  // a key must execute serially. To avoid checking if the rest of the 
stage is
-  // key-partitioned and preserves keys, these ParDos do not fuse into 
an existing stage.
-  return false;
-}
-  } catch (InvalidProtocolBufferException e) {
-throw new IllegalArgumentException(e);
+try {
+  ParDoPayload payload = 
ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
+  if (Maps.filterKeys(
+  parDo.getTransform().getInputsMap(), s -> 
payload.getTimerSpecsMap().containsKey(s))
+  .values()
+  .contains(candidate.getId())) {
+// Allow fusion across timer PCollections because they are a self loop.
+return true;
+  } else if (payload.getStateSpecsCount() > 0 || 
payload.getTimerSpecsCount() > 0) {
+// Inputs to a ParDo that uses State or Timers must be 
key-partitioned, and elements for
+// a key must execute serially. To avoid checking if the rest of the 
stage is
+// key-partitioned and preserves keys, these ParDos do not fuse into 
an existing stage.
+return false;
+  } else if (!pipeline.getSideInputs(parDo).isEmpty()) {
+// At execution time, a Runner is required to only provide inputs to a 
PTransform that, at
+// the time the PTransform processes them, the associated window is 
ready in all side inputs
+// that the PTransform consumes. For an arbitrary stage, it is 
significantly complex for the
+// runner to determine this for each input. As a result, we break 
fusion to simplify this
+// inspection. In general, a ParDo which consumes side inputs cannot 
be fused into an
+// executable stage alongside any transforms which are upstream of any 
of its side inputs.
+return false;
   }
+} catch (InvalidProtocolBufferException e) {
+  throw new IllegalArgumentException(e);
 }
 return true;
   }
 
   private static boolean parDoCompatibility(
   PTransformNode parDo, PTransformNode other, QueryablePipeline pipeline) {
-// This is a convenience rather than a strict requirement. In general, a 
ParDo that consumes
-// side inputs can be fused with other transforms in the same environment 
which are not
-// upstream of any of the side inputs.
-return pipeline.getSideInputs(parDo).isEmpty()
-// Since we lack the ability to mark upstream transforms as key 
preserving, we
-// purposefully break fusion here to provide runners the opportunity 
to insert a
-// grouping operation
-&& pipeline.getUserStates(parDo).isEmpty()
-&& compatibleEnvironments(parDo, other, pipeline);
+// Implicitly true if we are attempting to fuse against oneself. This is 
for timer PCollection which create a loop.
+return parDo.equals(other)
+// This is a convenience rather than a strict requirement. In general, 
a ParDo that consumes
+// side inputs can be fused with other transforms in the same 
environment which are not
+// upstream of any of the side inputs.
+ 

[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127819=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127819
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 26/Jul/18 15:58
Start Date: 26/Jul/18 15:58
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #6050: 
[BEAM-4658] Update pipeline representation in runner support libraries to 
handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205512043
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
 ##
 @@ -104,7 +118,7 @@
*   The {@link PTransform#getSubtransformsList()} is empty. This 
ensures that executable
*   stages are treated as primitive transforms.
*   The only {@link PCollection} in the {@link 
PTransform#getInputsMap()} is the result of
-   *   {@link #getInputPCollection()}.
+   *   {@link #getInputPCollection()} and {@link #getSideInputs()}.
 
 Review comment:
   There are multiple; I fixed the comment.


Issue Time Tracking
---

Worklog Id: (was: 127819)
Time Spent: 1h 40m  (was: 1.5h)



[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127769=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127769
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 26/Jul/18 14:45
Start Date: 26/Jul/18 14:45
Worklog Time Spent: 10m 
  Work Description: lukecwik closed pull request #6002: [BEAM-4658] Modify 
RemoteBundle interface to allow for multiple inputs.
URL: https://github.com/apache/beam/pull/6002
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
index ca20949c953..9a9121b92a4 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.runners.direct.portable;
 
+import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.Collection;
 import javax.annotation.Nullable;
@@ -29,6 +30,7 @@
 import org.apache.beam.runners.fnexecution.control.RemoteBundle;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.util.WindowedValue;
 
 /**
@@ -59,7 +61,8 @@ public void cleanup() throws Exception {
 
   private class RemoteStageEvaluator implements TransformEvaluator {
 private final PTransformNode transform;
-private final RemoteBundle bundle;
+private final RemoteBundle bundle;
+private final FnDataReceiver> mainInput;
 private final Collection> outputs;
 
 private RemoteStageEvaluator(PTransformNode transform) throws Exception {
@@ -67,19 +70,20 @@ private RemoteStageEvaluator(PTransformNode transform) 
throws Exception {
   ExecutableStage stage =
   ExecutableStage.fromPayload(
   
ExecutableStagePayload.parseFrom(transform.getTransform().getSpec().getPayload()));
-  outputs = new ArrayList<>();
+  this.outputs = new ArrayList<>();
   StageBundleFactory stageFactory = jobFactory.forStage(stage);
-  bundle =
+  this.bundle =
   stageFactory.getBundle(
   BundleFactoryOutputReceiverFactory.create(
   bundleFactory, stage.getComponents(), outputs::add),
   StateRequestHandler.unsupported(),
   BundleProgressHandler.unsupported());
+  this.mainInput = 
Iterables.getOnlyElement(bundle.getInputReceivers().values());
 }
 
 @Override
 public void processElement(WindowedValue element) throws Exception {
-  bundle.getInputReceiver().accept(element);
+  mainInput.accept(element);
 }
 
 @Override
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java
index d1aaf174a0b..8119c40d3cb 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java
@@ -36,6 +36,7 @@
 import org.apache.beam.runners.fnexecution.wire.WireCoders;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
@@ -89,7 +90,8 @@ public void cleanup() throws Exception {
 
 private final CopyOnAccessInMemoryStateInternals stateInternals;
 private final DirectTimerInternals timerInternals;
-private final RemoteBundle> bundle;
+private final RemoteBundle bundle;
+private final FnDataReceiver> mainInput;
 private final Collection> outputs;
 
 private final SDFFeederViaStateAndTimers feeder;
@@ -144,6 +146,7 @@ public void onCompleted(ProcessBundleResponse response) {
   }
 }
   });
+  this.mainInput = 
Iterables.getOnlyElement(bundle.getInputReceivers().values());
 }
 
 @Override
@@ -158,7 +161,7 @@ public void processElement(
   } 

[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127658=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127658
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 26/Jul/18 06:09
Start Date: 26/Jul/18 06:09
Worklog Time Spent: 10m 
  Work Description: youngoli commented on a change in pull request #6050: 
[BEAM-4658] Update pipeline representation in runner support libraries to 
handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205256708
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 ##
 @@ -337,11 +337,22 @@ public static TupleTagList 
getAdditionalOutputTags(AppliedPTransform ap
 ptransform.getSpec().getUrn());
 ParDoPayload payload = 
ParDoPayload.parseFrom(ptransform.getSpec().getPayload());
 return components.getPcollectionsOrThrow(
-ptransform.getInputsOrThrow(getMainInputId(ptransform, payload)));
+ptransform.getInputsOrThrow(getMainInputName(ptransform, payload)));
 
 Review comment:
   Since you added a version of getMainInputName that retrieves the payload, 
what about using that here?


Issue Time Tracking
---

Worklog Id: (was: 127658)
Time Spent: 1h  (was: 50m)



[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127656=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127656
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 26/Jul/18 06:09
Start Date: 26/Jul/18 06:09
Worklog Time Spent: 10m 
  Work Description: youngoli commented on a change in pull request #6050: 
[BEAM-4658] Update pipeline representation in runner support libraries to 
handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205290440
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/Networks.java
 ##
 @@ -193,29 +201,55 @@ public final NodeT apply(NodeT input) {
 "Only networks without self loops are supported, given %s",
 network);
 
-// Linked hashset will prevent duplicates from appearing and will maintain insertion order.
-LinkedHashSet<NodeT> nodes = new LinkedHashSet<>(network.nodes().size());
-Queue<NodeT> processingOrder = new ArrayDeque<>();
-// Add all the roots
-for (NodeT node : network.nodes()) {
-  if (network.inDegree(node) == 0) {
-processingOrder.add(node);
-  }
-}
+// Uses the following algorithm:
 
 Review comment:
   Using this algorithm for topological order doesn't seem appropriate. From what I could understand from the paper, this algorithm generates a vertex sequence that makes it easy to remove feedback arcs (i.e. eliminate any cycles), but it doesn't actually maintain topological order.
   
   As a concrete example, if you had a graph that looked like so:
   ```
   Source1 --\
              ---> A ---> B ---> Sink
   Source2 --/
   ```
   
   Topologically I would expect the result to be: `Source1, Source2, A, B, Sink`. But instead this algorithm would produce `Source1, Source2, B, A, Sink` because B is considered to have a higher delta than A (delta(B) = 0, delta(A) = -1).
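
For comparison, here is a minimal sketch of a Kahn-style topological sort, which is roughly the approach the removed lines in the diff start from (seed a queue with the roots, then emit a node only once all of its predecessors have been emitted). It is an illustration under stated assumptions, not the code from the pull request: it works on a plain adjacency map rather than Guava's `Network`, and the class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical helper, not part of Beam: Kahn-style topological sort over an adjacency map. */
final class TopologicalOrderSketch {

  /** Returns the nodes so that every node appears after all of its predecessors. */
  static List<String> topologicalOrder(Map<String, List<String>> successors) {
    // Count incoming edges; LinkedHashMap keeps the caller's insertion order for ties.
    Map<String, Integer> inDegree = new LinkedHashMap<>();
    for (String node : successors.keySet()) {
      inDegree.putIfAbsent(node, 0);
    }
    for (List<String> targets : successors.values()) {
      for (String target : targets) {
        inDegree.merge(target, 1, Integer::sum);
      }
    }

    // Start from the roots (nodes with no incoming edges).
    Queue<String> ready = new ArrayDeque<>();
    for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
      if (entry.getValue() == 0) {
        ready.add(entry.getKey());
      }
    }

    // Emit a node, then release any successor whose predecessors are all emitted.
    List<String> ordered = new ArrayList<>();
    while (!ready.isEmpty()) {
      String node = ready.remove();
      ordered.add(node);
      for (String target : successors.getOrDefault(node, List.of())) {
        if (inDegree.merge(target, -1, Integer::sum) == 0) {
          ready.add(target);
        }
      }
    }

    // Nodes on a cycle never reach in-degree zero, so they are never emitted.
    if (ordered.size() != inDegree.size()) {
      throw new IllegalArgumentException("Graph contains a cycle");
    }
    return ordered;
  }

  public static void main(String[] args) {
    // The example graph from the review comment above.
    Map<String, List<String>> graph = new LinkedHashMap<>();
    graph.put("Source1", List.of("A"));
    graph.put("Source2", List.of("A"));
    graph.put("A", List.of("B"));
    graph.put("B", List.of("Sink"));
    graph.put("Sink", List.of());
    System.out.println(topologicalOrder(graph)); // prints [Source1, Source2, A, B, Sink]
  }
}
```

On the example graph this yields `Source1, Source2, A, B, Sink`, the order the comment expects, because A only becomes ready after both sources have been emitted. Unlike the feedback-arc heuristic, this sketch rejects cyclic input instead of ordering it.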




Issue Time Tracking
---

Worklog Id: (was: 127656)
Time Spent: 50m  (was: 40m)



[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127657&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127657
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 26/Jul/18 06:09
Start Date: 26/Jul/18 06:09
Worklog Time Spent: 10m 
  Work Description: youngoli commented on a change in pull request #6050: 
[BEAM-4658] Update pipeline representation in runner support libraries to 
handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205264448
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
 ##
 @@ -104,7 +118,7 @@
*   The {@link PTransform#getSubtransformsList()} is empty. This ensures that executable
*   stages are treated as primitive transforms.
*   The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of
-   *   {@link #getInputPCollection()}.
+   *   {@link #getInputPCollection()} and {@link #getSideInputs()}.
 
 Review comment:
   The way this is worded makes it sound like PTransform.getInputsMap has only 
one PCollection, which sounds odd if it's the result of both those functions. 
Is that correct, or does it actually contain multiple PCollections now?




Issue Time Tracking
---

Worklog Id: (was: 127657)



[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127659&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127659
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 26/Jul/18 06:09
Start Date: 26/Jul/18 06:09
Worklog Time Spent: 10m 
  Work Description: youngoli commented on a change in pull request #6050: 
[BEAM-4658] Update pipeline representation in runner support libraries to 
handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205274669
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
 ##
 @@ -150,41 +154,51 @@ private static boolean canFuseParDo(
   // is never possible.
   return false;
 }
-if (!pipeline.getSideInputs(parDo).isEmpty()) {
-  // At execution time, a Runner is required to only provide inputs to a PTransform that, at
-  // the time the PTransform processes them, the associated window is ready in all side inputs
-  // that the PTransform consumes. For an arbitrary stage, it is significantly complex for the
-  // runner to determine this for each input. As a result, we break fusion to simplify this
-  // inspection. In general, a ParDo which consumes side inputs cannot be fused into an
-  // executable stage alongside any transforms which are upstream of any of its side inputs.
-  return false;
-} else {
-  try {
-ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
-if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
-  // Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for
-  // a key must execute serially. To avoid checking if the rest of the stage is
-  // key-partitioned and preserves keys, these ParDos do not fuse into an existing stage.
-  return false;
-}
-  } catch (InvalidProtocolBufferException e) {
-throw new IllegalArgumentException(e);
+try {
+  ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
+  if (Maps.filterKeys(
+  parDo.getTransform().getInputsMap(), s -> payload.getTimerSpecsMap().containsKey(s))
+  .values()
+  .contains(candidate.getId())) {
+// Allow fusion across timer PCollections because they are a self loop.
+return true;
+  } else if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
+// Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for
+// a key must execute serially. To avoid checking if the rest of the stage is
+// key-partitioned and preserves keys, these ParDos do not fuse into an existing stage.
+return false;
+  } else if (!pipeline.getSideInputs(parDo).isEmpty()) {
+// At execution time, a Runner is required to only provide inputs to a PTransform that, at
+// the time the PTransform processes them, the associated window is ready in all side inputs
+// that the PTransform consumes. For an arbitrary stage, it is significantly complex for the
+// runner to determine this for each input. As a result, we break fusion to simplify this
+// inspection. In general, a ParDo which consumes side inputs cannot be fused into an
+// executable stage alongside any transforms which are upstream of any of its side inputs.
+return false;
   }
+} catch (InvalidProtocolBufferException e) {
+  throw new IllegalArgumentException(e);
 }
 return true;
   }
 
   private static boolean parDoCompatibility(
   PTransformNode parDo, PTransformNode other, QueryablePipeline pipeline) {
-// This is a convenience rather than a strict requirement. In general, a ParDo that consumes
-// side inputs can be fused with other transforms in the same environment which are not
-// upstream of any of the side inputs.
-return pipeline.getSideInputs(parDo).isEmpty()
-// Since we lack the ability to mark upstream transforms as key preserving, we
-// purposefully break fusion here to provide runners the opportunity to insert a
-// grouping operation
-&& pipeline.getUserStates(parDo).isEmpty()
-&& compatibleEnvironments(parDo, other, pipeline);
+// Implicitly true if we are attempting to fuse against oneself. This is for timer PCollection which create a loop.
+return parDo.equals(other)
+// This is a convenience rather than a strict requirement. In general, a ParDo that consumes
+// side inputs can be fused with other transforms in the same environment which are not
+// upstream of any of the side inputs.
+ 

[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127498&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127498
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 25/Jul/18 20:34
Start Date: 25/Jul/18 20:34
Worklog Time Spent: 10m 
  Work Description: lukecwik edited a comment on issue #6050: [BEAM-4658] 
Update pipeline representation in runner support libraries to handle timers.
URL: https://github.com/apache/beam/pull/6050#issuecomment-407569883
 
 
   R: @tweise @youngoli 
   CC: @charlesccychen @bsidhom




Issue Time Tracking
---

Worklog Id: (was: 127498)
Time Spent: 40m  (was: 0.5h)



[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=126928&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-126928
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 24/Jul/18 22:15
Start Date: 24/Jul/18 22:15
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #6050: [BEAM-4658] Update 
pipeline representation in runner support libraries to handle timers.
URL: https://github.com/apache/beam/pull/6050#issuecomment-407569883
 
 
   R: @tweise @angoenka 
   CC: @charlesccychen @bsidhom




Issue Time Tracking
---

Worklog Id: (was: 126928)
Time Spent: 0.5h  (was: 20m)



[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=126927&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-126927
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 24/Jul/18 22:14
Start Date: 24/Jul/18 22:14
Worklog Time Spent: 10m 
  Work Description: lukecwik opened a new pull request #6050: [BEAM-4658] 
Update pipeline representation in runner support libraries to handle timers.
URL: https://github.com/apache/beam/pull/6050
 
 
   Note that I modified several runner libraries related to portability to 
support multiple inputs. It was painful because of the use of too many 
abstractions. Hopefully I'll have an opportunity to cut down on how many there 
are.
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   It will help us expedite review of your Pull Request if you tag someone 
(e.g. `@username`) to look at it.
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/)
 | --- | --- | --- | --- | --- | ---
   Java | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/)
   Python | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/)
 | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)
  [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/)
 | --- | --- | --- | ---
   
   
   
   
   




Issue Time Tracking
---

Worklog Id: (was: 126927)
Time Spent: 20m  (was: 10m)


[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers

2018-07-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=125253&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-125253
 ]

ASF GitHub Bot logged work on BEAM-4658:


Author: ASF GitHub Bot
Created on: 19/Jul/18 21:34
Start Date: 19/Jul/18 21:34
Worklog Time Spent: 10m 
  Work Description: lukecwik opened a new pull request #6002: [BEAM-4658] 
Modify RemoteBundle interface to allow for multiple inputs.
URL: https://github.com/apache/beam/pull/6002
 
 
   This is towards supporting timers as PCollections.
   
   
   




Issue Time Tracking
---

Worklog Id: (was: 125253)
Time Spent: 10m
Remaining Estimate: 0h
