[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=133312=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-133312 ]

ASF GitHub Bot logged work on BEAM-4658:

Author: ASF GitHub Bot
Created on: 09/Aug/18 21:44
Start Date: 09/Aug/18 21:44
Worklog Time Spent: 10m
Work Description: lukecwik closed pull request #6187: [BEAM-4658] Follow up on PR comments from #6050
URL: https://github.com/apache/beam/pull/6187

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
index 3215691363c..2ddd80afae8 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
@@ -81,6 +81,7 @@
   private static final CompatibilityChecker DEFAULT_COMPATIBILITY_CHECKER =
       GreedyPCollectionFusers::unknownTransformCompatibility;
 
+  /** Returns true if the PTransform node for the given input PCollection can be fused across. */
   public static boolean canFuse(
       PTransformNode transformNode,
       Environment environment,
@@ -92,6 +93,10 @@ public static boolean canFuse(
         .canFuse(transformNode, environment, candidate, stagePCollections, pipeline);
   }
 
+  /**
+   * Returns true if the two PTransforms are compatible such that they can be executed in the same
+   * environment.
+   */
   public static boolean isCompatible(
       PTransformNode left, PTransformNode right, QueryablePipeline pipeline) {
     CompatibilityChecker leftChecker =
@@ -184,7 +189,8 @@ private static boolean canFuseParDo(
   private static boolean parDoCompatibility(
       PTransformNode parDo, PTransformNode other, QueryablePipeline pipeline) {
-    // Implicitly true if we are attempting to fuse against oneself. This is for timer PCollection which create a loop.
+    // Implicitly true if we are attempting to fuse against oneself. This case comes up for
+    // PCollections representing timers since they create a self-loop in the graph.
     return parDo.equals(other)
         // This is a convenience rather than a strict requirement. In general, a ParDo that consumes
         // side inputs can be fused with other transforms in the same environment which are not
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TimerReference.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TimerReference.java
index 36dd3284a6a..0c6bf3a36bc 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TimerReference.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TimerReference.java
@@ -21,8 +21,9 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 
 /**
- * A reference to a timer. This includes the PTransform that references the timer as well as the
- * PCollection referenced. Both are necessary in order to fully resolve a timer.
+ * Contains references to components relevant for runners during execution for timers. The
+ * referenced PTransform specifies the timer specification while the PCollection specifies the
+ * encoding representation.
  */
 @AutoValue
 public abstract class TimerReference {
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java
index 4920e17cba9..41b1e6c1d4a 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java
@@ -49,9 +49,9 @@ public void ofFullComponentsOnlyHasStagePTransforms() throws Exception {
         PTransform.newBuilder()
             .putInputs("input", "input.out")
             .putInputs("side_input", "sideInput.in")
-            .putInputs("timer", "timer.out")
+            .putInputs("timer", "timer.pc")
             .putOutputs("output", "output.out")
-            .putOutputs("timer", "timer.out")
+            .putOutputs("timer",
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=133026=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-133026 ]

ASF GitHub Bot logged work on BEAM-4658:

Author: ASF GitHub Bot
Created on: 09/Aug/18 16:10
Start Date: 09/Aug/18 16:10
Worklog Time Spent: 10m
Work Description: lukecwik commented on issue #6187: [BEAM-4658] Follow up on PR comments from #6050
URL: https://github.com/apache/beam/pull/6187#issuecomment-411812017

Run Python PreCommit

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 133026)
Time Spent: 4.5h (was: 4h 20m)

> Update pipeline representation in runner support libraries to handle timers
> ---
>
> Key: BEAM-4658
> URL: https://issues.apache.org/jira/browse/BEAM-4658
> Project: Beam
> Issue Type: Sub-task
> Components: runner-core
> Reporter: Luke Cwik
> Assignee: Luke Cwik
> Priority: Major
> Time Spent: 4.5h
> Remaining Estimate: 0h
>
> Expose relevant timer information on *ExecutableStage* to runners
> Update fusion logic to handle timers.
> Provide execution time interface to fire timers into *RemoteBundle*s and also
> to receive new timers that are being set.
>

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=133027=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-133027 ]

ASF GitHub Bot logged work on BEAM-4658:

Author: ASF GitHub Bot
Created on: 09/Aug/18 16:10
Start Date: 09/Aug/18 16:10
Worklog Time Spent: 10m
Work Description: lukecwik commented on issue #6187: [BEAM-4658] Follow up on PR comments from #6050
URL: https://github.com/apache/beam/pull/6187#issuecomment-411812096

Run Java PreCommit

Issue Time Tracking
---
Worklog Id: (was: 133027)
Time Spent: 4h 40m (was: 4.5h)
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=132735=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-132735 ]

ASF GitHub Bot logged work on BEAM-4658:

Author: ASF GitHub Bot
Created on: 08/Aug/18 22:44
Start Date: 08/Aug/18 22:44
Worklog Time Spent: 10m
Work Description: lukecwik commented on issue #6187: [BEAM-4658] Follow up on PR comments from #6050
URL: https://github.com/apache/beam/pull/6187#issuecomment-411577167

R: @youngoli @tweise

Issue Time Tracking
---
Worklog Id: (was: 132735)
Time Spent: 4h 20m (was: 4h 10m)
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=132734=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-132734 ]

ASF GitHub Bot logged work on BEAM-4658:

Author: ASF GitHub Bot
Created on: 08/Aug/18 22:43
Start Date: 08/Aug/18 22:43
Worklog Time Spent: 10m
Work Description: lukecwik opened a new pull request #6187: [BEAM-4658] Follow up on PR comments from #6050
URL: https://github.com/apache/beam/pull/6187

https://github.com/apache/beam/pull/6050

Issue Time Tracking
---
Worklog Id: (was: 132734)
Time Spent: 4h 10m (was: 4h)
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=132727=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-132727 ]

ASF GitHub Bot logged work on BEAM-4658:

Author: ASF GitHub Bot
Created on: 08/Aug/18 22:22
Start Date: 08/Aug/18 22:22
Worklog Time Spent: 10m
Work Description: lukecwik commented on issue #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers.
URL: https://github.com/apache/beam/pull/6050#issuecomment-411572212

Yes, the Java SDK could be used to test timers in Flink. This would make Flink the first runner to support timers using portability.

Issue Time Tracking
---
Worklog Id: (was: 132727)
Time Spent: 4h (was: 3h 50m)
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127911=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127911 ]

ASF GitHub Bot logged work on BEAM-4658:

Author: ASF GitHub Bot
Created on: 26/Jul/18 20:29
Start Date: 26/Jul/18 20:29
Worklog Time Spent: 10m
Work Description: tweise commented on issue #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers.
URL: https://github.com/apache/beam/pull/6050#issuecomment-408225259

LGTM. The timer support in the Python SDK isn't ready, but any implementation in the Flink runner could be tested using the Java SDK at this point?

Issue Time Tracking
---
Worklog Id: (was: 127911)
Time Spent: 3h 50m (was: 3h 40m)
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127909=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127909 ]

ASF GitHub Bot logged work on BEAM-4658:

Author: ASF GitHub Bot
Created on: 26/Jul/18 20:07
Start Date: 26/Jul/18 20:07
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205586171

## File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java ##
@@ -58,11 +60,13 @@ public void ofFullComponentsOnlyHasStagePTransforms() throws Exception {
                     .setDoFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("foo"))
                     .putSideInputs("side_input", RunnerApi.SideInput.getDefaultInstance())
                     .putStateSpecs("user_state", RunnerApi.StateSpec.getDefaultInstance())
+                    .putTimerSpecs("timer", RunnerApi.TimerSpec.getDefaultInstance())
                     .build()
                     .toByteString()))
         .build();
     PCollection input = PCollection.newBuilder().setUniqueName("input.out").build();
     PCollection sideInput = PCollection.newBuilder().setUniqueName("sideInput.in").build();
+    PCollection timer = PCollection.newBuilder().setUniqueName("timer.out").build();

Review comment:
It is confusing to reference the single timer collection as "timer.out" when it is used for both input and output.

Issue Time Tracking
---
Worklog Id: (was: 127909)
Time Spent: 3h 40m (was: 3.5h)
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127906=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127906 ]

ASF GitHub Bot logged work on BEAM-4658:

Author: ASF GitHub Bot
Created on: 26/Jul/18 19:58
Start Date: 26/Jul/18 19:58
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205583228

## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TimerReference.java ##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+/**
+ * A reference to a timer. This includes the PTransform that references the timer as well as the

Review comment:
For the PTransform it is more an "owns" relationship rather than "references"? Why is the PCollection required to fully resolve a timer?

Issue Time Tracking
---
Worklog Id: (was: 127906)
Time Spent: 3.5h (was: 3h 20m)
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127905=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127905 ]

ASF GitHub Bot logged work on BEAM-4658:

Author: ASF GitHub Bot
Created on: 26/Jul/18 19:57
Start Date: 26/Jul/18 19:57
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205583228

## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TimerReference.java ##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+/**
+ * A reference to a timer. This includes the PTransform that references the timer as well as the

Review comment:
For the PTransform it is more an "owns" relationship rather than "references"?

Issue Time Tracking
---
Worklog Id: (was: 127905)
Time Spent: 3h 20m (was: 3h 10m)
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127903=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127903 ]

ASF GitHub Bot logged work on BEAM-4658:

Author: ASF GitHub Bot
Created on: 26/Jul/18 19:48
Start Date: 26/Jul/18 19:48
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205581055

## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java ##
@@ -150,41 +154,51 @@ private static boolean canFuseParDo(
       // is never possible.
       return false;
     }
-    if (!pipeline.getSideInputs(parDo).isEmpty()) {
-      // At execution time, a Runner is required to only provide inputs to a PTransform that, at
-      // the time the PTransform processes them, the associated window is ready in all side inputs
-      // that the PTransform consumes. For an arbitrary stage, it is significantly complex for the
-      // runner to determine this for each input. As a result, we break fusion to simplify this
-      // inspection. In general, a ParDo which consumes side inputs cannot be fused into an
-      // executable stage alongside any transforms which are upstream of any of its side inputs.
-      return false;
-    } else {
-      try {
-        ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
-        if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
-          // Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for
-          // a key must execute serially. To avoid checking if the rest of the stage is
-          // key-partitioned and preserves keys, these ParDos do not fuse into an existing stage.
-          return false;
-        }
-      } catch (InvalidProtocolBufferException e) {
-        throw new IllegalArgumentException(e);
+    try {
+      ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
+      if (Maps.filterKeys(
+              parDo.getTransform().getInputsMap(), s -> payload.getTimerSpecsMap().containsKey(s))
+          .values()
+          .contains(candidate.getId())) {
+        // Allow fusion across timer PCollections because they are a self loop.
+        return true;
+      } else if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
+        // Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for
+        // a key must execute serially. To avoid checking if the rest of the stage is
+        // key-partitioned and preserves keys, these ParDos do not fuse into an existing stage.
+        return false;
+      } else if (!pipeline.getSideInputs(parDo).isEmpty()) {
+        // At execution time, a Runner is required to only provide inputs to a PTransform that, at
+        // the time the PTransform processes them, the associated window is ready in all side inputs
+        // that the PTransform consumes. For an arbitrary stage, it is significantly complex for the
+        // runner to determine this for each input. As a result, we break fusion to simplify this
+        // inspection. In general, a ParDo which consumes side inputs cannot be fused into an
+        // executable stage alongside any transforms which are upstream of any of its side inputs.
+        return false;
       }
+    } catch (InvalidProtocolBufferException e) {
+      throw new IllegalArgumentException(e);
     }
     return true;
   }
 
   private static boolean parDoCompatibility(

Review comment:
nit: the name of this method sounds odd, should it be canFuse or something?

Issue Time Tracking
---
Worklog Id: (was: 127903)
Time Spent: 3h 10m (was: 3h)
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127902=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127902 ]

ASF GitHub Bot logged work on BEAM-4658:

Author: ASF GitHub Bot
Created on: 26/Jul/18 19:38
Start Date: 26/Jul/18 19:38
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205578326

## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java ##
@@ -150,41 +154,51 @@ private static boolean canFuseParDo(
       // is never possible.
       return false;
     }
-    if (!pipeline.getSideInputs(parDo).isEmpty()) {
-      // At execution time, a Runner is required to only provide inputs to a PTransform that, at
-      // the time the PTransform processes them, the associated window is ready in all side inputs
-      // that the PTransform consumes. For an arbitrary stage, it is significantly complex for the
-      // runner to determine this for each input. As a result, we break fusion to simplify this
-      // inspection. In general, a ParDo which consumes side inputs cannot be fused into an
-      // executable stage alongside any transforms which are upstream of any of its side inputs.
-      return false;
-    } else {
-      try {
-        ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
-        if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
-          // Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for
-          // a key must execute serially. To avoid checking if the rest of the stage is
-          // key-partitioned and preserves keys, these ParDos do not fuse into an existing stage.
-          return false;
-        }
-      } catch (InvalidProtocolBufferException e) {
-        throw new IllegalArgumentException(e);
+    try {
+      ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
+      if (Maps.filterKeys(
+              parDo.getTransform().getInputsMap(), s -> payload.getTimerSpecsMap().containsKey(s))
+          .values()
+          .contains(candidate.getId())) {
+        // Allow fusion across timer PCollections because they are a self loop.
+        return true;
+      } else if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
+        // Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for

Review comment:
yup, timers are really a special kind of state

Issue Time Tracking
---
Worklog Id: (was: 127902)
Time Spent: 3h (was: 2h 50m)
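The check order that the quoted diff introduces in canFuseParDo (timer self-loop first, then state/timers, then side inputs) can be illustrated with a small, dependency-free sketch. Everything in it is hypothetical: the FusionSketch class, the ParDoModel type and its fields are stand-ins invented for illustration and are not Beam classes, and the environment check that precedes these branches in the real method is omitted.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical, dependency-free model of the check order in canFuseParDo; not Beam code. */
final class FusionSketch {

  /** Simplified stand-in for a ParDo and its payload (assumed shape, not a Beam class). */
  static final class ParDoModel {
    final Map<String, String> inputs; // local input name -> PCollection id
    final Set<String> timerSpecNames; // local names that carry a timer spec
    final int stateSpecCount;
    final boolean hasSideInputs;

    ParDoModel(
        Map<String, String> inputs,
        Set<String> timerSpecNames,
        int stateSpecCount,
        boolean hasSideInputs) {
      this.inputs = inputs;
      this.timerSpecNames = timerSpecNames;
      this.stateSpecCount = stateSpecCount;
      this.hasSideInputs = hasSideInputs;
    }
  }

  /** Returns true if the ParDo may be fused across the candidate PCollection. */
  static boolean canFuseParDo(ParDoModel parDo, String candidateId) {
    // 1. A timer PCollection is both produced and consumed by the same ParDo
    //    (a self-loop), so fusing across it is always allowed.
    for (Map.Entry<String, String> input : parDo.inputs.entrySet()) {
      boolean isTimerInput = parDo.timerSpecNames.contains(input.getKey());
      if (isTimerInput && input.getValue().equals(candidateId)) {
        return true;
      }
    }
    // 2. Stateful ParDos (state or timers) need key-partitioned input, so they
    //    do not fuse into an existing stage across any other PCollection.
    if (parDo.stateSpecCount > 0 || !parDo.timerSpecNames.isEmpty()) {
      return false;
    }
    // 3. Side inputs break fusion to keep the window-readiness check simple.
    if (parDo.hasSideInputs) {
      return false;
    }
    return true;
  }

  public static void main(String[] args) {
    Map<String, String> inputs = new HashMap<>();
    inputs.put("input", "input.out");
    inputs.put("timer", "timer.pc");
    ParDoModel timerParDo = new ParDoModel(inputs, Collections.singleton("timer"), 0, false);
    ParDoModel plainParDo =
        new ParDoModel(
            Collections.singletonMap("input", "input.out"),
            Collections.<String>emptySet(),
            0,
            false);

    System.out.println(canFuseParDo(timerParDo, "timer.pc")); // true: timer self-loop
    System.out.println(canFuseParDo(timerParDo, "input.out")); // false: stateful ParDo
    System.out.println(canFuseParDo(plainParDo, "input.out")); // true: nothing blocks fusion
  }
}
```

Putting the self-loop branch first matters in this model: a ParDo with timers is otherwise rejected by the state/timer branch, so the timer PCollection would never be fused with the transform that both writes and reads it.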
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127877=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127877 ] ASF GitHub Bot logged work on BEAM-4658: Author: ASF GitHub Bot Created on: 26/Jul/18 18:19 Start Date: 26/Jul/18 18:19 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers. URL: https://github.com/apache/beam/pull/6050#discussion_r20852 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java ## @@ -104,7 +118,7 @@ * The {@link PTransform#getSubtransformsList()} is empty. This ensures that executable * stages are treated as primitive transforms. * The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of - * {@link #getInputPCollection()}. + * {@link #getInputPCollection()} and {@link #getSideInputs()}. Review comment: I have another pass to perform a few fixes to list all inputs and to consider timers as materialized PCollections so I can address this comment then. The timers are usable during execution now though, and the rest is clean-up/simplification. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 127877) Time Spent: 2h 50m (was: 2h 40m) > Update pipeline representation in runner support libraries to handle timers > --- > > Key: BEAM-4658 > URL: https://issues.apache.org/jira/browse/BEAM-4658 > Project: Beam > Issue Type: Sub-task > Components: runner-core >Reporter: Luke Cwik >Assignee: Luke Cwik >Priority: Major > Time Spent: 2h 50m > Remaining Estimate: 0h > > Expose relevant timer information on *ExecutableStage* to runners > Update fusion logic to handle timers. > Provide execution time interface to fire timers into *RemoteBundle*s and also > to receive new timers that are being set. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127867=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127867 ] ASF GitHub Bot logged work on BEAM-4658: Author: ASF GitHub Bot Created on: 26/Jul/18 17:50 Start Date: 26/Jul/18 17:50 Worklog Time Spent: 10m Work Description: tweise commented on a change in pull request #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers. URL: https://github.com/apache/beam/pull/6050#discussion_r20554 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java ## @@ -104,7 +118,7 @@ * The {@link PTransform#getSubtransformsList()} is empty. This ensures that executable * stages are treated as primitive transforms. * The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of - * {@link #getInputPCollection()}. + * {@link #getInputPCollection()} and {@link #getSideInputs()}. Review comment: Perhaps `The {@link PTransform#getInputsMap()} is the result of` would be clearer. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 127867) Time Spent: 2.5h (was: 2h 20m) > Update pipeline representation in runner support libraries to handle timers > --- > > Key: BEAM-4658 > URL: https://issues.apache.org/jira/browse/BEAM-4658 > Project: Beam > Issue Type: Sub-task > Components: runner-core >Reporter: Luke Cwik >Assignee: Luke Cwik >Priority: Major > Time Spent: 2.5h > Remaining Estimate: 0h > > Expose relevant timer information on *ExecutableStage* to runners > Update fusion logic to handle timers. > Provide execution time interface to fire timers into *RemoteBundle*s and also > to receive new timers that are being set. 
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127866=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127866 ] ASF GitHub Bot logged work on BEAM-4658: Author: ASF GitHub Bot Created on: 26/Jul/18 17:31 Start Date: 26/Jul/18 17:31 Worklog Time Spent: 10m Work Description: lukecwik commented on issue #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers. URL: https://github.com/apache/beam/pull/6050#issuecomment-408174187 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 127866) Time Spent: 2h 20m (was: 2h 10m) > Update pipeline representation in runner support libraries to handle timers > --- > > Key: BEAM-4658 > URL: https://issues.apache.org/jira/browse/BEAM-4658 > Project: Beam > Issue Type: Sub-task > Components: runner-core >Reporter: Luke Cwik >Assignee: Luke Cwik >Priority: Major > Time Spent: 2h 20m > Remaining Estimate: 0h > > Expose relevant timer information on *ExecutableStage* to runners > Update fusion logic to handle timers. > Provide execution time interface to fire timers into *RemoteBundle*s and also > to receive new timers that are being set. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127860=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127860 ] ASF GitHub Bot logged work on BEAM-4658: Author: ASF GitHub Bot Created on: 26/Jul/18 17:13 Start Date: 26/Jul/18 17:13 Worklog Time Spent: 10m Work Description: youngoli commented on a change in pull request #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers. URL: https://github.com/apache/beam/pull/6050#discussion_r205534735 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/Networks.java ## @@ -193,29 +201,55 @@ public final NodeT apply(NodeT input) { "Only networks without self loops are supported, given %s", network); -// Linked hashset will prevent duplicates from appearing and will maintain insertion order. -LinkedHashSet nodes = new LinkedHashSet<>(network.nodes().size()); -Queue processingOrder = new ArrayDeque<>(); -// Add all the roots -for (NodeT node : network.nodes()) { - if (network.inDegree(node) == 0) { -processingOrder.add(node); - } -} +// Uses the following algorithm: Review comment: Ah, the detail I missed was that the algorithm would gather any newly created sinks/sources (I thought that after removing Sink, node B wouldn't be considered a sink). It makes sense now and I believe it does work for topological ordering. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 127860) Time Spent: 2h 10m (was: 2h) > Update pipeline representation in runner support libraries to handle timers > --- > > Key: BEAM-4658 > URL: https://issues.apache.org/jira/browse/BEAM-4658 > Project: Beam > Issue Type: Sub-task > Components: runner-core >Reporter: Luke Cwik >Assignee: Luke Cwik >Priority: Major > Time Spent: 2h 10m > Remaining Estimate: 0h > > Expose relevant timer information on *ExecutableStage* to runners > Update fusion logic to handle timers. > Provide execution time interface to fire timers into *RemoteBundle*s and also > to receive new timers that are being set. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127828=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127828 ] ASF GitHub Bot logged work on BEAM-4658: Author: ASF GitHub Bot Created on: 26/Jul/18 16:15 Start Date: 26/Jul/18 16:15 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers. URL: https://github.com/apache/beam/pull/6050#discussion_r205517383 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/Networks.java ## @@ -193,29 +201,55 @@ public final NodeT apply(NodeT input) { "Only networks without self loops are supported, given %s", network); -// Linked hashset will prevent duplicates from appearing and will maintain insertion order. -LinkedHashSet nodes = new LinkedHashSet<>(network.nodes().size()); -Queue processingOrder = new ArrayDeque<>(); -// Add all the roots -for (NodeT node : network.nodes()) { - if (network.inDegree(node) == 0) { -processingOrder.add(node); - } -} +// Uses the following algorithm:
Review comment: Only nodes that are part of cycles are ever removed as part of the "delta" calculation. The algorithm:
1) Removes all sinks from the graph (including newly created sinks once the graph is updated) until there are none.
2) Removes all sources from the graph (including newly created sources once the graph is updated) until there are none.
3) Removes a single back edge which has the highest delta. This may break a cycle.
Concretely, in your example, the algorithm would:
* add Sink to the beginning of S2, removing it from the graph.
* add B to the beginning of S2, removing it from the graph.
* add A to the beginning of S2, removing it from the graph.
* add Source1 or Source2 (ambiguous, so let's say Source1) to the beginning of S2, removing it from the graph.
* add Source2 to the beginning of S2, removing it from the graph.
I expanded the comment within the code about the algorithm to clarify some parts of it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 127828) Time Spent: 2h (was: 1h 50m) > Update pipeline representation in runner support libraries to handle timers > --- > > Key: BEAM-4658 > URL: https://issues.apache.org/jira/browse/BEAM-4658 > Project: Beam > Issue Type: Sub-task > Components: runner-core >Reporter: Luke Cwik >Assignee: Luke Cwik >Priority: Major > Time Spent: 2h > Remaining Estimate: 0h > > Expose relevant timer information on *ExecutableStage* to runners > Update fusion logic to handle timers. > Provide execution time interface to fire timers into *RemoteBundle*s and also > to receive new timers that are being set. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
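The sink/source/delta procedure described in the review comment above can be sketched in plain Java as follows. This is a reading of the comment, not the code in `Networks.java`: the class and method names are hypothetical, nodes are plain strings, delta is assumed to be out-degree minus in-degree (which matches the values delta(B) = 0 and delta(A) = -1 quoted elsewhere in the review), and step 3 is interpreted as removing the node with the highest delta.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the sink/source/delta ordering; not the Beam implementation. */
final class SinkSourceDeltaOrderSketch {

  /** Orders the nodes of a directed graph given as node -> successors. */
  static List<String> order(Map<String, Set<String>> successors) {
    // Mutable copies of the outgoing and incoming edge sets for every node.
    Map<String, Set<String>> out = new LinkedHashMap<>();
    Map<String, Set<String>> in = new LinkedHashMap<>();
    for (Map.Entry<String, Set<String>> entry : successors.entrySet()) {
      out.computeIfAbsent(entry.getKey(), k -> new LinkedHashSet<>()).addAll(entry.getValue());
      in.computeIfAbsent(entry.getKey(), k -> new LinkedHashSet<>());
      for (String target : entry.getValue()) {
        out.computeIfAbsent(target, k -> new LinkedHashSet<>());
        in.computeIfAbsent(target, k -> new LinkedHashSet<>()).add(entry.getKey());
      }
    }

    List<String> s1 = new ArrayList<>(); // grows at the end
    Deque<String> s2 = new ArrayDeque<>(); // grows at the beginning
    while (!out.isEmpty()) {
      String node;
      // Step 1: remove sinks, including ones created by earlier removals.
      while ((node = firstWithNoEdges(out)) != null) {
        s2.addFirst(node);
        remove(node, out, in);
      }
      // Step 2: remove sources, including ones created by earlier removals.
      while ((node = firstWithNoEdges(in)) != null) {
        s1.add(node);
        remove(node, out, in);
      }
      // Step 3: only cycle members remain; remove the node with the highest delta.
      if (!out.isEmpty()) {
        String best = null;
        int bestDelta = Integer.MIN_VALUE;
        for (String candidate : out.keySet()) {
          int delta = out.get(candidate).size() - in.get(candidate).size();
          if (delta > bestDelta) {
            best = candidate;
            bestDelta = delta;
          }
        }
        s1.add(best);
        remove(best, out, in);
      }
    }
    s1.addAll(s2);
    return s1;
  }

  /** Returns the first node whose edge set in the given direction is empty, or null. */
  private static String firstWithNoEdges(Map<String, Set<String>> edges) {
    for (Map.Entry<String, Set<String>> entry : edges.entrySet()) {
      if (entry.getValue().isEmpty()) {
        return entry.getKey();
      }
    }
    return null;
  }

  /** Deletes a node and every edge that touches it. */
  private static void remove(
      String node, Map<String, Set<String>> out, Map<String, Set<String>> in) {
    for (String successor : out.get(node)) {
      in.get(successor).remove(node);
    }
    for (String predecessor : in.get(node)) {
      out.get(predecessor).remove(node);
    }
    out.remove(node);
    in.remove(node);
  }
}
```

On an acyclic graph the sink loop alone empties the graph, so the result is a valid topological order; the delta step only runs when a cycle is left over, which is the point made in the comment.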
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127820=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127820 ] ASF GitHub Bot logged work on BEAM-4658: Author: ASF GitHub Bot Created on: 26/Jul/18 15:58 Start Date: 26/Jul/18 15:58 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers. URL: https://github.com/apache/beam/pull/6050#discussion_r205512052 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java ## @@ -337,11 +337,22 @@ public static TupleTagList getAdditionalOutputTags(AppliedPTransform ap ptransform.getSpec().getUrn()); ParDoPayload payload = ParDoPayload.parseFrom(ptransform.getSpec().getPayload()); return components.getPcollectionsOrThrow( -ptransform.getInputsOrThrow(getMainInputId(ptransform, payload))); +ptransform.getInputsOrThrow(getMainInputName(ptransform, payload))); Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 127820) Time Spent: 1h 50m (was: 1h 40m) > Update pipeline representation in runner support libraries to handle timers > --- > > Key: BEAM-4658 > URL: https://issues.apache.org/jira/browse/BEAM-4658 > Project: Beam > Issue Type: Sub-task > Components: runner-core >Reporter: Luke Cwik >Assignee: Luke Cwik >Priority: Major > Time Spent: 1h 50m > Remaining Estimate: 0h > > Expose relevant timer information on *ExecutableStage* to runners > Update fusion logic to handle timers. > Provide execution time interface to fire timers into *RemoteBundle*s and also > to receive new timers that are being set. 
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127818=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127818 ] ASF GitHub Bot logged work on BEAM-4658: Author: ASF GitHub Bot Created on: 26/Jul/18 15:58 Start Date: 26/Jul/18 15:58 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers. URL: https://github.com/apache/beam/pull/6050#discussion_r205512015 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java ## @@ -150,41 +154,51 @@ private static boolean canFuseParDo( // is never possible. return false; } -if (!pipeline.getSideInputs(parDo).isEmpty()) { - // At execution time, a Runner is required to only provide inputs to a PTransform that, at - // the time the PTransform processes them, the associated window is ready in all side inputs - // that the PTransform consumes. For an arbitrary stage, it is significantly complex for the - // runner to determine this for each input. As a result, we break fusion to simplify this - // inspection. In general, a ParDo which consumes side inputs cannot be fused into an - // executable stage alongside any transforms which are upstream of any of its side inputs. - return false; -} else { - try { -ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload()); -if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) { - // Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for - // a key must execute serially. To avoid checking if the rest of the stage is - // key-partitioned and preserves keys, these ParDos do not fuse into an existing stage. 
- return false; -} - } catch (InvalidProtocolBufferException e) { -throw new IllegalArgumentException(e); +try { + ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload()); + if (Maps.filterKeys( + parDo.getTransform().getInputsMap(), s -> payload.getTimerSpecsMap().containsKey(s)) + .values() + .contains(candidate.getId())) { +// Allow fusion across timer PCollections because they are a self loop. +return true; + } else if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) { +// Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for +// a key must execute serially. To avoid checking if the rest of the stage is +// key-partitioned and preserves keys, these ParDos do not fuse into an existing stage. +return false; + } else if (!pipeline.getSideInputs(parDo).isEmpty()) { +// At execution time, a Runner is required to only provide inputs to a PTransform that, at +// the time the PTransform processes them, the associated window is ready in all side inputs +// that the PTransform consumes. For an arbitrary stage, it is significantly complex for the +// runner to determine this for each input. As a result, we break fusion to simplify this +// inspection. In general, a ParDo which consumes side inputs cannot be fused into an +// executable stage alongside any transforms which are upstream of any of its side inputs. +return false; } +} catch (InvalidProtocolBufferException e) { + throw new IllegalArgumentException(e); } return true; } private static boolean parDoCompatibility( PTransformNode parDo, PTransformNode other, QueryablePipeline pipeline) { -// This is a convenience rather than a strict requirement. In general, a ParDo that consumes -// side inputs can be fused with other transforms in the same environment which are not -// upstream of any of the side inputs. 
-return pipeline.getSideInputs(parDo).isEmpty() -// Since we lack the ability to mark upstream transforms as key preserving, we -// purposefully break fusion here to provide runners the opportunity to insert a -// grouping operation -&& pipeline.getUserStates(parDo).isEmpty() -&& compatibleEnvironments(parDo, other, pipeline); +// Implicitly true if we are attempting to fuse against oneself. This is for timer PCollection which create a loop. +return parDo.equals(other) +// This is a convenience rather than a strict requirement. In general, a ParDo that consumes +// side inputs can be fused with other transforms in the same environment which are not +// upstream of any of the side inputs. +
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127819=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127819 ] ASF GitHub Bot logged work on BEAM-4658: Author: ASF GitHub Bot Created on: 26/Jul/18 15:58 Start Date: 26/Jul/18 15:58 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers. URL: https://github.com/apache/beam/pull/6050#discussion_r205512043 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java ## @@ -104,7 +118,7 @@ * The {@link PTransform#getSubtransformsList()} is empty. This ensures that executable * stages are treated as primitive transforms. * The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of - * {@link #getInputPCollection()}. + * {@link #getInputPCollection()} and {@link #getSideInputs()}. Review comment: It is multiple, fixed comment. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 127819) Time Spent: 1h 40m (was: 1.5h) > Update pipeline representation in runner support libraries to handle timers > --- > > Key: BEAM-4658 > URL: https://issues.apache.org/jira/browse/BEAM-4658 > Project: Beam > Issue Type: Sub-task > Components: runner-core >Reporter: Luke Cwik >Assignee: Luke Cwik >Priority: Major > Time Spent: 1h 40m > Remaining Estimate: 0h > > Expose relevant timer information on *ExecutableStage* to runners > Update fusion logic to handle timers. > Provide execution time interface to fire timers into *RemoteBundle*s and also > to receive new timers that are being set. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127769=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127769 ] ASF GitHub Bot logged work on BEAM-4658: Author: ASF GitHub Bot Created on: 26/Jul/18 14:45 Start Date: 26/Jul/18 14:45 Worklog Time Spent: 10m Work Description: lukecwik closed pull request #6002: [BEAM-4658] Modify RemoteBundle interface to allow for multiple inputs. URL: https://github.com/apache/beam/pull/6002 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java index ca20949c953..9a9121b92a4 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.direct.portable; +import com.google.common.collect.Iterables; import java.util.ArrayList; import java.util.Collection; import javax.annotation.Nullable; @@ -29,6 +30,7 @@ import org.apache.beam.runners.fnexecution.control.RemoteBundle; import org.apache.beam.runners.fnexecution.control.StageBundleFactory; import org.apache.beam.runners.fnexecution.state.StateRequestHandler; +import org.apache.beam.sdk.fn.data.FnDataReceiver; import org.apache.beam.sdk.util.WindowedValue; /** @@ -59,7 +61,8 @@ public void cleanup() throws Exception { private class RemoteStageEvaluator implements TransformEvaluator { private final PTransformNode transform; -private final RemoteBundle bundle; +private final RemoteBundle bundle; 
+private final FnDataReceiver> mainInput; private final Collection> outputs; private RemoteStageEvaluator(PTransformNode transform) throws Exception { @@ -67,19 +70,20 @@ private RemoteStageEvaluator(PTransformNode transform) throws Exception { ExecutableStage stage = ExecutableStage.fromPayload( ExecutableStagePayload.parseFrom(transform.getTransform().getSpec().getPayload())); - outputs = new ArrayList<>(); + this.outputs = new ArrayList<>(); StageBundleFactory stageFactory = jobFactory.forStage(stage); - bundle = + this.bundle = stageFactory.getBundle( BundleFactoryOutputReceiverFactory.create( bundleFactory, stage.getComponents(), outputs::add), StateRequestHandler.unsupported(), BundleProgressHandler.unsupported()); + this.mainInput = Iterables.getOnlyElement(bundle.getInputReceivers().values()); } @Override public void processElement(WindowedValue element) throws Exception { - bundle.getInputReceiver().accept(element); + mainInput.accept(element); } @Override diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java index d1aaf174a0b..8119c40d3cb 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java @@ -36,6 +36,7 @@ import org.apache.beam.runners.fnexecution.wire.WireCoders; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.fn.data.FnDataReceiver; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; @@ -89,7 +90,8 @@ public void cleanup() throws Exception { private final 
CopyOnAccessInMemoryStateInternals stateInternals; private final DirectTimerInternals timerInternals; -private final RemoteBundle> bundle; +private final RemoteBundle bundle; +private final FnDataReceiver> mainInput; private final Collection> outputs; private final SDFFeederViaStateAndTimers feeder; @@ -144,6 +146,7 @@ public void onCompleted(ProcessBundleResponse response) { } } }); + this.mainInput = Iterables.getOnlyElement(bundle.getInputReceivers().values()); } @Override @@ -158,7 +161,7 @@ public void processElement( }
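The pattern in the diff above, where an evaluator that still expects one input picks the single receiver out of the map returned by `bundle.getInputReceivers()`, can be sketched without Beam or Guava. Everything here is a hypothetical stand-in: `Consumer<String>` replaces `FnDataReceiver`, the local `getOnlyElement` helper replaces Guava's `Iterables.getOnlyElement` (and throws `IllegalArgumentException` in every failure case, which is a simplification), and `Evaluator` is not a Beam class.

```java
import java.util.Collection;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical sketch of selecting the single input receiver of a multi-input bundle. */
final class SingleInputBundleSketch {

  /** Returns the sole element of the collection, or fails if there is not exactly one. */
  static <T> T getOnlyElement(Collection<T> values) {
    if (values.size() != 1) {
      throw new IllegalArgumentException(
          "expected exactly one element but found " + values.size());
    }
    return values.iterator().next();
  }

  /** Stand-in for an evaluator that feeds elements to the bundle's only input. */
  static final class Evaluator {
    private final Consumer<String> mainInput;

    Evaluator(Map<String, Consumer<String>> inputReceivers) {
      // Resolve the receiver once, at construction time, as the diff does for mainInput.
      this.mainInput = getOnlyElement(inputReceivers.values());
    }

    void processElement(String element) {
      mainInput.accept(element);
    }
  }
}
```

Resolving the receiver in the constructor makes a stage with more than one input fail early instead of on the first element.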
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127658=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127658 ] ASF GitHub Bot logged work on BEAM-4658: Author: ASF GitHub Bot Created on: 26/Jul/18 06:09 Start Date: 26/Jul/18 06:09 Worklog Time Spent: 10m Work Description: youngoli commented on a change in pull request #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers. URL: https://github.com/apache/beam/pull/6050#discussion_r205256708 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java ## @@ -337,11 +337,22 @@ public static TupleTagList getAdditionalOutputTags(AppliedPTransform ap ptransform.getSpec().getUrn()); ParDoPayload payload = ParDoPayload.parseFrom(ptransform.getSpec().getPayload()); return components.getPcollectionsOrThrow( -ptransform.getInputsOrThrow(getMainInputId(ptransform, payload))); +ptransform.getInputsOrThrow(getMainInputName(ptransform, payload))); Review comment: Since you added a version of getMainInputName that retrieves the payload, what about using that here? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 127658) Time Spent: 1h (was: 50m) > Update pipeline representation in runner support libraries to handle timers > --- > > Key: BEAM-4658 > URL: https://issues.apache.org/jira/browse/BEAM-4658 > Project: Beam > Issue Type: Sub-task > Components: runner-core >Reporter: Luke Cwik >Assignee: Luke Cwik >Priority: Major > Time Spent: 1h > Remaining Estimate: 0h > > Expose relevant timer information on *ExecutableStage* to runners > Update fusion logic to handle timers. 
> Provide execution time interface to fire timers into *RemoteBundle*s and also > to receive new timers that are being set. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127656=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127656 ] ASF GitHub Bot logged work on BEAM-4658: Author: ASF GitHub Bot Created on: 26/Jul/18 06:09 Start Date: 26/Jul/18 06:09 Worklog Time Spent: 10m Work Description: youngoli commented on a change in pull request #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers. URL: https://github.com/apache/beam/pull/6050#discussion_r205290440 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/Networks.java ## @@ -193,29 +201,55 @@ public final NodeT apply(NodeT input) { "Only networks without self loops are supported, given %s", network); -// Linked hashset will prevent duplicates from appearing and will maintain insertion order. -LinkedHashSet nodes = new LinkedHashSet<>(network.nodes().size()); -Queue processingOrder = new ArrayDeque<>(); -// Add all the roots -for (NodeT node : network.nodes()) { - if (network.inDegree(node) == 0) { -processingOrder.add(node); - } -} +// Uses the following algorithm:
Review comment: Using this algorithm for topological order doesn't seem appropriate. From what I could understand from the paper, this algorithm generates a vertex sequence that makes it easy to remove feedback arcs (i.e. eliminate any cycles), but doesn't actually maintain topological order. As a concrete example, if you had a graph that looked like so:
```
Source1 --\
           ---> A ---> B ---> Sink
Source2 --/
```
Topologically I would expect the result to be: `Source1, Source2, A, B, Sink`. But instead this algorithm would produce `Source1, Source2, B, A, Sink` because B is considered to have the higher delta than A (delta(B) = 0, delta(A) = -1).
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 127656) Time Spent: 50m (was: 40m) > Update pipeline representation in runner support libraries to handle timers > --- > > Key: BEAM-4658 > URL: https://issues.apache.org/jira/browse/BEAM-4658 > Project: Beam > Issue Type: Sub-task > Components: runner-core >Reporter: Luke Cwik >Assignee: Luke Cwik >Priority: Major > Time Spent: 50m > Remaining Estimate: 0h > > Expose relevant timer information on *ExecutableStage* to runners > Update fusion logic to handle timers. > Provide execution time interface to fire timers into *RemoteBundle*s and also > to receive new timers that are being set. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
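For comparison, the ordering the reviewer expects for the example graph is what a queue-based topological sort yields, in the spirit of the removed lines that seed a queue with every node of in-degree 0. The sketch below is Kahn's algorithm on plain strings, not the removed Beam code; the class name and the adjacency-map representation are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical sketch of a queue-based topological sort (Kahn's algorithm). */
final class KahnOrderSketch {

  /** Orders the nodes of an acyclic directed graph given as node -> successors. */
  static List<String> topologicalOrder(Map<String, List<String>> successors) {
    // Count incoming edges for every node, keeping first-seen order.
    Map<String, Integer> inDegree = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> entry : successors.entrySet()) {
      inDegree.putIfAbsent(entry.getKey(), 0);
      for (String target : entry.getValue()) {
        inDegree.merge(target, 1, Integer::sum);
      }
    }

    // Seed the queue with all roots, i.e. nodes without incoming edges.
    Queue<String> ready = new ArrayDeque<>();
    for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
      if (entry.getValue() == 0) {
        ready.add(entry.getKey());
      }
    }

    List<String> result = new ArrayList<>();
    while (!ready.isEmpty()) {
      String node = ready.remove();
      result.add(node);
      // A successor becomes ready once all of its predecessors have been emitted.
      for (String target : successors.getOrDefault(node, List.of())) {
        if (inDegree.merge(target, -1, Integer::sum) == 0) {
          ready.add(target);
        }
      }
    }
    if (result.size() != inDegree.size()) {
      // Nodes on a cycle never reach in-degree 0, so they are never emitted.
      throw new IllegalArgumentException("graph contains a cycle");
    }
    return result;
  }
}
```

This approach cannot order a graph that contains a cycle, which is the limitation that matters once timer PCollections introduce loops.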
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127657=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127657 ]

ASF GitHub Bot logged work on BEAM-4658:

Author: ASF GitHub Bot
Created on: 26/Jul/18 06:09
Start Date: 26/Jul/18 06:09
Worklog Time Spent: 10m
Work Description: youngoli commented on a change in pull request #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205264448

## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java ##

@@ -104,7 +118,7 @@
  * The {@link PTransform#getSubtransformsList()} is empty. This ensures that executable
  * stages are treated as primitive transforms.
  * The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of
- * {@link #getInputPCollection()}.
+ * {@link #getInputPCollection()} and {@link #getSideInputs()}.

Review comment:
The way this is worded makes it sound like PTransform.getInputsMap has only one PCollection, which sounds odd if it's the result of both those functions. Is that correct, or does it actually contain multiple PCollections now?

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 127657)

> Update pipeline representation in runner support libraries to handle timers
> ---
>
> Key: BEAM-4658
> URL: https://issues.apache.org/jira/browse/BEAM-4658
> Project: Beam
> Issue Type: Sub-task
> Components: runner-core
> Reporter: Luke Cwik
> Assignee: Luke Cwik
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> Expose relevant timer information on *ExecutableStage* to runners
> Update fusion logic to handle timers.
> Provide execution time interface to fire timers into *RemoteBundle*s and also
> to receive new timers that are being set.
>
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127659=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127659 ]

ASF GitHub Bot logged work on BEAM-4658:

Author: ASF GitHub Bot
Created on: 26/Jul/18 06:09
Start Date: 26/Jul/18 06:09
Worklog Time Spent: 10m
Work Description: youngoli commented on a change in pull request #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers.
URL: https://github.com/apache/beam/pull/6050#discussion_r205274669

## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java ##

@@ -150,41 +154,51 @@ private static boolean canFuseParDo(
       // is never possible.
       return false;
     }
-    if (!pipeline.getSideInputs(parDo).isEmpty()) {
-      // At execution time, a Runner is required to only provide inputs to a PTransform that, at
-      // the time the PTransform processes them, the associated window is ready in all side inputs
-      // that the PTransform consumes. For an arbitrary stage, it is significantly complex for the
-      // runner to determine this for each input. As a result, we break fusion to simplify this
-      // inspection. In general, a ParDo which consumes side inputs cannot be fused into an
-      // executable stage alongside any transforms which are upstream of any of its side inputs.
-      return false;
-    } else {
-      try {
-        ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
-        if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
-          // Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for
-          // a key must execute serially. To avoid checking if the rest of the stage is
-          // key-partitioned and preserves keys, these ParDos do not fuse into an existing stage.
-          return false;
-        }
-      } catch (InvalidProtocolBufferException e) {
-        throw new IllegalArgumentException(e);
+    try {
+      ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
+      if (Maps.filterKeys(
+              parDo.getTransform().getInputsMap(), s -> payload.getTimerSpecsMap().containsKey(s))
+          .values()
+          .contains(candidate.getId())) {
+        // Allow fusion across timer PCollections because they are a self loop.
+        return true;
+      } else if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
+        // Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for
+        // a key must execute serially. To avoid checking if the rest of the stage is
+        // key-partitioned and preserves keys, these ParDos do not fuse into an existing stage.
+        return false;
+      } else if (!pipeline.getSideInputs(parDo).isEmpty()) {
+        // At execution time, a Runner is required to only provide inputs to a PTransform that, at
+        // the time the PTransform processes them, the associated window is ready in all side inputs
+        // that the PTransform consumes. For an arbitrary stage, it is significantly complex for the
+        // runner to determine this for each input. As a result, we break fusion to simplify this
+        // inspection. In general, a ParDo which consumes side inputs cannot be fused into an
+        // executable stage alongside any transforms which are upstream of any of its side inputs.
+        return false;
       }
+    } catch (InvalidProtocolBufferException e) {
+      throw new IllegalArgumentException(e);
     }
     return true;
   }

   private static boolean parDoCompatibility(
       PTransformNode parDo, PTransformNode other, QueryablePipeline pipeline) {
-    // This is a convenience rather than a strict requirement. In general, a ParDo that consumes
-    // side inputs can be fused with other transforms in the same environment which are not
-    // upstream of any of the side inputs.
-    return pipeline.getSideInputs(parDo).isEmpty()
-        // Since we lack the ability to mark upstream transforms as key preserving, we
-        // purposefully break fusion here to provide runners the opportunity to insert a
-        // grouping operation
-        && pipeline.getUserStates(parDo).isEmpty()
-        && compatibleEnvironments(parDo, other, pipeline);
+    // Implicitly true if we are attempting to fuse against oneself. This is for timer PCollection which create a loop.
+    return parDo.equals(other)
+        // This is a convenience rather than a strict requirement. In general, a ParDo that consumes
+        // side inputs can be fused with other transforms in the same environment which are not
+        // upstream of any of the side inputs.
+
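The order of the checks added to canFuseParDo in the diff above can be sketched in isolation. This is a minimal model under stated assumptions, not Beam's implementation: `TimerFusionSketch`, `ParDoModel`, and the ids `pc.main`, `pc.timer`, and `myTimer` are hypothetical stand-ins, and the environment checks that precede this part of the real method are left out.

```java
import java.util.Map;
import java.util.Set;

/** Simplified, hypothetical model of the fusion checks; these are not Beam's classes. */
final class TimerFusionSketch {

  /** Stand-in for the parts of a ParDo that the checks inspect. */
  static final class ParDoModel {
    final Map<String, String> inputs; // local input name -> PCollection id
    final Set<String> timerSpecIds; // timer ids declared by the ParDo
    final int stateSpecCount;
    final boolean hasSideInputs;

    ParDoModel(
        Map<String, String> inputs,
        Set<String> timerSpecIds,
        int stateSpecCount,
        boolean hasSideInputs) {
      this.inputs = inputs;
      this.timerSpecIds = timerSpecIds;
      this.stateSpecCount = stateSpecCount;
      this.hasSideInputs = hasSideInputs;
    }
  }

  /** Applies the three checks in the same order as the added lines of the diff. */
  static boolean canFuse(ParDoModel parDo, String candidatePCollectionId) {
    // 1. The candidate is one of the ParDo's own timer inputs: a self-loop, so fuse.
    boolean candidateIsTimerInput =
        parDo.inputs.entrySet().stream()
            .anyMatch(
                e ->
                    parDo.timerSpecIds.contains(e.getKey())
                        && e.getValue().equals(candidatePCollectionId));
    if (candidateIsTimerInput) {
      return true;
    }
    // 2. State or timers require key-partitioned input, so do not fuse.
    if (parDo.stateSpecCount > 0 || !parDo.timerSpecIds.isEmpty()) {
      return false;
    }
    // 3. Side inputs break fusion as well.
    return !parDo.hasSideInputs;
  }

  public static void main(String[] args) {
    ParDoModel timerParDo =
        new ParDoModel(
            Map.of("main", "pc.main", "myTimer", "pc.timer"), Set.of("myTimer"), 0, false);
    System.out.println(canFuse(timerParDo, "pc.timer")); // true: timer self-loop
    System.out.println(canFuse(timerParDo, "pc.main")); // false: ParDo uses timers
  }
}
```

Checking the timer self-loop first matters: a ParDo with timers always fails the second check, so the timer PCollection could never be fused if the order were reversed.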
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127498=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127498 ]

ASF GitHub Bot logged work on BEAM-4658:

Author: ASF GitHub Bot
Created on: 25/Jul/18 20:34
Start Date: 25/Jul/18 20:34
Worklog Time Spent: 10m
Work Description: lukecwik edited a comment on issue #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers.
URL: https://github.com/apache/beam/pull/6050#issuecomment-407569883

R: @tweise @youngoli
CC: @charlesccychen @bsidhom

Issue Time Tracking
---
Worklog Id: (was: 127498)
Time Spent: 40m (was: 0.5h)
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=126928=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-126928 ]

ASF GitHub Bot logged work on BEAM-4658:

Author: ASF GitHub Bot
Created on: 24/Jul/18 22:15
Start Date: 24/Jul/18 22:15
Worklog Time Spent: 10m
Work Description: lukecwik commented on issue #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers.
URL: https://github.com/apache/beam/pull/6050#issuecomment-407569883

R: @tweise @angoenka
CC: @charlesccychen @bsidhom

Issue Time Tracking
---
Worklog Id: (was: 126928)
Time Spent: 0.5h (was: 20m)
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=126927=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-126927 ]

ASF GitHub Bot logged work on BEAM-4658:

Author: ASF GitHub Bot
Created on: 24/Jul/18 22:14
Start Date: 24/Jul/18 22:14
Worklog Time Spent: 10m
Work Description: lukecwik opened a new pull request #6050: [BEAM-4658] Update pipeline representation in runner support libraries to handle timers.
URL: https://github.com/apache/beam/pull/6050

Note that I modified several runner libraries related to portability to support multiple inputs. It was painful because of the use of too many abstractions. Hopefully I'll have an opportunity to cut down on how many there are.

Issue Time Tracking
---
Worklog Id: (was: 126927)
Time Spent: 20m (was: 10m)
[jira] [Work logged] (BEAM-4658) Update pipeline representation in runner support libraries to handle timers
[ https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=125253=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-125253 ]

ASF GitHub Bot logged work on BEAM-4658:

Author: ASF GitHub Bot
Created on: 19/Jul/18 21:34
Start Date: 19/Jul/18 21:34
Worklog Time Spent: 10m
Work Description: lukecwik opened a new pull request #6002: [BEAM-4658] Modify RemoteBundle interface to allow for multiple inputs.
URL: https://github.com/apache/beam/pull/6002

This is towards supporting timers as PCollections.

Issue Time Tracking
---
Worklog Id: (was: 125253)
Time Spent: 10m
Remaining Estimate: 0h
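To illustrate what a bundle that accepts multiple inputs might look like, here is a rough sketch. It is not the RemoteBundle interface from the pull request: `MultiInputBundleSketch`, `receiverFor`, `delivered`, and the input ids are hypothetical names chosen for illustration, and elements are plain strings. The idea shown is only that each input, such as a main input and a timer PCollection, gets its own receiver keyed by id.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical sketch of a bundle with several inputs; not Beam's RemoteBundle API. */
final class MultiInputBundleSketch implements AutoCloseable {
  private final Map<String, Consumer<String>> receivers = new LinkedHashMap<>();
  private final List<String> log = new ArrayList<>();
  private boolean closed;

  /** Registers one receiver per input id (for example a main input and a timer input). */
  MultiInputBundleSketch(List<String> inputIds) {
    for (String id : inputIds) {
      receivers.put(id, element -> log.add(id + ":" + element));
    }
  }

  /** Returns the receiver for the given input id, or fails if the id is unknown. */
  Consumer<String> receiverFor(String inputId) {
    if (closed) {
      throw new IllegalStateException("bundle already closed");
    }
    Consumer<String> receiver = receivers.get(inputId);
    if (receiver == null) {
      throw new IllegalArgumentException("unknown input: " + inputId);
    }
    return receiver;
  }

  /** Returns a copy of everything delivered so far, in arrival order. */
  List<String> delivered() {
    return new ArrayList<>(log);
  }

  @Override
  public void close() {
    closed = true;
  }

  public static void main(String[] args) {
    try (MultiInputBundleSketch bundle =
        new MultiInputBundleSketch(List.of("pc.main", "pc.timer"))) {
      bundle.receiverFor("pc.main").accept("element-1");
      bundle.receiverFor("pc.timer").accept("fire-at-10");
      System.out.println(bundle.delivered());
    }
  }
}
```

Keying receivers by input id keeps the caller's code the same whether a stage has one input or several, which is one plausible reason to prefer a map over a single receiver.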