[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=83709=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83709 ]

ASF GitHub Bot logged work on BEAM-3565:
Author: ASF GitHub Bot
Created on: 23/Mar/18 17:35
Start Date: 23/Mar/18 17:35
Worklog Time Spent: 10m
Work Description: tgroh closed pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
index c371920d47e..68da5c3961b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
@@ -19,54 +19,95 @@ package org.apache.beam.runners.core.construction.graph;

 import com.google.auto.value.AutoValue;
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;

-/**
- * A {@link Pipeline} which has been separated into collections of executable components.
- */
+/** A {@link Pipeline} which has been separated into collections of executable components. */
 @AutoValue
 public abstract class FusedPipeline {
   static FusedPipeline of(
-      Set<ExecutableStage> environmentalStages, Set<PTransformNode> runnerStages) {
-    return new AutoValue_FusedPipeline(environmentalStages, runnerStages);
+      Components components,
+      Set<ExecutableStage> environmentalStages,
+      Set<PTransformNode> runnerStages) {
+    return new AutoValue_FusedPipeline(components, environmentalStages, runnerStages);
   }

-  /**
-   * The {@link ExecutableStage executable stages} that are executed by SDK harnesses.
-   */
+  abstract Components getComponents();
+
+  /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. */
   public abstract Set<ExecutableStage> getFusedStages();

+  /** The {@link PTransform PTransforms} that a runner is responsible for executing. */
+  public abstract Set<PTransformNode> getRunnerExecutedTransforms();
+
   /**
-   * The {@link PTransform PTransforms} that a runner is responsible for executing.
+   * Returns the {@link RunnerApi.Pipeline} representation of this {@link FusedPipeline}.
+   *
+   * The {@link Components} of the returned pipeline will contain all of the {@link PTransform
+   * PTransforms} present in the original Pipeline that this {@link FusedPipeline} was created from,
+   * plus all of the {@link ExecutableStage ExecutableStages} contained within this {@link
+   * FusedPipeline}. The {@link Pipeline#getRootTransformIdsList()} will contain all of the runner
+   * executed transforms and all of the {@link ExecutableStage execuable stages} contained within
+   * the Pipeline.
    */
-  public abstract Set<PTransformNode> getRunnerExecutedTransforms();
+  public RunnerApi.Pipeline toPipeline() {
+    Map<String, PTransform> executableStageTransforms = getEnvironmentExecutedTransforms();
+    Set<String> executableTransformIds =
+        Sets.union(
+            executableStageTransforms.keySet(),
+            getRunnerExecutedTransforms()
+                .stream()
+                .map(PTransformNode::getId)
+                .collect(Collectors.toSet()));
+
+    // Augment the initial transforms with all of the executable transforms.
+    Components fusedComponents =
+        getComponents().toBuilder().putAllTransforms(executableStageTransforms).build();
+    List<String> rootTransformIds =
+        StreamSupport.stream(
+                QueryablePipeline.forTransforms(executableTransformIds, fusedComponents)
+                    .getTopologicallyOrderedTransforms()
+                    .spliterator(),
+                false)
+            .map(PTransformNode::getId)
+            .collect(Collectors.toList());
+    return Pipeline.newBuilder()
+        .setComponents(fusedComponents)
+        .addAllRootTransformIds(rootTransformIds)
+        .build();
+  }

   /**
-   * Return a
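The toPipeline() method in the diff above does three things: it adds the executable stage transforms to the original components, takes the union of the stage IDs and the runner-executed transform IDs, and orders that union topologically to form the root transform IDs. The sketch below reproduces those three steps on a deliberately simplified model. It assumes a hypothetical SimpleTransform class that carries only input and output PCollection IDs; it does not use Beam's RunnerApi protos or QueryablePipeline, and the ordering uses Kahn's algorithm, which is not necessarily how QueryablePipeline orders transforms internally.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a PTransform: only the PCollection ids it reads and writes. */
final class SimpleTransform {
  final List<String> inputs;
  final List<String> outputs;

  SimpleTransform(List<String> inputs, List<String> outputs) {
    this.inputs = inputs;
    this.outputs = outputs;
  }
}

/** Sketch of the toPipeline() steps on the simplified model (not Beam's implementation). */
final class ToPipelineSketch {
  /** Step 1: the original transforms plus one synthetic transform per executable stage. */
  static Map<String, SimpleTransform> fuseComponents(
      Map<String, SimpleTransform> original, Map<String, SimpleTransform> stages) {
    Map<String, SimpleTransform> fused = new LinkedHashMap<>(original);
    fused.putAll(stages);
    return fused;
  }

  /** Steps 2 and 3: the union of stage ids and runner-executed ids, topologically ordered. */
  static List<String> rootTransformIds(
      Map<String, SimpleTransform> fused, Set<String> stageIds, Set<String> runnerIds) {
    Set<String> roots = new LinkedHashSet<>(stageIds);
    roots.addAll(runnerIds);

    // Record which root transform produces each PCollection.
    Map<String, String> producer = new HashMap<>();
    for (String id : roots) {
      for (String out : fused.get(id).outputs) {
        producer.put(out, id);
      }
    }

    // Kahn's algorithm over producer -> consumer edges between root transforms only.
    Map<String, Integer> inDegree = new LinkedHashMap<>();
    Map<String, List<String>> consumers = new HashMap<>();
    for (String id : roots) {
      inDegree.put(id, 0);
    }
    for (String id : roots) {
      for (String in : fused.get(id).inputs) {
        String from = producer.get(in);
        if (from != null) {
          consumers.computeIfAbsent(from, k -> new ArrayList<>()).add(id);
          inDegree.merge(id, 1, Integer::sum);
        }
      }
    }
    Deque<String> ready = new ArrayDeque<>();
    inDegree.forEach((id, degree) -> {
      if (degree == 0) {
        ready.add(id);
      }
    });
    List<String> ordered = new ArrayList<>();
    while (!ready.isEmpty()) {
      String id = ready.poll();
      ordered.add(id);
      for (String next : consumers.getOrDefault(id, List.of())) {
        if (inDegree.merge(next, -1, Integer::sum) == 0) {
          ready.add(next);
        }
      }
    }
    if (ordered.size() != roots.size()) {
      throw new IllegalStateException("Root transforms contain a cycle: " + roots);
    }
    return ordered;
  }
}
```

Insertion-ordered collections keep the result deterministic when several transforms become ready at the same time; a real implementation may break such ties differently.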
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82996=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82996 ]

ASF GitHub Bot logged work on BEAM-3565:
Author: ASF GitHub Bot
Created on: 21/Mar/18 23:46
Start Date: 21/Mar/18 23:46
Worklog Time Spent: 10m
Work Description: tgroh commented on issue #4777: [BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#issuecomment-375132231

Done to all

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 82996)
Time Spent: 18h (was: 17h 50m)

> Add utilities for producing a collection of PTransforms that can execute in a
> single SDK Harness
>
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
> Issue Type: Bug
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Thomas Groh
> Priority: Major
> Labels: portability
> Fix For: 2.4.0
>
> Time Spent: 18h
> Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The java
> runner libraries should provide some way to take a Pipeline that executes in
> both a runner and an environment and construct a collection of transforms
> which can execute within a single environment.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
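The issue description quoted above asks for a way to split a Pipeline that runs partly in the runner and partly in an SDK environment into groups of transforms that each run within a single environment. The sketch below illustrates the idea for the simplest possible case, a linear chain, under the assumption that a transform with no environment is executed by the runner. It is a hypothetical simplification, not GreedyPipelineFuser: it ignores branching, flattens, side inputs, and the other cases a real fuser must handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical greedy fusion of a linear chain of transforms (illustration only). */
final class LinearChainFuser {
  /** A fused stage: one SDK environment plus the consecutive transforms it executes. */
  static final class Stage {
    final String environment;
    final List<String> transforms = new ArrayList<>();

    Stage(String environment) {
      this.environment = environment;
    }
  }

  /**
   * Walks the chain in order. Transforms missing from {@code environments} are treated as
   * runner-executed: they are appended to {@code runnerExecuted} and end the current stage.
   * Consecutive transforms in the same environment are fused into one stage.
   */
  static List<Stage> fuse(
      List<String> chain, Map<String, String> environments, List<String> runnerExecuted) {
    List<Stage> stages = new ArrayList<>();
    Stage current = null;
    for (String transform : chain) {
      String environment = environments.get(transform);
      if (environment == null) {
        runnerExecuted.add(transform);
        current = null; // a runner-executed transform breaks fusion
      } else if (current != null && current.environment.equals(environment)) {
        current.transforms.add(transform);
      } else {
        current = new Stage(environment);
        current.transforms.add(transform);
        stages.add(current);
      }
    }
    return stages;
  }
}
```

For the chain impulse, map, key, gbk, values, where only map, key, and values have an environment, this yields two stages ([map, key] and [values]) and two runner-executed transforms (impulse and gbk).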
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82987=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82987 ]

ASF GitHub Bot logged work on BEAM-3565:
Author: ASF GitHub Bot
Created on: 21/Mar/18 23:18
Start Date: 21/Mar/18 23:18
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176269191

## File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/FusedPipelineTest.java ##
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction.graph;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link FusedPipeline}. */
+@RunWith(JUnit4.class)
+public class FusedPipelineTest implements Serializable {
+  @Test
+  public void testToProto() {
+    Pipeline p = Pipeline.create();
+    p.apply("impulse", Impulse.create())
+        .apply("map", MapElements.into(TypeDescriptors.integers()).via(bytes -> bytes.length))
+        .apply("key", WithKeys.of("foo"))
+        .apply("gbk", GroupByKey.create())
+        .apply("values", Values.create());
+
+    RunnerApi.Pipeline protoPipeline = PipelineTranslation.toProto(p);
+    checkState(
+        protoPipeline
+            .getRootTransformIdsList()
+            .containsAll(ImmutableList.of("impulse", "map", "key", "gbk", "values")),
+        "Unexpected Root Transform IDs %s",
+        protoPipeline.getRootTransformIdsList());
+
+    FusedPipeline fused = GreedyPipelineFuser.fuse(protoPipeline);
+    checkState(
+        fused.getRunnerExecutedTransforms().size() == 2,
+        "Unexpected number of runner transforms %s",
+        fused.getRunnerExecutedTransforms());
+    checkState(
+        fused.getFusedStages().size() == 2,
+        "Unexpected number of fused stages %s",
+        fused.getFusedStages());
+    RunnerApi.Pipeline fusedProto = fused.toPipeline(protoPipeline.getComponents());
+
+    assertThat(
+        "Root Transforms should all be present in the Pipeline Components",
+        fusedProto.getComponents().getTransformsMap().keySet(),
+        hasItems(fusedProto.getRootTransformIdsList().toArray(new String[0])));
+    assertThat(
+        "Should contain Impulse, GroupByKey, and two Environment Stages",
+        fusedProto.getRootTransformIdsCount(),
+        equalTo(4));
+    assertThat(fusedProto.getRootTransformIdsList(), hasItems("impulse", "gbk"));
+    assertRootsInTopologicalOrder(fusedProto);
+    // Since MapElements, WithKeys, and Values are all composites of a ParDo, we do prefix matching
+    // instead of looking at the inside of their expansions
+    assertThat(
+        "Fused transforms should be present in the components",
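The test calls assertRootsInTopologicalOrder(fusedProto), but that helper's body is not part of this excerpt. One plausible form of such a check is sketched below on a hypothetical simplified model rather than RunnerApi protos: every root transform that consumes a PCollection produced by another root must appear after that producer. The class and method names here are illustrative, not the test's actual code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical topological-order check over a simplified transform graph. */
final class RootOrderCheck {
  /** Simplified transform: the PCollection ids it consumes and produces. */
  static final class Node {
    final List<String> inputs;
    final List<String> outputs;

    Node(List<String> inputs, List<String> outputs) {
      this.inputs = inputs;
      this.outputs = outputs;
    }
  }

  /**
   * Returns true if each root appears after every root that produces one of its inputs. Inputs
   * produced by transforms outside {@code rootIds} are ignored.
   */
  static boolean rootsInTopologicalOrder(List<String> rootIds, Map<String, Node> transforms) {
    Map<String, String> producer = new HashMap<>();
    for (String id : rootIds) {
      for (String output : transforms.get(id).outputs) {
        producer.put(output, id);
      }
    }
    Set<String> visited = new HashSet<>();
    for (String id : rootIds) {
      for (String input : transforms.get(id).inputs) {
        String from = producer.get(input);
        if (from != null && !visited.contains(from)) {
          return false; // the producing root has not been seen yet
        }
      }
      visited.add(id);
    }
    return true;
  }
}
```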
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82986=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82986 ]

ASF GitHub Bot logged work on BEAM-3565:
Author: ASF GitHub Bot
Created on: 21/Mar/18 23:18
Start Date: 21/Mar/18 23:18
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176267337

## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java ##
@@ -36,25 +36,45 @@
 @AutoValue
 public abstract class FusedPipeline {
   static FusedPipeline of(
-      Set<ExecutableStage> environmentalStages, Set<PTransformNode> runnerStages) {
-    return new AutoValue_FusedPipeline(environmentalStages, runnerStages);
+      Components components,
+      Set<ExecutableStage> environmentalStages,
+      Set<PTransformNode> runnerStages) {
+    return new AutoValue_FusedPipeline(components, environmentalStages, runnerStages);
   }

+  abstract Components getComponents();
+
   /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. */
   public abstract Set<ExecutableStage> getFusedStages();

   /** The {@link PTransform PTransforms} that a runner is responsible for executing. */
   public abstract Set<PTransformNode> getRunnerExecutedTransforms();

-  public RunnerApi.Pipeline toPipeline(Components initialComponents) {
-    Map<String, PTransform> executableTransforms = getExecutableTransforms(initialComponents);
-    Components fusedComponents = initialComponents.toBuilder()
-        .putAllTransforms(executableTransforms)
-        .putAllTransforms(getFusedTransforms())
-        .build();
+  /**
+   * Returns the {@link RunnerApi.Pipeline} representation of this {@link FusedPipeline}.
+   *
+   * The {@link Components} of the returned pipeline will contain all of the {@link PTransform
+   * PTransforms} present in the original Pipeline that this {@link FusedPipeline} was created from,
+   * plus all of the {@link ExecutableStage ExecutableStages} contained within this {@link
+   * FusedPipeline}. The Root Transform IDs will contain all of the runner executed transforms and

Review comment:
The upper casing on Root Transform IDs is strange, would you rather link the Pipeline root transform ids method?

Issue Time Tracking
---

Worklog Id: (was: 82986)
Time Spent: 17h 40m (was: 17.5h)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82984=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82984 ]

ASF GitHub Bot logged work on BEAM-3565:
Author: ASF GitHub Bot
Created on: 21/Mar/18 23:18
Start Date: 21/Mar/18 23:18
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176266684

## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java ##
@@ -36,25 +36,45 @@
 @AutoValue
 public abstract class FusedPipeline {
   static FusedPipeline of(
-      Set<ExecutableStage> environmentalStages, Set<PTransformNode> runnerStages) {
-    return new AutoValue_FusedPipeline(environmentalStages, runnerStages);
+      Components components,
+      Set<ExecutableStage> environmentalStages,
+      Set<PTransformNode> runnerStages) {
+    return new AutoValue_FusedPipeline(components, environmentalStages, runnerStages);
   }

+  abstract Components getComponents();
+
   /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. */
   public abstract Set<ExecutableStage> getFusedStages();

   /** The {@link PTransform PTransforms} that a runner is responsible for executing. */
   public abstract Set<PTransformNode> getRunnerExecutedTransforms();

-  public RunnerApi.Pipeline toPipeline(Components initialComponents) {
-    Map<String, PTransform> executableTransforms = getExecutableTransforms(initialComponents);
-    Components fusedComponents = initialComponents.toBuilder()
-        .putAllTransforms(executableTransforms)
-        .putAllTransforms(getFusedTransforms())
-        .build();
+  /**
+   * Returns the {@link RunnerApi.Pipeline} representation of this {@link FusedPipeline}.
+   *
+   * The {@link Components} of the returned pipeline will contain all of the {@link PTransform
+   * PTransforms} present in the original Pipeline that this {@link FusedPipeline} was created from,
+   * plus all of the {@link ExecutableStage ExecutableStages} contained within this {@link
+   * FusedPipeline}. The Root Transform IDs will contain all of the runner executed transforms and
+   * all of the ExecutableStages contained within the Pipeline.

Review comment:
`ExecutableStages` -> `{@link ExecutableStage executable stages}`

Issue Time Tracking
---

Worklog Id: (was: 82984)
Time Spent: 17h 20m (was: 17h 10m)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82985=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82985 ]

ASF GitHub Bot logged work on BEAM-3565:
Author: ASF GitHub Bot
Created on: 21/Mar/18 23:18
Start Date: 21/Mar/18 23:18
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176265936

## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java ##
@@ -19,54 +19,83 @@ package org.apache.beam.runners.core.construction.graph;

 import com.google.auto.value.AutoValue;
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;

-/**
- * A {@link Pipeline} which has been separated into collections of executable components.
- */
+/** A {@link Pipeline} which has been separated into collections of executable components. */
 @AutoValue
 public abstract class FusedPipeline {
   static FusedPipeline of(
       Set<ExecutableStage> environmentalStages, Set<PTransformNode> runnerStages) {
     return new AutoValue_FusedPipeline(environmentalStages, runnerStages);
   }

-  /**
-   * The {@link ExecutableStage executable stages} that are executed by SDK harnesses.
-   */
+  /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. */
   public abstract Set<ExecutableStage> getFusedStages();

-  /**
-   * The {@link PTransform PTransforms} that a runner is responsible for executing.
-   */
+  /** The {@link PTransform PTransforms} that a runner is responsible for executing. */
   public abstract Set<PTransformNode> getRunnerExecutedTransforms();

+  public RunnerApi.Pipeline toPipeline(Components initialComponents) {
+    Map<String, PTransform> executableTransforms = getExecutableTransforms(initialComponents);
+    Components fusedComponents = initialComponents.toBuilder()
+        .putAllTransforms(executableTransforms)
+        .putAllTransforms(getFusedTransforms())
+        .build();
+    List<String> rootTransformIds =
+        StreamSupport.stream(
+                QueryablePipeline.forTransforms(executableTransforms.keySet(), fusedComponents)
+                    .getTopologicallyOrderedTransforms()
+                    .spliterator(),
+                false)
+            .map(PTransformNode::getId)
+            .collect(Collectors.toList());
+    return Pipeline.newBuilder()
+        .setComponents(fusedComponents)
+        .addAllRootTransformIds(rootTransformIds)
+        .build();
+  }
+
   /**
-   * Return a {@link Components} like the {@code base} components, but with the only transforms
-   * equal to this fused pipeline.
+   * Return a {@link Components} like the {@code base} components, but with the set of transforms to

Review comment:
This comment still seems out of date since it refers to `base` components.

Issue Time Tracking
---

Worklog Id: (was: 82985)
Time Spent: 17.5h (was: 17h 20m)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82955=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82955 ]

ASF GitHub Bot logged work on BEAM-3565:
Author: ASF GitHub Bot
Created on: 21/Mar/18 22:08
Start Date: 21/Mar/18 22:08
Worklog Time Spent: 10m
Work Description: tgroh commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176251543

## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java ##
@@ -19,54 +19,83 @@ package org.apache.beam.runners.core.construction.graph;

 import com.google.auto.value.AutoValue;
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;

-/**
- * A {@link Pipeline} which has been separated into collections of executable components.
- */
+/** A {@link Pipeline} which has been separated into collections of executable components. */
 @AutoValue
 public abstract class FusedPipeline {
   static FusedPipeline of(
       Set<ExecutableStage> environmentalStages, Set<PTransformNode> runnerStages) {
     return new AutoValue_FusedPipeline(environmentalStages, runnerStages);
   }

-  /**
-   * The {@link ExecutableStage executable stages} that are executed by SDK harnesses.
-   */
+  /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. */
   public abstract Set<ExecutableStage> getFusedStages();

-  /**
-   * The {@link PTransform PTransforms} that a runner is responsible for executing.
-   */
+  /** The {@link PTransform PTransforms} that a runner is responsible for executing. */
   public abstract Set<PTransformNode> getRunnerExecutedTransforms();

+  public RunnerApi.Pipeline toPipeline(Components initialComponents) {
+    Map<String, PTransform> executableTransforms = getExecutableTransforms(initialComponents);
+    Components fusedComponents = initialComponents.toBuilder()
+        .putAllTransforms(executableTransforms)
+        .putAllTransforms(getFusedTransforms())
+        .build();
+    List<String> rootTransformIds =
+        StreamSupport.stream(
+                QueryablePipeline.forTransforms(executableTransforms.keySet(), fusedComponents)
+                    .getTopologicallyOrderedTransforms()
+                    .spliterator(),
+                false)
+            .map(PTransformNode::getId)
+            .collect(Collectors.toList());
+    return Pipeline.newBuilder()
+        .setComponents(fusedComponents)
+        .addAllRootTransformIds(rootTransformIds)
+        .build();
+  }
+
   /**
-   * Return a {@link Components} like the {@code base} components, but with the only transforms
-   * equal to this fused pipeline.
+   * Return a {@link Components} like the {@code base} components, but with the set of transforms to
+   * be executed by the runner.
    *
-   * The only composites will be the stages returned by {@link #getFusedStages()}.
+   * The transforms that are present in the returned map are the union of the results of {@link
+   * #getRunnerExecutedTransforms()} and {@link #getFusedStages()}, where each {@link
+   * ExecutableStage}.

Review comment:
RM'd

Issue Time Tracking
---

Worklog Id: (was: 82955)
Time Spent: 16h 50m (was: 16h 40m)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82958=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82958 ]

ASF GitHub Bot logged work on BEAM-3565:
Author: ASF GitHub Bot
Created on: 21/Mar/18 22:08
Start Date: 21/Mar/18 22:08
Worklog Time Spent: 10m
Work Description: tgroh commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176251681

## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java ##
@@ -183,6 +195,19 @@ private static boolean isPrimitiveTransform(PTransform transform) {
         .collect(Collectors.toSet());
   }

+  /**
+   * Get the PCollections which are not consumed by any {@link PTransformNode} in this {@link
+   * QueryablePipeline}.
+   */
+  private Set getLeafPCollections() {

Review comment:
Pulled in accidentally, I think.

Issue Time Tracking
---

Worklog Id: (was: 82958)
Time Spent: 17h 10m (was: 17h)
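The getLeafPCollections() javadoc in this hunk defines leaf PCollections as those not consumed by any transform in the QueryablePipeline. Assuming a simplified model in which each transform is just a list of input and a list of output PCollection IDs (hypothetical; this is not the PipelineNode API), that set is every produced PCollection minus every consumed one:

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical leaf-PCollection computation over a simplified transform graph. */
final class LeafPCollectionSketch {
  /**
   * @param inputs transform id -> PCollection ids it consumes
   * @param outputs transform id -> PCollection ids it produces
   * @return produced PCollection ids that no transform consumes, sorted for stable output
   */
  static Set<String> leafPCollections(
      Map<String, List<String>> inputs, Map<String, List<String>> outputs) {
    Set<String> leaves = new TreeSet<>();
    outputs.values().forEach(leaves::addAll); // everything that is produced...
    inputs.values().forEach(leaves::removeAll); // ...minus everything that is consumed
    return leaves;
  }
}
```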
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82953=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82953 ]

ASF GitHub Bot logged work on BEAM-3565:
Author: ASF GitHub Bot
Created on: 21/Mar/18 22:08
Start Date: 21/Mar/18 22:08
Worklog Time Spent: 10m
Work Description: tgroh commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176251864

## File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/FusedPipelineTest.java ##
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction.graph;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link FusedPipeline}. */
+@RunWith(JUnit4.class)
+public class FusedPipelineTest implements Serializable {
+  @Test
+  public void testToProto() {
+    Pipeline p = Pipeline.create();
+    p.apply("impulse", Impulse.create())
+        .apply("map", MapElements.into(TypeDescriptors.integers()).via(bytes -> bytes.length))
+        .apply("key", WithKeys.of("foo"))
+        .apply("gbk", GroupByKey.create())
+        .apply("values", Values.create());
+
+    RunnerApi.Pipeline protoPipeline = PipelineTranslation.toProto(p);
+    checkState(
+        protoPipeline
+            .getRootTransformIdsList()
+            .containsAll(ImmutableList.of("impulse", "map", "key", "gbk", "values")),
+        "Unexpected Root Transform IDs %s",
+        protoPipeline.getRootTransformIdsList());
+
+    FusedPipeline fused = GreedyPipelineFuser.fuse(protoPipeline);
+    checkState(
+        fused.getRunnerExecutedTransforms().size() == 2,
+        "Unexpected number of runner transforms %s",
+        fused.getRunnerExecutedTransforms());
+    checkState(
+        fused.getFusedStages().size() == 2,
+        "Unexpected number of fused stages %s",
+        fused.getFusedStages());
+    RunnerApi.Pipeline fusedProto = fused.toPipeline(protoPipeline.getComponents());

Review comment:
Done.

Issue Time Tracking
---

Worklog Id: (was: 82953)
Time Spent: 16.5h (was: 16h 20m)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82956=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82956 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 21/Mar/18 22:08 Start Date: 21/Mar/18 22:08 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline URL: https://github.com/apache/beam/pull/4777#discussion_r176251523 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java ## @@ -19,54 +19,83 @@ package org.apache.beam.runners.core.construction.graph; import com.google.auto.value.AutoValue; +import com.google.common.collect.Sets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.Components; -import org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder; import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; -/** - * A {@link Pipeline} which has been separated into collections of executable components. - */ +/** A {@link Pipeline} which has been separated into collections of executable components. */ @AutoValue public abstract class FusedPipeline { static FusedPipeline of( Set environmentalStages, Set runnerStages) { return new AutoValue_FusedPipeline(environmentalStages, runnerStages); } - /** - * The {@link ExecutableStage executable stages} that are executed by SDK harnesses. - */ + /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. 
*/ public abstract Set<ExecutableStage> getFusedStages(); - /** - * The {@link PTransform PTransforms} that a runner is responsible for executing. - */ + /** The {@link PTransform PTransforms} that a runner is responsible for executing. */ public abstract Set<PTransformNode> getRunnerExecutedTransforms(); + public RunnerApi.Pipeline toPipeline(Components initialComponents) { +Map<String, PTransform> executableTransforms = getExecutableTransforms(initialComponents); +Components fusedComponents = initialComponents.toBuilder() +.putAllTransforms(executableTransforms) +.putAllTransforms(getFusedTransforms()) +.build(); +List<String> rootTransformIds = +StreamSupport.stream( +QueryablePipeline.forTransforms(executableTransforms.keySet(), fusedComponents) +.getTopologicallyOrderedTransforms() +.spliterator(), +false) +.map(PTransformNode::getId) +.collect(Collectors.toList()); +return Pipeline.newBuilder() +.setComponents(fusedComponents) +.addAllRootTransformIds(rootTransformIds) +.build(); + } + /** - * Return a {@link Components} like the {@code base} components, but with the only transforms - * equal to this fused pipeline. + * Return a {@link Components} like the {@code base} components, but with the set of transforms to Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 82956) Time Spent: 17h (was: 16h 50m) > Add utilities for producing a collection of PTransforms that can execute in a > single SDK Harness > > > Key: BEAM-3565 > URL: https://issues.apache.org/jira/browse/BEAM-3565 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Thomas Groh >Assignee: Thomas Groh >Priority: Major > Labels: portability > Fix For: 2.4.0 > > Time Spent: 17h > Remaining Estimate: 0h > > An SDK Harness executes some ("fused") collection of PTransforms.
The java > runner libraries should provide some way to take a Pipeline that executes in > both a runner and an environment and construct a collection of transforms > which can execute within a single environment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
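`toPipeline` builds the root transform IDs from `QueryablePipeline#getTopologicallyOrderedTransforms()`, so each root should appear after the roots that produce its inputs. To show what such an ordering means, here is Kahn's algorithm over a hypothetical model in which a transform is only an ID plus input and output PCollection IDs. `TopoOrderSketch` and its `Transform` class are illustrative names, and this is not how `QueryablePipeline` is implemented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch (not QueryablePipeline): order transform ids so producers come first. */
final class TopoOrderSketch {
  private TopoOrderSketch() {}

  /** Minimal stand-in for a transform node: an id plus input and output PCollection ids. */
  static final class Transform {
    final String id;
    final List<String> inputs;
    final List<String> outputs;

    Transform(String id, List<String> inputs, List<String> outputs) {
      this.id = id;
      this.inputs = inputs;
      this.outputs = outputs;
    }
  }

  /** Kahn's algorithm: repeatedly emit a transform whose in-set producers were all emitted. */
  static List<String> order(List<Transform> transforms) {
    // Which transform in this set produces each PCollection.
    Map<String, String> producerOf = new HashMap<>();
    for (Transform t : transforms) {
      for (String pc : t.outputs) {
        producerOf.put(pc, t.id);
      }
    }
    // In-degree counts inputs produced inside the set; inputs from elsewhere are ignored.
    Map<String, Integer> inDegree = new LinkedHashMap<>();
    Map<String, List<String>> downstream = new HashMap<>();
    for (Transform t : transforms) {
      inDegree.putIfAbsent(t.id, 0);
      for (String pc : t.inputs) {
        String producer = producerOf.get(pc);
        if (producer != null) {
          inDegree.merge(t.id, 1, Integer::sum);
          downstream.computeIfAbsent(producer, k -> new ArrayList<>()).add(t.id);
        }
      }
    }
    Deque<String> ready = new ArrayDeque<>();
    inDegree.forEach((id, degree) -> {
      if (degree == 0) {
        ready.add(id);
      }
    });
    List<String> ordered = new ArrayList<>();
    while (!ready.isEmpty()) {
      String id = ready.poll();
      ordered.add(id);
      for (String next : downstream.getOrDefault(id, Collections.<String>emptyList())) {
        if (inDegree.merge(next, -1, Integer::sum) == 0) {
          ready.add(next);
        }
      }
    }
    if (ordered.size() != inDegree.size()) {
      throw new IllegalArgumentException("Transforms contain a cycle");
    }
    return ordered;
  }
}
```

The ready queue is seeded in input order, so the result is deterministic for a given input list. Any other valid topological order would satisfy the same "producers first" contract.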
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82957=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82957 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 21/Mar/18 22:08 Start Date: 21/Mar/18 22:08 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline URL: https://github.com/apache/beam/pull/4777#discussion_r176250895 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java ## @@ -19,54 +19,83 @@ package org.apache.beam.runners.core.construction.graph; import com.google.auto.value.AutoValue; +import com.google.common.collect.Sets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.Components; -import org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder; import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; -/** - * A {@link Pipeline} which has been separated into collections of executable components. - */ +/** A {@link Pipeline} which has been separated into collections of executable components. */ @AutoValue public abstract class FusedPipeline { static FusedPipeline of( Set environmentalStages, Set runnerStages) { return new AutoValue_FusedPipeline(environmentalStages, runnerStages); } - /** - * The {@link ExecutableStage executable stages} that are executed by SDK harnesses. - */ + /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. 
*/ public abstract Set<ExecutableStage> getFusedStages(); - /** - * The {@link PTransform PTransforms} that a runner is responsible for executing. - */ + /** The {@link PTransform PTransforms} that a runner is responsible for executing. */ public abstract Set<PTransformNode> getRunnerExecutedTransforms(); + public RunnerApi.Pipeline toPipeline(Components initialComponents) { Review comment: Made into a property. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 82957) Time Spent: 17h 10m (was: 17h) > Add utilities for producing a collection of PTransforms that can execute in a > single SDK Harness > > > Key: BEAM-3565 > URL: https://issues.apache.org/jira/browse/BEAM-3565 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Thomas Groh >Assignee: Thomas Groh >Priority: Major > Labels: portability > Fix For: 2.4.0 > > Time Spent: 17h 10m > Remaining Estimate: 0h > > An SDK Harness executes some ("fused") collection of PTransforms. The java > runner libraries should provide some way to take a Pipeline that executes in > both a runner and an environment and construct a collection of transforms > which can execute within a single environment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82954=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82954 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 21/Mar/18 22:08 Start Date: 21/Mar/18 22:08 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline URL: https://github.com/apache/beam/pull/4777#discussion_r176252057 ## File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/FusedPipelineTest.java ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.core.construction.graph; + +import static com.google.common.base.Preconditions.checkState; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import java.io.Serializable; +import java.util.HashSet; +import java.util.Set; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection; +import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; +import org.apache.beam.runners.core.construction.PipelineTranslation; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.Impulse; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link FusedPipeline}. 
*/ +@RunWith(JUnit4.class) +public class FusedPipelineTest implements Serializable { + @Test + public void testToProto() { +Pipeline p = Pipeline.create(); +p.apply("impulse", Impulse.create()) +.apply("map", MapElements.into(TypeDescriptors.integers()).via(bytes -> bytes.length)) +.apply("key", WithKeys.of("foo")) +.apply("gbk", GroupByKey.create()) +.apply("values", Values.create()); + +RunnerApi.Pipeline protoPipeline = PipelineTranslation.toProto(p); +checkState( +protoPipeline +.getRootTransformIdsList() +.containsAll(ImmutableList.of("impulse", "map", "key", "gbk", "values")), +"Unexpected Root Transform IDs %s", +protoPipeline.getRootTransformIdsList()); + +FusedPipeline fused = GreedyPipelineFuser.fuse(protoPipeline); +checkState( +fused.getRunnerExecutedTransforms().size() == 2, +"Unexpected number of runner transforms %s", +fused.getRunnerExecutedTransforms()); +checkState( +fused.getFusedStages().size() == 2, +"Unexpected number of fused stages %s", +fused.getFusedStages()); +RunnerApi.Pipeline fusedProto = fused.toPipeline(protoPipeline.getComponents()); + +assertThat( +"Root Transforms should all be present in the Pipeline Components", +fusedProto.getComponents().getTransformsMap().keySet(), +hasItems(fusedProto.getRootTransformIdsList().toArray(new String[0]))); +assertThat( +"Should contain Impulse, GroupByKey, and two Environment Stages", +fusedProto.getRootTransformIdsCount(), +equalTo(4)); +assertThat(fusedProto.getRootTransformIdsList(), hasItems("impulse", "gbk")); +assertRootsInTopologicalOrder(fusedProto); +// Since MapElements, WithKeys, and Values are all composites of a ParDo, we do prefix matching +// instead of looking at the inside of their expansions +assertThat( +"Fused transforms should be present in the components", +
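The test calls an `assertRootsInTopologicalOrder` helper whose body is not part of this excerpt. The sketch below is a hypothetical check in the same spirit, assuming plain maps from transform ID to input and output PCollection IDs; the real helper works on the `RunnerApi.Pipeline` proto. A root counts as out of order if it consumes a PCollection that some root produces but no earlier root has produced yet.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a topological-order assertion over root transform ids. */
final class TopoCheckSketch {
  private TopoCheckSketch() {}

  static boolean isTopologicallyOrdered(
      List<String> rootIds,
      Map<String, List<String>> inputsById,
      Map<String, List<String>> outputsById) {
    // PCollections produced by any root; inputs outside this set come from elsewhere.
    Set<String> producedByRoots = new HashSet<>();
    for (String id : rootIds) {
      producedByRoots.addAll(outputsById.getOrDefault(id, Collections.<String>emptyList()));
    }
    Set<String> producedSoFar = new HashSet<>();
    for (String id : rootIds) {
      for (String pc : inputsById.getOrDefault(id, Collections.<String>emptyList())) {
        if (producedByRoots.contains(pc) && !producedSoFar.contains(pc)) {
          return false; // consumed before the root that produces it
        }
      }
      producedSoFar.addAll(outputsById.getOrDefault(id, Collections.<String>emptyList()));
    }
    return true;
  }
}
```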
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82934=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82934 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 21/Mar/18 21:41 Start Date: 21/Mar/18 21:41 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline URL: https://github.com/apache/beam/pull/4777#discussion_r176240754 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java ## @@ -19,54 +19,83 @@ package org.apache.beam.runners.core.construction.graph; import com.google.auto.value.AutoValue; +import com.google.common.collect.Sets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.Components; -import org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder; import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; -/** - * A {@link Pipeline} which has been separated into collections of executable components. - */ +/** A {@link Pipeline} which has been separated into collections of executable components. */ @AutoValue public abstract class FusedPipeline { static FusedPipeline of( Set environmentalStages, Set runnerStages) { return new AutoValue_FusedPipeline(environmentalStages, runnerStages); } - /** - * The {@link ExecutableStage executable stages} that are executed by SDK harnesses. - */ + /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. 
*/ public abstract Set<ExecutableStage> getFusedStages(); - /** - * The {@link PTransform PTransforms} that a runner is responsible for executing. - */ + /** The {@link PTransform PTransforms} that a runner is responsible for executing. */ public abstract Set<PTransformNode> getRunnerExecutedTransforms(); + public RunnerApi.Pipeline toPipeline(Components initialComponents) { Review comment: We were given the Pipeline when we constructed the FusedStage via GreedyPipelineFuser; why do we need `initialComponents` to be passed in again here? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 82934) Time Spent: 16h 10m (was: 16h) > Add utilities for producing a collection of PTransforms that can execute in a > single SDK Harness > > > Key: BEAM-3565 > URL: https://issues.apache.org/jira/browse/BEAM-3565 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Thomas Groh >Assignee: Thomas Groh >Priority: Major > Labels: portability > Fix For: 2.4.0 > > Time Spent: 16h 10m > Remaining Estimate: 0h > > An SDK Harness executes some ("fused") collection of PTransforms. The java > runner libraries should provide some way to take a Pipeline that executes in > both a runner and an environment and construct a collection of transforms > which can execute within a single environment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82932=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82932 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 21/Mar/18 21:41 Start Date: 21/Mar/18 21:41 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline URL: https://github.com/apache/beam/pull/4777#discussion_r176237015 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java ## @@ -19,54 +19,83 @@ package org.apache.beam.runners.core.construction.graph; import com.google.auto.value.AutoValue; +import com.google.common.collect.Sets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.Components; -import org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder; import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; -/** - * A {@link Pipeline} which has been separated into collections of executable components. - */ +/** A {@link Pipeline} which has been separated into collections of executable components. */ @AutoValue public abstract class FusedPipeline { static FusedPipeline of( Set environmentalStages, Set runnerStages) { return new AutoValue_FusedPipeline(environmentalStages, runnerStages); } - /** - * The {@link ExecutableStage executable stages} that are executed by SDK harnesses. - */ + /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. 
*/ public abstract Set<ExecutableStage> getFusedStages(); - /** - * The {@link PTransform PTransforms} that a runner is responsible for executing. - */ + /** The {@link PTransform PTransforms} that a runner is responsible for executing. */ public abstract Set<PTransformNode> getRunnerExecutedTransforms(); + public RunnerApi.Pipeline toPipeline(Components initialComponents) { +Map<String, PTransform> executableTransforms = getExecutableTransforms(initialComponents); +Components fusedComponents = initialComponents.toBuilder() +.putAllTransforms(executableTransforms) +.putAllTransforms(getFusedTransforms()) +.build(); +List<String> rootTransformIds = +StreamSupport.stream( +QueryablePipeline.forTransforms(executableTransforms.keySet(), fusedComponents) +.getTopologicallyOrderedTransforms() +.spliterator(), +false) +.map(PTransformNode::getId) +.collect(Collectors.toList()); +return Pipeline.newBuilder() +.setComponents(fusedComponents) +.addAllRootTransformIds(rootTransformIds) +.build(); + } + /** - * Return a {@link Components} like the {@code base} components, but with the only transforms - * equal to this fused pipeline. + * Return a {@link Components} like the {@code base} components, but with the set of transforms to + * be executed by the runner. * - * The only composites will be the stages returned by {@link #getFusedStages()}. + * The transforms that are present in the returned map are the union of the results of {@link + * #getRunnerExecutedTransforms()} and {@link #getFusedStages()}, where each {@link + * ExecutableStage}. Review comment: where each executable stage? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 82932) Time Spent: 15h 50m (was: 15h 40m) > Add utilities for producing a collection of PTransforms that can execute in a > single SDK Harness > > > Key: BEAM-3565 > URL: https://issues.apache.org/jira/browse/BEAM-3565 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Thomas Groh >Assignee: Thomas Groh >Priority: Major > Labels: portability > Fix For: 2.4.0 > > Time Spent: 15h 50m > Remaining Estimate: 0h > > An SDK Harness executes some ("fused") collection of PTransforms. The java
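The Javadoc under review describes the returned transforms as the union of the runner-executed transforms and the fused stages. The clause beginning "where each {@link ExecutableStage}" is left unfinished in the diff, which is what the reviewer is asking about. The sketch below shows one hypothetical form of such a union. It assumes strings stand in for `PTransform` protos and that each stage is registered under a generated ID. The `fused_stage_N` naming is invented here and is not Beam's scheme.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (not Beam's code): runner-executed transforms keep their ids, and each
 * fused stage is added as one synthetic transform. Strings stand in for PTransform protos, and
 * the "fused_stage_N" id scheme is invented for this example.
 */
final class ExecutableTransformsSketch {
  private ExecutableTransformsSketch() {}

  static Map<String, String> union(
      Map<String, String> runnerExecuted, List<List<String>> fusedStages) {
    Map<String, String> result = new LinkedHashMap<>(runnerExecuted);
    int index = 0;
    for (List<String> stage : fusedStages) {
      String id = "fused_stage_" + index++;
      // Refuse to overwrite an existing transform rather than silently replacing it.
      if (result.putIfAbsent(id, "stage" + stage) != null) {
        throw new IllegalStateException("Duplicate transform id " + id);
      }
    }
    return result;
  }
}
```

Rejecting ID collisions matters here because protobuf's `putAllTransforms` replaces entries with the same key. A clash between a generated stage ID and an existing transform would otherwise drop a transform without any error.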
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82933=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82933 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 21/Mar/18 21:41 Start Date: 21/Mar/18 21:41 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline URL: https://github.com/apache/beam/pull/4777#discussion_r176246992 ## File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/FusedPipelineTest.java ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.core.construction.graph; + +import static com.google.common.base.Preconditions.checkState; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import java.io.Serializable; +import java.util.HashSet; +import java.util.Set; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection; +import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; +import org.apache.beam.runners.core.construction.PipelineTranslation; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.Impulse; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link FusedPipeline}. 
*/ +@RunWith(JUnit4.class) +public class FusedPipelineTest implements Serializable { + @Test + public void testToProto() { +Pipeline p = Pipeline.create(); +p.apply("impulse", Impulse.create()) +.apply("map", MapElements.into(TypeDescriptors.integers()).via(bytes -> bytes.length)) +.apply("key", WithKeys.of("foo")) +.apply("gbk", GroupByKey.create()) +.apply("values", Values.create()); + +RunnerApi.Pipeline protoPipeline = PipelineTranslation.toProto(p); +checkState( +protoPipeline +.getRootTransformIdsList() +.containsAll(ImmutableList.of("impulse", "map", "key", "gbk", "values")), +"Unexpected Root Transform IDs %s", +protoPipeline.getRootTransformIdsList()); + +FusedPipeline fused = GreedyPipelineFuser.fuse(protoPipeline); +checkState( +fused.getRunnerExecutedTransforms().size() == 2, +"Unexpected number of runner transforms %s", +fused.getRunnerExecutedTransforms()); +checkState( +fused.getFusedStages().size() == 2, +"Unexpected number of fused stages %s", +fused.getFusedStages()); +RunnerApi.Pipeline fusedProto = fused.toPipeline(protoPipeline.getComponents()); + +assertThat( +"Root Transforms should all be present in the Pipeline Components", +fusedProto.getComponents().getTransformsMap().keySet(), +hasItems(fusedProto.getRootTransformIdsList().toArray(new String[0]))); +assertThat( +"Should contain Impulse, GroupByKey, and two Environment Stages", +fusedProto.getRootTransformIdsCount(), +equalTo(4)); +assertThat(fusedProto.getRootTransformIdsList(), hasItems("impulse", "gbk")); +assertRootsInTopologicalOrder(fusedProto); +// Since MapElements, WithKeys, and Values are all composites of a ParDo, we do prefix matching +// instead of looking at the inside of their expansions +assertThat( +"Fused transforms should be present in the components", +
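The test comment explains that `MapElements`, `WithKeys` and `Values` are composites wrapping a `ParDo`. For that reason, the test matches transform IDs by prefix rather than inspecting their expansions. Below is a minimal sketch of that kind of check over a map keyed by transform ID. The helper name is hypothetical, and the IDs in the test (such as `map/ParDo`) are made up for illustration; they are not necessarily the IDs Beam generates.

```java
import java.util.Map;

/** Hypothetical helper: does any transform id in the map start with the given prefix? */
final class PrefixMatchSketch {
  private PrefixMatchSketch() {}

  static boolean hasIdWithPrefix(Map<String, ?> transformsById, String prefix) {
    // Prefix matching avoids depending on how a composite's expansion is named internally.
    return transformsById.keySet().stream().anyMatch(id -> id.startsWith(prefix));
  }
}
```

A bare prefix such as "map" would also match an unrelated ID like "mapper". Including a separator in the prefix, for example "map/", is stricter, assuming nested names are "/"-separated.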
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82937=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82937 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 21/Mar/18 21:41 Start Date: 21/Mar/18 21:41 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline URL: https://github.com/apache/beam/pull/4777#discussion_r176244335 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java ## @@ -183,6 +195,19 @@ private static boolean isPrimitiveTransform(PTransform transform) { .collect(Collectors.toSet()); } + /** + * Get the PCollections which are not consumed by any {@link PTransformNode} in this {@link + * QueryablePipeline}. + */ + private Set getLeafPCollections() { Review comment: This isn't used anywhere, what is the intent of adding it right now? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 82937) > Add utilities for producing a collection of PTransforms that can execute in a > single SDK Harness > > > Key: BEAM-3565 > URL: https://issues.apache.org/jira/browse/BEAM-3565 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Thomas Groh >Assignee: Thomas Groh >Priority: Major > Labels: portability > Fix For: 2.4.0 > > Time Spent: 16h 20m > Remaining Estimate: 0h > > An SDK Harness executes some ("fused") collection of PTransforms. The java > runner libraries should provide some way to take a Pipeline that executes in > both a runner and an environment and construct a collection of transforms > which can execute within a single environment. 
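The reviewer notes that `getLeafPCollections()` is not used anywhere yet. For reference, its Javadoc defines leaf PCollections as those not consumed by any transform in the `QueryablePipeline`, which amounts to a set difference. Below is a hypothetical sketch over plain string IDs, not the `QueryablePipeline` code.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch (not QueryablePipeline): PCollections that no transform consumes. */
final class LeafPCollectionsSketch {
  private LeafPCollectionsSketch() {}

  static Set<String> leafPCollections(
      Collection<String> allPCollections, Collection<List<String>> inputsPerTransform) {
    // Start from every PCollection and drop each one that some transform reads.
    Set<String> leaves = new LinkedHashSet<>(allPCollections);
    for (List<String> inputs : inputsPerTransform) {
      leaves.removeAll(inputs);
    }
    return leaves;
  }
}
```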
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82935=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82935 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 21/Mar/18 21:41 Start Date: 21/Mar/18 21:41 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline URL: https://github.com/apache/beam/pull/4777#discussion_r176245384 ## File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/FusedPipelineTest.java ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.core.construction.graph; + +import static com.google.common.base.Preconditions.checkState; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import java.io.Serializable; +import java.util.HashSet; +import java.util.Set; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection; +import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; +import org.apache.beam.runners.core.construction.PipelineTranslation; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.Impulse; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link FusedPipeline}. 
*/ +@RunWith(JUnit4.class) +public class FusedPipelineTest implements Serializable { + @Test + public void testToProto() { +Pipeline p = Pipeline.create(); +p.apply("impulse", Impulse.create()) +.apply("map", MapElements.into(TypeDescriptors.integers()).via(bytes -> bytes.length)) +.apply("key", WithKeys.of("foo")) +.apply("gbk", GroupByKey.create()) +.apply("values", Values.create()); + +RunnerApi.Pipeline protoPipeline = PipelineTranslation.toProto(p); +checkState( +protoPipeline +.getRootTransformIdsList() +.containsAll(ImmutableList.of("impulse", "map", "key", "gbk", "values")), +"Unexpected Root Transform IDs %s", +protoPipeline.getRootTransformIdsList()); + +FusedPipeline fused = GreedyPipelineFuser.fuse(protoPipeline); +checkState( +fused.getRunnerExecutedTransforms().size() == 2, +"Unexpected number of runner transforms %s", +fused.getRunnerExecutedTransforms()); +checkState( +fused.getFusedStages().size() == 2, +"Unexpected number of fused stages %s", +fused.getFusedStages()); +RunnerApi.Pipeline fusedProto = fused.toPipeline(protoPipeline.getComponents()); Review comment: `fusedProto` -> `fusedProtoPipeline`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 82935) Time Spent: 16h 10m (was: 16h) > Add utilities for producing a collection of PTransforms that can execute in a > single SDK Harness > > > Key: BEAM-3565 > URL: https://issues.apache.org/jira/browse/BEAM-3565 > Project: Beam >
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82936=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82936 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 21/Mar/18 21:41 Start Date: 21/Mar/18 21:41 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline URL: https://github.com/apache/beam/pull/4777#discussion_r176238126 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java ## @@ -19,54 +19,83 @@ package org.apache.beam.runners.core.construction.graph; import com.google.auto.value.AutoValue; +import com.google.common.collect.Sets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.Components; -import org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder; import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; -/** - * A {@link Pipeline} which has been separated into collections of executable components. - */ +/** A {@link Pipeline} which has been separated into collections of executable components. */ @AutoValue public abstract class FusedPipeline { static FusedPipeline of( Set environmentalStages, Set runnerStages) { return new AutoValue_FusedPipeline(environmentalStages, runnerStages); } - /** - * The {@link ExecutableStage executable stages} that are executed by SDK harnesses. - */ + /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. 
*/ public abstract Set<ExecutableStage> getFusedStages(); - /** - * The {@link PTransform PTransforms} that a runner is responsible for executing. - */ + /** The {@link PTransform PTransforms} that a runner is responsible for executing. */ public abstract Set<PTransformNode> getRunnerExecutedTransforms(); + public RunnerApi.Pipeline toPipeline(Components initialComponents) { +Map<String, PTransform> executableTransforms = getExecutableTransforms(initialComponents); +Components fusedComponents = initialComponents.toBuilder() +.putAllTransforms(executableTransforms) +.putAllTransforms(getFusedTransforms()) +.build(); +List<String> rootTransformIds = +StreamSupport.stream( +QueryablePipeline.forTransforms(executableTransforms.keySet(), fusedComponents) +.getTopologicallyOrderedTransforms() +.spliterator(), +false) +.map(PTransformNode::getId) +.collect(Collectors.toList()); +return Pipeline.newBuilder() +.setComponents(fusedComponents) +.addAllRootTransformIds(rootTransformIds) +.build(); + } + /** - * Return a {@link Components} like the {@code base} components, but with the only transforms - * equal to this fused pipeline. + * Return a {@link Components} like the {@code base} components, but with the set of transforms to Review comment: This comment seems like it should apply to toPipeline and not to the private method getExecutableStages. Also, it should say "Return a {@link Pipeline}". This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 82936) Time Spent: 16h 20m (was: 16h 10m) > Add utilities for producing a collection of PTransforms that can execute in a > single SDK Harness > > > Key: BEAM-3565 > URL: https://issues.apache.org/jira/browse/BEAM-3565 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Thomas Groh >Assignee: Thomas Groh >Priority: Major > Labels: portability > Fix For: 2.4.0 > > Time Spent: 16h 20m > Remaining Estimate: 0h > > An SDK Harness executes some ("fused") collection of PTransforms. The java > runner libraries should provide some way to take a Pipeline that executes in > both a runner and an environment and construct a collection of transforms > which can execute within a single environment. -- This message
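The `toPipeline` change above merges the executable transforms and the stages' inner (fused) transforms into one `Components`, but only the executable transforms become root transform ids, in topological order. A minimal, proto-free sketch of that shape follows. `TransformSketch` and `FusedPipelineSketch` are hypothetical stand-ins for the `RunnerApi` types, and the ordering uses Kahn's algorithm as an assumption; the PR itself delegates ordering to `QueryablePipeline#getTopologicallyOrderedTransforms`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a RunnerApi PTransform: an id and its input/output PCollection ids. */
record TransformSketch(String id, List<String> inputs, List<String> outputs) {}

/** Hypothetical, proto-free sketch of the shape of FusedPipeline#toPipeline. */
final class FusedPipelineSketch {
  final Map<String, TransformSketch> components; // all transforms, including those inside stages
  final List<String> rootTransformIds; // only the executable transforms, topologically ordered

  private FusedPipelineSketch(Map<String, TransformSketch> components, List<String> rootTransformIds) {
    this.components = components;
    this.rootTransformIds = rootTransformIds;
  }

  static FusedPipelineSketch toPipeline(
      Map<String, TransformSketch> initialTransforms,
      Map<String, TransformSketch> executableTransforms,
      Map<String, TransformSketch> fusedTransforms) {
    // Analogous to initialComponents.toBuilder().putAllTransforms(...).putAllTransforms(...).
    Map<String, TransformSketch> merged = new LinkedHashMap<>(initialTransforms);
    merged.putAll(executableTransforms);
    merged.putAll(fusedTransforms);
    // Roots are computed from the executable transforms only; fused transforms stay hidden.
    return new FusedPipelineSketch(merged, topologicalOrder(executableTransforms.values()));
  }

  /** Kahn's algorithm: emit a transform once every producer of its inputs has been emitted. */
  static List<String> topologicalOrder(Collection<TransformSketch> transforms) {
    Map<String, String> producerOf = new HashMap<>();
    for (TransformSketch t : transforms) {
      for (String out : t.outputs()) {
        producerOf.put(out, t.id());
      }
    }
    Map<String, Integer> pendingInputs = new LinkedHashMap<>();
    Map<String, List<String>> consumers = new HashMap<>();
    for (TransformSketch t : transforms) {
      int count = 0;
      for (String in : t.inputs()) {
        String producer = producerOf.get(in);
        if (producer != null) { // inputs produced outside this set impose no ordering
          count++;
          consumers.computeIfAbsent(producer, k -> new ArrayList<>()).add(t.id());
        }
      }
      pendingInputs.put(t.id(), count);
    }
    Deque<String> ready = new ArrayDeque<>();
    pendingInputs.forEach((id, n) -> {
      if (n == 0) {
        ready.add(id);
      }
    });
    List<String> order = new ArrayList<>();
    while (!ready.isEmpty()) {
      String id = ready.poll();
      order.add(id);
      for (String consumer : consumers.getOrDefault(id, List.of())) {
        if (pendingInputs.merge(consumer, -1, Integer::sum) == 0) {
          ready.add(consumer);
        }
      }
    }
    if (order.size() != pendingInputs.size()) {
      throw new IllegalStateException("cycle among transforms");
    }
    return order;
  }
}
```

For example, with executable transforms `read -> stage -> sink` and an inner `parDo` that lives only inside the stage, `parDo` ends up in `components` but not in `rootTransformIds`.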
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82551=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82551 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 21/Mar/18 00:01 Start Date: 21/Mar/18 00:01 Worklog Time Spent: 10m Work Description: tgroh commented on issue #4777: [BEAM-3565] Add FusedPipeline#toPipeline URL: https://github.com/apache/beam/pull/4777#issuecomment-374797898 PTAL Issue Time Tracking --- Worklog Id: (was: 82551) Time Spent: 15h 40m (was: 15.5h) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82485=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82485 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 20/Mar/18 21:24 Start Date: 20/Mar/18 21:24 Worklog Time Spent: 10m Work Description: tgroh closed pull request #4898: [BEAM-3565] Clean up ExecutableStage URL: https://github.com/apache/beam/pull/4898 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index 3ed90dd036e..9fa301460fb 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -207,6 +207,8 @@ message PCollection { // ProcessBundleDescriptor. message ExecutableStagePayload { + // Environment in which this stage executes. We use an environment rather than environment id + // because ExecutableStages use environments directly. This may change in the future. Environment environment = 1; // Input PCollection id. diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java index 27bfed87553..e66148421fc 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java @@ -81,53 +81,53 @@ * follows: * * - * The {@link PTransform#getSubtransformsList()} contains no subtransforms. 
This ensures + * The {@link PTransform#getSubtransformsList()} is empty. This ensures * that executable stages are treated as primitive transforms. * The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of * {@link #getInputPCollection()}. * The output {@link PCollection PCollections} in the values of {@link * PTransform#getOutputsMap()} are the {@link PCollectionNode PCollections} returned by * {@link #getOutputPCollections()}. - * The {@link FunctionSpec} contains an {@link ExecutableStagePayload} which has its input - * and output PCollections set to the same values as the outer PTransform itself. It further - * contains the environment set of transforms for this stage. + * The {@link PTransform#getSpec()} contains an {@link ExecutableStagePayload} with inputs + * and outputs equal to the PTransform's inputs and outputs, and transforms equal to the + * result of {@link #getTransforms}. * * * The executable stage can be reconstructed from the resulting {@link ExecutableStagePayload} * and components alone via {@link #fromPayload(ExecutableStagePayload, Components)}. */ default PTransform toPTransform() { +PTransform.Builder pt = PTransform.newBuilder(); ExecutableStagePayload.Builder payload = ExecutableStagePayload.newBuilder(); payload.setEnvironment(getEnvironment()); +// Populate inputs and outputs of the stage payload and outer PTransform simultaneously. PCollectionNode input = getInputPCollection(); +pt.putInputs("input", getInputPCollection().getId()); payload.setInput(input.getId()); -for (PTransformNode transform : getTransforms()) { - payload.addTransforms(transform.getId()); -} - +int outputIndex = 0; for (PCollectionNode output : getOutputPCollections()) { + pt.putOutputs(String.format("materialized_%d", outputIndex), output.getId()); payload.addOutputs(output.getId()); + outputIndex++; +} + +// Inner PTransforms of this stage are hidden from the outer pipeline and only belong in the +// stage payload. 
+for (PTransformNode transform : getTransforms()) { + payload.addTransforms(transform.getId()); } -PTransform.Builder pt = PTransform.newBuilder(); pt.setSpec(FunctionSpec.newBuilder() .setUrn(ExecutableStage.URN) .setPayload(payload.build().toByteString()) .build()); -pt.putInputs("input", getInputPCollection().getId()); -int outputIndex = 0; -for (PCollectionNode pcNode : getOutputPCollections()) { - // Do something - pt.putOutputs(String.format("materialized_%d", outputIndex), pcNode.getId()); - outputIndex++; -} + return pt.build(); } - // TODO: Should this live under
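The reworked `toPTransform()` fills the outer PTransform and its `ExecutableStagePayload` from the same data: the single input under the local name `"input"`, the outputs under `materialized_0`, `materialized_1`, and so on, and the stage's inner transform ids only inside the payload, so the outer transform has no subtransforms. The sketch below mirrors that with hypothetical records (`StagePayloadSketch`, `StageTransformSketch`, `ExecutableStageSketch`) in place of the real protos; it shows the bookkeeping only, not Beam's serialization.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for ExecutableStagePayload: the stage's input, outputs and inner transforms. */
record StagePayloadSketch(String input, List<String> outputs, List<String> transforms) {}

/** Hypothetical stand-in for the outer PTransform returned by toPTransform(). */
record StageTransformSketch(
    Map<String, String> inputs, Map<String, String> outputs, StagePayloadSketch payload) {}

final class ExecutableStageSketch {
  static StageTransformSketch toPTransform(
      String inputPCollectionId, List<String> outputPCollectionIds, List<String> innerTransformIds) {
    // The single input is registered under the local name "input".
    Map<String, String> inputs = Map.of("input", inputPCollectionId);
    // Outputs get positional local names, as with String.format("materialized_%d", outputIndex).
    Map<String, String> outputs = new LinkedHashMap<>();
    int outputIndex = 0;
    for (String outputId : outputPCollectionIds) {
      outputs.put(String.format("materialized_%d", outputIndex), outputId);
      outputIndex++;
    }
    // Inner transforms appear only in the payload; the outer transform lists no subtransforms.
    StagePayloadSketch payload =
        new StagePayloadSketch(
            inputPCollectionId, List.copyOf(outputPCollectionIds), List.copyOf(innerTransformIds));
    return new StageTransformSketch(inputs, outputs, payload);
  }
}
```

Building both views in one pass, as the diff does, keeps the invariant from the Javadoc (payload inputs and outputs equal the outer transform's) true by construction.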
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82440=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82440 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 20/Mar/18 19:53 Start Date: 20/Mar/18 19:53 Worklog Time Spent: 10m Work Description: tgroh commented on issue #4898: [BEAM-3565] Clean up ExecutableStage URL: https://github.com/apache/beam/pull/4898#issuecomment-374734491 run gradle java precommit Issue Time Tracking --- Worklog Id: (was: 82440) Time Spent: 15h (was: 14h 50m)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82441=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82441 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 20/Mar/18 19:53 Start Date: 20/Mar/18 19:53 Worklog Time Spent: 10m Work Description: tgroh commented on issue #4898: [BEAM-3565] Clean up ExecutableStage URL: https://github.com/apache/beam/pull/4898#issuecomment-374734566 run java gradle precommit Issue Time Tracking --- Worklog Id: (was: 82441) Time Spent: 15h 10m (was: 15h)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82439=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82439 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 20/Mar/18 19:53 Start Date: 20/Mar/18 19:53 Worklog Time Spent: 10m Work Description: tgroh commented on issue #4898: [BEAM-3565] Clean up ExecutableStage URL: https://github.com/apache/beam/pull/4898#issuecomment-374734491 run gradle java precommit Issue Time Tracking --- Worklog Id: (was: 82439) Time Spent: 14h 50m (was: 14h 40m)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82442=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82442 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 20/Mar/18 19:53 Start Date: 20/Mar/18 19:53 Worklog Time Spent: 10m Work Description: tgroh commented on issue #4898: [BEAM-3565] Clean up ExecutableStage URL: https://github.com/apache/beam/pull/4898#issuecomment-374734566 run java gradle precommit Issue Time Tracking --- Worklog Id: (was: 82442) Time Spent: 15h 20m (was: 15h 10m)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82385=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82385 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 20/Mar/18 18:07 Start Date: 20/Mar/18 18:07 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #4898: [BEAM-3565] Clean up ExecutableStage URL: https://github.com/apache/beam/pull/4898#discussion_r175870104 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java ## @@ -81,53 +81,53 @@ * follows: * * - * The {@link PTransform#getSubtransformsList()} contains no subtransforms. This ensures + * The {@link PTransform#getSubtransformsList()} is empty. This ensures * that executable stages are treated as primitive transforms. * The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of * {@link #getInputPCollection()}. * The output {@link PCollection PCollections} in the values of {@link * PTransform#getOutputsMap()} are the {@link PCollectionNode PCollections} returned by * {@link #getOutputPCollections()}. - * The {@link FunctionSpec} contains an {@link ExecutableStagePayload} which has its input - * and output PCollections set to the same values as the outer PTransform itself. It further - * contains the environment set of transforms for this stage. + * The {@link PTransform#getSpec()} contains an {@link ExecutableStagePayload} which has its + * input and output PCollections set to the same values as the outer PTransform itself. It + * further contains the environment set of transforms for this stage. Review comment: Done
Issue Time Tracking --- Worklog Id: (was: 82385) Time Spent: 14h 40m (was: 14.5h)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82381=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82381 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 20/Mar/18 18:02 Start Date: 20/Mar/18 18:02 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline URL: https://github.com/apache/beam/pull/4777#discussion_r175868383 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java ## @@ -19,54 +19,84 @@ package org.apache.beam.runners.core.construction.graph; import com.google.auto.value.AutoValue; +import com.google.common.collect.Sets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.Components; -import org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder; import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; -/** - * A {@link Pipeline} which has been separated into collections of executable components. - */ +/** A {@link Pipeline} which has been separated into collections of executable components. */ @AutoValue public abstract class FusedPipeline { static FusedPipeline of( Set<ExecutableStage> environmentalStages, Set<PTransformNode> runnerStages) { return new AutoValue_FusedPipeline(environmentalStages, runnerStages); } - /** - * The {@link ExecutableStage executable stages} that are executed by SDK harnesses. - */ + /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses.
*/ public abstract Set<ExecutableStage> getFusedStages(); - /** - * The {@link PTransform PTransforms} that a runner is responsible for executing. - */ + /** The {@link PTransform PTransforms} that a runner is responsible for executing. */ public abstract Set<PTransformNode> getRunnerExecutedTransforms(); + public RunnerApi.Pipeline toPipeline(Components initialComponents) { +Components executableComponents = +initialComponents +.toBuilder() +.clearTransforms() +.putAllTransforms(getTopLevelTransforms(initialComponents)) +.build(); +List<String> rootTransformIds = +StreamSupport.stream( +QueryablePipeline.forComponents(executableComponents) +.getTopologicallyOrderedTransforms() +.spliterator(), +false) +.map(PTransformNode::getId) +.collect(Collectors.toList()); +return Pipeline.newBuilder() + .setComponents(executableComponents.toBuilder().putAllTransforms(getFusedTransforms())) +.addAllRootTransformIds(rootTransformIds) +.build(); + } + /** * Return a {@link Components} like the {@code base} components, but with the only transforms * equal to this fused pipeline. Review comment: Done. Issue Time Tracking --- Worklog Id: (was: 82381) Time Spent: 14.5h (was: 14h 20m)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82380=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82380 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 20/Mar/18 18:01 Start Date: 20/Mar/18 18:01 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline URL: https://github.com/apache/beam/pull/4777#discussion_r175868128 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java ## @@ -19,54 +19,84 @@ package org.apache.beam.runners.core.construction.graph; import com.google.auto.value.AutoValue; +import com.google.common.collect.Sets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.Components; -import org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder; import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; -/** - * A {@link Pipeline} which has been separated into collections of executable components. - */ +/** A {@link Pipeline} which has been separated into collections of executable components. */ @AutoValue public abstract class FusedPipeline { static FusedPipeline of( Set<ExecutableStage> environmentalStages, Set<PTransformNode> runnerStages) { return new AutoValue_FusedPipeline(environmentalStages, runnerStages); } - /** - * The {@link ExecutableStage executable stages} that are executed by SDK harnesses. - */ + /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses.
*/ public abstract Set<ExecutableStage> getFusedStages(); - /** - * The {@link PTransform PTransforms} that a runner is responsible for executing. - */ + /** The {@link PTransform PTransforms} that a runner is responsible for executing. */ public abstract Set<PTransformNode> getRunnerExecutedTransforms(); + public RunnerApi.Pipeline toPipeline(Components initialComponents) { +Components executableComponents = +initialComponents +.toBuilder() +.clearTransforms() +.putAllTransforms(getTopLevelTransforms(initialComponents)) +.build(); +List<String> rootTransformIds = +StreamSupport.stream( +QueryablePipeline.forComponents(executableComponents) +.getTopologicallyOrderedTransforms() +.spliterator(), +false) +.map(PTransformNode::getId) +.collect(Collectors.toList()); +return Pipeline.newBuilder() + .setComponents(executableComponents.toBuilder().putAllTransforms(getFusedTransforms())) +.addAllRootTransformIds(rootTransformIds) +.build(); + } + /** * Return a {@link Components} like the {@code base} components, but with the only transforms * equal to this fused pipeline. * - * The only composites will be the stages returned by {@link #getFusedStages()}. + * The only composites will be the stages returned by {@link #getFusedStages()}, and the only + * primitives will be the {@link PTransformNode transforms} returned by {@link + * #getRunnerExecutedTransforms()}. */ - public Components asComponents(Components base) { -Builder newComponents = base.toBuilder().clearTransforms(); + private Map<String, PTransform> getTopLevelTransforms(Components base) { Review comment: `getExecutableTransforms` is what I am calling this now. All of the transforms are primitives, but there are non-executable transforms (within a stage) that get merged into the components as well. Issue Time Tracking --- Worklog Id: (was: 82380) Time Spent: 14h 20m (was: 14h 10m)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82372=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82372 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 20/Mar/18 17:59 Start Date: 20/Mar/18 17:59 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #4898: [BEAM-3565] Clean up ExecutableStage URL: https://github.com/apache/beam/pull/4898#discussion_r175867418 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java ## @@ -81,53 +81,53 @@ * follows: * * - * The {@link PTransform#getSubtransformsList()} contains no subtransforms. This ensures + * The {@link PTransform#getSubtransformsList()} is empty. This ensures * that executable stages are treated as primitive transforms. * The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of * {@link #getInputPCollection()}. * The output {@link PCollection PCollections} in the values of {@link * PTransform#getOutputsMap()} are the {@link PCollectionNode PCollections} returned by * {@link #getOutputPCollections()}. - * The {@link FunctionSpec} contains an {@link ExecutableStagePayload} which has its input - * and output PCollections set to the same values as the outer PTransform itself. It further - * contains the environment set of transforms for this stage. + * The {@link PTransform#getSpec()} contains an {@link ExecutableStagePayload} which has its + * input and output PCollections set to the same values as the outer PTransform itself. It + * further contains the environment set of transforms for this stage. Review comment: "The {@link PTransform#getSpec()} contains an {@link ExecutableStagePayload} with inputs and outputs equal to the PTransform's inputs and outputs, and transforms equal to the result of {@link #getTransforms}"
Issue Time Tracking --- Worklog Id: (was: 82372) Time Spent: 14h 10m (was: 14h)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82364=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82364 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 20/Mar/18 17:40 Start Date: 20/Mar/18 17:40 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline URL: https://github.com/apache/beam/pull/4777#discussion_r175238617 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java ## @@ -161,6 +179,40 @@ private static boolean isPrimitiveTransform(PTransform transform) { return network; } + // This enables a naive implementation of topological sort, instead of doing something clever. + // Nodes with lower weight precede nodes with higher weight, and are unrelated to nodes with + // equal weight + private final LoadingCache<PTransformNode, Long> nodeWeights = + CacheBuilder.newBuilder() + .build( + new CacheLoader<PTransformNode, Long>() { +@Override +public Long load(@Nonnull PTransformNode transformNode) { + long parentWeight = 0L; + for (String inputPCollectionId : + transformNode.getTransform().getInputsMap().values()) { +PTransformNode upstream = +getProducer( +PipelineNode.pCollection( +inputPCollectionId, + components.getPcollectionsOrThrow(inputPCollectionId))); +parentWeight += nodeWeights.getUnchecked(upstream); + } + return 1 + parentWeight; +} + }); + + public Iterable<PTransformNode> getTopologicallyOrderedTransforms() { +return pipelineNetwork +.nodes() +.stream() +.filter(node -> node instanceof PTransformNode) +.map(PTransformNode.class::cast) +.collect( +Collectors.toCollection( +() -> new TreeSet<>(Comparator.comparingLong(nodeWeights::getUnchecked; Review comment: This comparator is extremely inconsistent with equals, and will make it look like there's only one root transform.
Issue Time Tracking --- Worklog Id: (was: 82364) Time Spent: 14h (was: 13h 50m)
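The review point above can be reproduced without Beam: a `TreeSet` decides membership with its comparator, so every node that compares as 0 (equal weight, such as all root transforms with weight 1) is treated as a duplicate and dropped. A minimal sketch over plain string ids, assuming a hypothetical `WeightOrderingSketch` class and a `producers` map from each node to its upstream nodes; the tie-breaking sort is one possible fix, not necessarily the one the PR adopted.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of the weight-based ordering from the review, over plain string ids. */
final class WeightOrderingSketch {
  /** Weight = 1 + sum of the weights of a node's upstream producers, memoized in {@code memo}. */
  static long weight(String node, Map<String, List<String>> producers, Map<String, Long> memo) {
    Long cached = memo.get(node);
    if (cached != null) {
      return cached;
    }
    long parentWeight = 0L;
    for (String upstream : producers.getOrDefault(node, List.of())) {
      parentWeight += weight(upstream, producers, memo);
    }
    long result = 1 + parentWeight;
    // get/put instead of computeIfAbsent: HashMap may throw ConcurrentModificationException
    // when the mapping function recursively modifies the same map.
    memo.put(node, result);
    return result;
  }

  /** The pitfall: nodes whose weights compare as equal are silently dropped by the TreeSet. */
  static Set<String> orderByWeightOnly(Collection<String> nodes, Map<String, List<String>> producers) {
    Map<String, Long> memo = new HashMap<>();
    Set<String> ordered =
        new TreeSet<>(Comparator.comparingLong((String n) -> weight(n, producers, memo)));
    ordered.addAll(nodes);
    return ordered;
  }

  /** One possible fix: sort a List and break weight ties by id, so no node is lost. */
  static List<String> orderByWeightThenId(Collection<String> nodes, Map<String, List<String>> producers) {
    Map<String, Long> memo = new HashMap<>();
    List<String> ordered = new ArrayList<>(nodes);
    ordered.sort(
        Comparator.comparingLong((String n) -> weight(n, producers, memo))
            .thenComparing(Comparator.<String>naturalOrder()));
    return ordered;
  }
}
```

With two roots `readA` and `readB` (weight 1), `parDo` consuming `readA` (weight 2) and `flatten` consuming `parDo` and `readB` (weight 4), the weight-only `TreeSet` keeps three of the four nodes, while the tie-broken sort returns all four. Summing weights yields a valid order because each node's weight strictly exceeds each producer's, but in graphs with many diamonds the sums can grow exponentially; using 1 plus the maximum producer weight also gives a valid order and stays bounded by the graph depth.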
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82075=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82075 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 19/Mar/18 22:41 Start Date: 19/Mar/18 22:41 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #4844: [BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction URL: https://github.com/apache/beam/pull/4844#discussion_r175603054 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java ## @@ -84,64 +81,72 @@ * follows: * * - * The {@link PTransform#getSubtransformsList()} contains the result of {@link - * #getTransforms()}. + * The {@link PTransform#getSubtransformsList()} contains no subtransforms. This ensures + * that executable stages are treated as primitive transforms. * The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of * {@link #getInputPCollection()}. * The output {@link PCollection PCollections} in the values of {@link * PTransform#getOutputsMap()} are the {@link PCollectionNode PCollections} returned by * {@link #getOutputPCollections()}. + * The {@link FunctionSpec} contains an {@link ExecutableStagePayload} which has its input Review comment: Done.
Issue Time Tracking --- Worklog Id: (was: 82075) Time Spent: 13.5h (was: 13h 20m)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82079=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82079 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 19/Mar/18 22:41 Start Date: 19/Mar/18 22:41 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #4844: [BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction URL: https://github.com/apache/beam/pull/4844#discussion_r175595281 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java ## @@ -84,64 +81,72 @@ * follows: * * - * The {@link PTransform#getSubtransformsList()} contains the result of {@link - * #getTransforms()}. + * The {@link PTransform#getSubtransformsList()} contains no subtransforms. This ensures + * that executable stages are treated as primitive transforms. * The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of * {@link #getInputPCollection()}. * The output {@link PCollection PCollections} in the values of {@link * PTransform#getOutputsMap()} are the {@link PCollectionNode PCollections} returned by * {@link #getOutputPCollections()}. + * The {@link FunctionSpec} contains an {@link ExecutableStagePayload} which has its input + * and output PCollections set to the same values as the outer PTransform itself. It further + * contains the environment set of transforms for this stage. * + * + * The executable stage can be reconstructed from the resulting {@link ExecutableStagePayload} + * and components alone via {@link #fromPayload(ExecutableStagePayload, Components)}. 
*/ default PTransform toPTransform() { +ExecutableStagePayload.Builder payload = ExecutableStagePayload.newBuilder(); + +payload.setEnvironment(getEnvironment()); + +PCollectionNode input = getInputPCollection(); +payload.setInput(input.getId()); + +for (PTransformNode transform : getTransforms()) { + payload.addTransforms(transform.getId()); +} + +for (PCollectionNode output : getOutputPCollections()) { + payload.addOutputs(output.getId()); +} + PTransform.Builder pt = PTransform.newBuilder(); +pt.setSpec(FunctionSpec.newBuilder() +.setUrn(ExecutableStage.URN) +.setPayload(payload.build().toByteString()) +.build()); pt.putInputs("input", getInputPCollection().getId()); -int i = 0; -for (PCollectionNode materializedPCollection : getOutputPCollections()) { - pt.putOutputs(String.format("materialized_%s", i), materializedPCollection.getId()); - i++; -} -for (PTransformNode fusedTransform : getTransforms()) { - pt.addSubtransforms(fusedTransform.getId()); +int outputIndex = 0; +for (PCollectionNode pcNode : getOutputPCollections()) { + // Do something Review comment: Meaningless.
Issue Time Tracking --- Worklog Id: (was: 82079) Time Spent: 13h 40m
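The Javadoc in this diff lists the invariants of the stage's outer transform: it has no subtransforms, its single input and its `materialized_N` outputs mirror the payload's input and outputs, and the stage can be recovered from the payload after checking the URN. A minimal plain-Java sketch of those invariants follows. It does not use Beam's generated `RunnerApi` classes or serialize to a `ByteString`; the class names `Payload` and `StageTransform`, the methods `toTransform`/`fromTransform`, and the URN string are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StagePayloadSketch {
  // Hypothetical URN; Beam uses its own ExecutableStage.URN constant.
  static final String STAGE_URN = "hypothetical:executable_stage";

  // Hypothetical mirror of the four ExecutableStagePayload fields (environment, input, transforms, outputs).
  static final class Payload {
    final String environment;
    final String input;
    final List<String> transforms;
    final List<String> outputs;

    Payload(String environment, String input, List<String> transforms, List<String> outputs) {
      this.environment = environment;
      this.input = input;
      this.transforms = Collections.unmodifiableList(new ArrayList<>(transforms));
      this.outputs = Collections.unmodifiableList(new ArrayList<>(outputs));
    }
  }

  // Hypothetical mirror of the outer transform produced for a stage.
  static final class StageTransform {
    final String urn;
    final Payload payload;
    final Map<String, String> inputs = new LinkedHashMap<>();
    final Map<String, String> outputs = new LinkedHashMap<>();
    final List<String> subtransforms = new ArrayList<>(); // stays empty: the stage acts as a primitive

    StageTransform(String urn, Payload payload) {
      this.urn = urn;
      this.payload = payload;
    }
  }

  static StageTransform toTransform(Payload payload) {
    StageTransform transform = new StageTransform(STAGE_URN, payload);
    // The single input and the outputs of the outer transform mirror the payload.
    transform.inputs.put("input", payload.input);
    int outputIndex = 0;
    for (String pcollectionId : payload.outputs) {
      transform.outputs.put(String.format("materialized_%d", outputIndex), pcollectionId);
      outputIndex++;
    }
    return transform;
  }

  // Recovers the stage description from the transform alone, after checking the URN.
  static Payload fromTransform(StageTransform transform) {
    if (!STAGE_URN.equals(transform.urn)) {
      throw new IllegalArgumentException("Not an executable stage: " + transform.urn);
    }
    return transform.payload;
  }

  public static void main(String[] args) {
    Payload payload =
        new Payload("java-env", "pc_in", List.of("parDoA", "parDoB"), List.of("pc_out1", "pc_out2"));
    StageTransform transform = toTransform(payload);
    System.out.println(transform.outputs); // prints {materialized_0=pc_out1, materialized_1=pc_out2}
    System.out.println(fromTransform(transform).transforms); // prints [parDoA, parDoB]
  }
}
```

For an `int` index, `String.format("materialized_%s", i)` and `String.format("materialized_%d", i)` produce the same key, so the format-string change in the diff does not alter the output names.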
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82077=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82077 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 19/Mar/18 22:41 Start Date: 19/Mar/18 22:41 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #4844: [BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction URL: https://github.com/apache/beam/pull/4844#discussion_r175594386 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java ## @@ -84,64 +81,72 @@ * follows: * * - * The {@link PTransform#getSubtransformsList()} contains the result of {@link - * #getTransforms()}. + * The {@link PTransform#getSubtransformsList()} contains no subtransforms. This ensures + * that executable stages are treated as primitive transforms. * The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of * {@link #getInputPCollection()}. * The output {@link PCollection PCollections} in the values of {@link * PTransform#getOutputsMap()} are the {@link PCollectionNode PCollections} returned by * {@link #getOutputPCollections()}. + * The {@link FunctionSpec} contains an {@link ExecutableStagePayload} which has its input + * and output PCollections set to the same values as the outer PTransform itself. It further + * contains the environment set of transforms for this stage. * + * + * The executable stage can be reconstructed from the resulting {@link ExecutableStagePayload} + * and components alone via {@link #fromPayload(ExecutableStagePayload, Components)}. 
*/ default PTransform toPTransform() { +ExecutableStagePayload.Builder payload = ExecutableStagePayload.newBuilder(); + +payload.setEnvironment(getEnvironment()); + +PCollectionNode input = getInputPCollection(); +payload.setInput(input.getId()); + +for (PTransformNode transform : getTransforms()) { + payload.addTransforms(transform.getId()); +} + +for (PCollectionNode output : getOutputPCollections()) { + payload.addOutputs(output.getId()); +} + PTransform.Builder pt = PTransform.newBuilder(); +pt.setSpec(FunctionSpec.newBuilder() +.setUrn(ExecutableStage.URN) +.setPayload(payload.build().toByteString()) +.build()); pt.putInputs("input", getInputPCollection().getId()); -int i = 0; -for (PCollectionNode materializedPCollection : getOutputPCollections()) { - pt.putOutputs(String.format("materialized_%s", i), materializedPCollection.getId()); - i++; -} -for (PTransformNode fusedTransform : getTransforms()) { - pt.addSubtransforms(fusedTransform.getId()); +int outputIndex = 0; +for (PCollectionNode pcNode : getOutputPCollections()) { + // Do something + pt.putOutputs(String.format("materialized_%d", outputIndex), pcNode.getId()); + outputIndex++; } -pt.setSpec(FunctionSpec.newBuilder().setUrn(ExecutableStage.URN)); return pt.build(); } + // TODO: Should this live under ExecutableStageTranslation? Review comment: Ack. I meant this to be a question to the reviewer rather than an actual checked-in TODO.
Issue Time Tracking --- Worklog Id: (was: 82077) Time Spent: 13.5h
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82080=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82080 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 19/Mar/18 22:41 Start Date: 19/Mar/18 22:41 Worklog Time Spent: 10m Work Description: bsidhom commented on issue #4898: [BEAM-3565] Clean up ExecutableStage URL: https://github.com/apache/beam/pull/4898#issuecomment-374408951 R: @tgroh
Issue Time Tracking --- Worklog Id: (was: 82080) Time Spent: 13h 50m (was: 13h 40m)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82078=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82078 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 19/Mar/18 22:41 Start Date: 19/Mar/18 22:41 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #4844: [BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction URL: https://github.com/apache/beam/pull/4844#discussion_r175602123 ## File path: model/pipeline/src/main/proto/beam_runner_api.proto ## @@ -203,6 +203,23 @@ message PCollection { DisplayData display_data = 5; } +// The payload for an executable stage. This will eventually be passed to an SDK in the form of a +// ProcessBundleDescriptor. +message ExecutableStagePayload { + + Environment environment = 1; Review comment: Added a comment.
Issue Time Tracking --- Worklog Id: (was: 82078) Time Spent: 13h 40m (was: 13.5h)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82076=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82076 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 19/Mar/18 22:41 Start Date: 19/Mar/18 22:41 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #4844: [BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction URL: https://github.com/apache/beam/pull/4844#discussion_r175602637 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java ## @@ -84,64 +81,72 @@ * follows: * * - * The {@link PTransform#getSubtransformsList()} contains the result of {@link - * #getTransforms()}. + * The {@link PTransform#getSubtransformsList()} contains no subtransforms. This ensures Review comment: Done.
Issue Time Tracking --- Worklog Id: (was: 82076) Time Spent: 13.5h
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82074=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82074 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 19/Mar/18 22:40 Start Date: 19/Mar/18 22:40 Worklog Time Spent: 10m Work Description: bsidhom opened a new pull request #4898: [BEAM-3565] Clean up ExecutableStage URL: https://github.com/apache/beam/pull/4898 Addressing minor issues from #4844.
Follow this checklist to help us incorporate your contribution quickly and easily:
 - [ ] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
 - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue.
 - [ ] Write a pull request description that is detailed enough to understand:
   - [ ] What the pull request does
   - [ ] Why it does it
   - [ ] How it does it
   - [ ] Why this approach
 - [ ] Each commit in the pull request should have a meaningful subject line and body.
 - [ ] Run `mvn clean verify` to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
 - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
Issue Time Tracking --- Worklog Id: (was: 82074) Time Spent: 13h 20m (was: 13h 10m)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82057=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82057 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 19/Mar/18 21:32 Start Date: 19/Mar/18 21:32 Worklog Time Spent: 10m Work Description: tgroh closed pull request #4844: [BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction URL: https://github.com/apache/beam/pull/4844 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index b45be09efb6..3ed90dd036e 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -203,6 +203,23 @@ message PCollection { DisplayData display_data = 5; } +// The payload for an executable stage. This will eventually be passed to an SDK in the form of a +// ProcessBundleDescriptor. +message ExecutableStagePayload { + + Environment environment = 1; + + // Input PCollection id. + string input = 2; + + // PTransform ids contained within this executable stage. + repeated string transforms = 3; + + // Output PCollection ids. + repeated string outputs = 4; + +} + // The payload for the primitive ParDo transform. 
message ParDoPayload { diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java new file mode 100644 index 000..1200dc621a7 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java @@ -0,0 +1,25 @@ +package org.apache.beam.runners.core.construction; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.IOException; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload; +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.sdk.runners.AppliedPTransform; + +/** + * Utilities for converting {@link ExecutableStage}s to and from {@link RunnerApi} protocol buffers. + */ +public class ExecutableStageTranslation { + + /** Extracts an {@link ExecutableStagePayload} from the given transform. 
*/ + public static ExecutableStagePayload getExecutableStagePayload( + AppliedPTransform appliedTransform) throws IOException { +RunnerApi.PTransform transform = +PTransformTranslation.toProto(appliedTransform, SdkComponents.create()); +checkArgument(ExecutableStage.URN.equals(transform.getSpec().getUrn())); +return ExecutableStagePayload.parseFrom(transform.getSpec().getPayload()); + } + +} diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java index 766ce0d7136..27bfed87553 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java @@ -18,19 +18,16 @@ package org.apache.beam.runners.core.construction.graph; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.collect.Iterables.getOnlyElement; - import java.util.Collection; -import java.util.Optional; +import java.util.List; import java.util.stream.Collectors; import org.apache.beam.model.pipeline.v1.RunnerApi.Components; import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload; import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection; import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; -import org.apache.beam.runners.core.construction.Environments; import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; @@ -84,64 +81,72 @@ * follows: * * - * The {@link 
PTransform#getSubtransformsList()} contains the result of {@link - * #getTransforms()}. + * The {@link
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=81441=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81441 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 16/Mar/18 23:57 Start Date: 16/Mar/18 23:57 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline URL: https://github.com/apache/beam/pull/4777#discussion_r175238617 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java ## @@ -161,6 +179,40 @@ private static boolean isPrimitiveTransform(PTransform transform) { return network; } + // This enables a naive implementation of topological sort, instead of doing something clever. + // Nodes with lower weight precede nodes with higher weight, and are unrelated to nodes with + // equal weight + private final LoadingCache<PTransformNode, Long> nodeWeights = + CacheBuilder.newBuilder() + .build( + new CacheLoader<PTransformNode, Long>() { +@Override +public Long load(@Nonnull PTransformNode transformNode) { + long parentWeight = 0L; + for (String inputPCollectionId : + transformNode.getTransform().getInputsMap().values()) { +PTransformNode upstream = +getProducer( +PipelineNode.pCollection( +inputPCollectionId, + components.getPcollectionsOrThrow(inputPCollectionId))); +parentWeight += nodeWeights.getUnchecked(upstream); + } + return 1 + parentWeight; +} + }); + + public Iterable<PTransformNode> getTopologicallyOrderedTransforms() { +return pipelineNetwork +.nodes() +.stream() +.filter(node -> node instanceof PTransformNode) +.map(PTransformNode.class::cast) +.collect( +Collectors.toCollection( +() -> new TreeSet<>(Comparator.comparingLong(nodeWeights::getUnchecked; Review comment: This comparator is really extremely inconsistent with equals, and will make it so there only looks like there's one root transform.
Issue Time Tracking --- Worklog Id: (was: 81441) Time Spent: 13h (was: 12h 50m)
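The code comment in this hunk relies on one property: a transform's weight is 1 plus the sum of its producers' weights, so it is strictly greater than the weight of every producer, and sorting by weight therefore yields a valid topological order. The sketch below reproduces that idea under stated assumptions: the graph is a hypothetical map from transform id to the ids of the transforms producing its inputs, a plain `HashMap` memo replaces the Guava `LoadingCache`, and the result is a sorted `List` with an id tie-breaker, so equal-weight transforms are ordered rather than collapsed.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WeightTopoSort {
  // Weight of a transform: 1 plus the summed weights of the transforms producing its inputs.
  // Memoized with explicit get/put rather than HashMap.computeIfAbsent, because the
  // recursive calls would modify the map from inside the mapping function.
  static long weight(String id, Map<String, List<String>> producers, Map<String, Long> memo) {
    Long cached = memo.get(id);
    if (cached != null) {
      return cached;
    }
    long parentWeight = 0L;
    for (String upstream : producers.getOrDefault(id, List.of())) {
      parentWeight += weight(upstream, producers, memo);
    }
    long result = 1 + parentWeight; // always larger than each producer's weight
    memo.put(id, result);
    return result;
  }

  // Sorts every transform by weight; the id tie-breaker keeps equal-weight transforms distinct.
  static List<String> topologicalOrder(Map<String, List<String>> producers) {
    Map<String, Long> memo = new HashMap<>();
    List<String> ids = new ArrayList<>(producers.keySet());
    for (String id : ids) {
      weight(id, producers, memo);
    }
    ids.sort(Comparator.comparingLong((String id) -> memo.get(id))
        .thenComparing(Comparator.<String>naturalOrder()));
    return ids;
  }

  public static void main(String[] args) {
    // Hypothetical graph: transform id -> ids of the transforms that produce its inputs.
    Map<String, List<String>> producers = Map.of(
        "read", List.of(),
        "parse", List.of("read"),
        "countA", List.of("parse"),
        "countB", List.of("parse"),
        "join", List.of("countA", "countB"));
    System.out.println(topologicalOrder(producers)); // prints [read, parse, countA, countB, join]
  }
}
```

Because the weights are sums, each one counts the paths ending at that transform, so weights can grow quickly in graphs with many fan-ins; using the maximum producer weight plus 1 instead of the sum would keep the same ordering property.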
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=80630=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80630 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 14/Mar/18 23:28 Start Date: 14/Mar/18 23:28 Worklog Time Spent: 10m Work Description: lukecwik commented on issue #4777: [BEAM-3565] Add FusedPipeline#toPipeline URL: https://github.com/apache/beam/pull/4777#issuecomment-373208733 Please address all the comments.
Issue Time Tracking --- Worklog Id: (was: 80630) Time Spent: 12h 50m (was: 12h 40m)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=79774=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79774 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 13/Mar/18 04:52 Start Date: 13/Mar/18 04:52 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline URL: https://github.com/apache/beam/pull/4777#discussion_r174016261 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java ## @@ -61,12 +66,23 @@ * The returned {@link QueryablePipeline} will contain only the primitive transforms present * within the provided components. */ - public static QueryablePipeline fromComponents(Components components) { + public static QueryablePipeline forPrimitivesIn(Components components) { +return forComponents(retainOnlyPrimitives(components)); + } + + /** + * Create a new {@link QueryablePipeline} based on the provided components. + * + * Relationships between composite transforms and their subtransforms, and producer + * relationships between {@link PTransformNode transforms} and {@link PCollectionNode + * PCollections} are not yet modelled by {@link QueryablePipeline}, so the input {@link + * Components} should be treatable as though each node is a primitive. + */ + static QueryablePipeline forComponents(Components components) { Review comment: https://github.com/apache/beam/pull/4844 performs a lot of this change, without embedding the components within the stage; it will still require us to create a partial components, but should ultimately cause this construction to go through the same path as the original `queryablePipeline`
Issue Time Tracking --- Worklog Id: (was: 79774) Time Spent: 12h 40m (was: 12.5h)
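In this hunk, `forPrimitivesIn` first drops composite transforms so that the graph handed to `forComponents` can be treated as though every node is a primitive. A minimal sketch of that filtering step, under a simplifying assumption: a transform counts as primitive exactly when it lists no subtransforms (Beam's actual `isPrimitiveTransform` check may apply further conditions). A hypothetical map from transform id to subtransform ids stands in for `Components`, and `retainOnlyPrimitives` here is an illustrative re-implementation, not Beam's method.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RetainPrimitivesSketch {
  // Keeps only transforms with no subtransforms; input maps transform id -> subtransform ids.
  static Map<String, List<String>> retainOnlyPrimitives(Map<String, List<String>> transforms) {
    Map<String, List<String>> primitives = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> entry : transforms.entrySet()) {
      // Simplifying assumption: an empty subtransform list marks a primitive.
      if (entry.getValue().isEmpty()) {
        primitives.put(entry.getKey(), entry.getValue());
      }
    }
    return primitives;
  }

  public static void main(String[] args) {
    // Hypothetical hierarchy: two composites wrapping three primitives.
    Map<String, List<String>> transforms = new LinkedHashMap<>();
    transforms.put("WordCount", List.of("Read", "CountWords"));
    transforms.put("Read", List.of());
    transforms.put("CountWords", List.of("ParDo", "GroupByKey"));
    transforms.put("ParDo", List.of());
    transforms.put("GroupByKey", List.of());
    System.out.println(retainOnlyPrimitives(transforms).keySet()); // prints [Read, ParDo, GroupByKey]
  }
}
```

Under this rule, an executable stage whose outer transform has an empty subtransforms list, as in the #4844 change, survives the filter and is treated as a primitive.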
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=79773=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79773 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 13/Mar/18 04:51 Start Date: 13/Mar/18 04:51 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #4777: [BEAM-3565] Add FusedPipeline#toPipeline URL: https://github.com/apache/beam/pull/4777#discussion_r174016261 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java ## @@ -61,12 +66,23 @@ * The returned {@link QueryablePipeline} will contain only the primitive transforms present * within the provided components. */ - public static QueryablePipeline fromComponents(Components components) { + public static QueryablePipeline forPrimitivesIn(Components components) { +return forComponents(retainOnlyPrimitives(components)); + } + + /** + * Create a new {@link QueryablePipeline} based on the provided components. + * + * Relationships between composite transforms and their subtransforms, and producer + * relationships between {@link PTransformNode transforms} and {@link PCollectionNode + * PCollections} are not yet modelled by {@link QueryablePipeline}, so the input {@link + * Components} should be treatable as though each node is a primitive. + */ + static QueryablePipeline forComponents(Components components) { Review comment: https://github.com/apache/beam/pull/4844 performs a lot of this change, without embedding the components within the stage.
Issue Time Tracking --- Worklog Id: (was: 79773) Time Spent: 12.5h (was: 12h 20m)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=79708=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79708 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 13/Mar/18 02:33 Start Date: 13/Mar/18 02:33 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #4844: [BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction URL: https://github.com/apache/beam/pull/4844#discussion_r174001324 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java ## @@ -84,64 +81,72 @@ * follows: * * - * The {@link PTransform#getSubtransformsList()} contains the result of {@link - * #getTransforms()}. + * The {@link PTransform#getSubtransformsList()} contains no subtransforms. This ensures + * that executable stages are treated as primitive transforms. * The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of * {@link #getInputPCollection()}. * The output {@link PCollection PCollections} in the values of {@link * PTransform#getOutputsMap()} are the {@link PCollectionNode PCollections} returned by * {@link #getOutputPCollections()}. + * The {@link FunctionSpec} contains an {@link ExecutableStagePayload} which has its input Review comment: The `PTransform#getSpec()` contains an...
Issue Time Tracking --- Worklog Id: (was: 79708) Time Spent: 11h 50m (was: 11h 40m)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=79713=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79713 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 13/Mar/18 02:33 Start Date: 13/Mar/18 02:33 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #4844: [BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction URL: https://github.com/apache/beam/pull/4844#discussion_r174001198 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java ## @@ -84,64 +81,72 @@ * follows: * * - * The {@link PTransform#getSubtransformsList()} contains the result of {@link - * #getTransforms()}. + * The {@link PTransform#getSubtransformsList()} contains no subtransforms. This ensures Review comment: s/contains no subtransforms/is empty/ Just because it scans better Issue Time Tracking --- Worklog Id: (was: 79713) Time Spent: 12h 20m (was: 12h 10m)
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=79711=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79711 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 13/Mar/18 02:33 Start Date: 13/Mar/18 02:33 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #4844: [BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction URL: https://github.com/apache/beam/pull/4844#discussion_r174001847 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java ## @@ -84,64 +81,72 @@ * follows: * * - * The {@link PTransform#getSubtransformsList()} contains the result of {@link - * #getTransforms()}. + * The {@link PTransform#getSubtransformsList()} contains no subtransforms. This ensures + * that executable stages are treated as primitive transforms. * The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of * {@link #getInputPCollection()}. * The output {@link PCollection PCollections} in the values of {@link * PTransform#getOutputsMap()} are the {@link PCollectionNode PCollections} returned by * {@link #getOutputPCollections()}. + * The {@link FunctionSpec} contains an {@link ExecutableStagePayload} which has its input + * and output PCollections set to the same values as the outer PTransform itself. It further + * contains the environment set of transforms for this stage. * + * + * The executable stage can be reconstructed from the resulting {@link ExecutableStagePayload} + * and components alone via {@link #fromPayload(ExecutableStagePayload, Components)}. 
*/ default PTransform toPTransform() { +ExecutableStagePayload.Builder payload = ExecutableStagePayload.newBuilder(); + +payload.setEnvironment(getEnvironment()); + +PCollectionNode input = getInputPCollection(); +payload.setInput(input.getId()); + +for (PTransformNode transform : getTransforms()) { + payload.addTransforms(transform.getId()); Review comment: I'm not huge on the double iteration, given that I think you can populate both the payload and the transform builder in the same places to maximize mental locality (e.g. populate `payload.setInput` and then `pt.putInput("input", `) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 79711) Time Spent: 12h 10m (was: 12h) > Add utilities for producing a collection of PTransforms that can execute in a > single SDK Harness > > > Key: BEAM-3565 > URL: https://issues.apache.org/jira/browse/BEAM-3565 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Thomas Groh >Assignee: Thomas Groh >Priority: Major > Labels: portability > Fix For: 2.4.0 > > Time Spent: 12h 10m > Remaining Estimate: 0h > > An SDK Harness executes some ("fused") collection of PTransforms. The java > runner libraries should provide some way to take a Pipeline that executes in > both a runner and an environment and construct a collection of transforms > which can execute within a single environment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
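The reviewer suggests populating the `ExecutableStagePayload` builder and the `PTransform` builder in the same pass instead of iterating over the outputs twice. The sketch below shows that loop shape. Its `Payload` and `Transform` classes are hypothetical stand-ins for the protobuf builders, so only the structure, not the API, mirrors the diff. The `materialized_%d` output naming is taken from the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified stand-ins for the ExecutableStagePayload and PTransform builders.
 * The point is the loop structure: each output is written to both builders in one pass.
 */
public class SinglePassStage {

  static final class Payload {
    String input;
    final List<String> transforms = new ArrayList<>();
    final List<String> outputs = new ArrayList<>();
  }

  static final class Transform {
    final Map<String, String> inputs = new LinkedHashMap<>();
    final Map<String, String> outputs = new LinkedHashMap<>();
    Payload spec;
  }

  static Transform toTransform(String inputId, List<String> transformIds, List<String> outputIds) {
    Payload payload = new Payload();
    Transform pt = new Transform();

    // Input: populate the payload and the outer transform side by side.
    payload.input = inputId;
    pt.inputs.put("input", inputId);

    // Fused transforms are recorded only in the payload; the outer transform stays primitive.
    payload.transforms.addAll(transformIds);

    // Outputs: one loop fills both builders, so the two views cannot drift apart.
    int outputIndex = 0;
    for (String outputId : outputIds) {
      payload.outputs.add(outputId);
      pt.outputs.put(String.format("materialized_%d", outputIndex), outputId);
      outputIndex++;
    }

    pt.spec = payload;
    return pt;
  }

  public static void main(String[] args) {
    Transform pt = toTransform("pc_in", Arrays.asList("t1", "t2"), Arrays.asList("pc_a", "pc_b"));
    System.out.println(pt.outputs); // prints {materialized_0=pc_a, materialized_1=pc_b}
    System.out.println(pt.spec.outputs); // prints [pc_a, pc_b]
  }
}
```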
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=79712=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79712 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 13/Mar/18 02:33 Start Date: 13/Mar/18 02:33 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #4844: [BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction URL: https://github.com/apache/beam/pull/4844#discussion_r174001680 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java ## @@ -84,64 +81,72 @@ * follows: * * - * The {@link PTransform#getSubtransformsList()} contains the result of {@link - * #getTransforms()}. + * The {@link PTransform#getSubtransformsList()} contains no subtransforms. This ensures + * that executable stages are treated as primitive transforms. * The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of * {@link #getInputPCollection()}. * The output {@link PCollection PCollections} in the values of {@link * PTransform#getOutputsMap()} are the {@link PCollectionNode PCollections} returned by * {@link #getOutputPCollections()}. + * The {@link FunctionSpec} contains an {@link ExecutableStagePayload} which has its input + * and output PCollections set to the same values as the outer PTransform itself. It further + * contains the environment set of transforms for this stage. * + * + * The executable stage can be reconstructed from the resulting {@link ExecutableStagePayload} + * and components alone via {@link #fromPayload(ExecutableStagePayload, Components)}. 
*/ default PTransform toPTransform() { +ExecutableStagePayload.Builder payload = ExecutableStagePayload.newBuilder(); + +payload.setEnvironment(getEnvironment()); + +PCollectionNode input = getInputPCollection(); +payload.setInput(input.getId()); + +for (PTransformNode transform : getTransforms()) { + payload.addTransforms(transform.getId()); +} + +for (PCollectionNode output : getOutputPCollections()) { + payload.addOutputs(output.getId()); +} + PTransform.Builder pt = PTransform.newBuilder(); +pt.setSpec(FunctionSpec.newBuilder() +.setUrn(ExecutableStage.URN) +.setPayload(payload.build().toByteString()) +.build()); pt.putInputs("input", getInputPCollection().getId()); -int i = 0; -for (PCollectionNode materializedPCollection : getOutputPCollections()) { - pt.putOutputs(String.format("materialized_%s", i), materializedPCollection.getId()); - i++; -} -for (PTransformNode fusedTransform : getTransforms()) { - pt.addSubtransforms(fusedTransform.getId()); +int outputIndex = 0; +for (PCollectionNode pcNode : getOutputPCollections()) { + // Do something + pt.putOutputs(String.format("materialized_%d", outputIndex), pcNode.getId()); + outputIndex++; } -pt.setSpec(FunctionSpec.newBuilder().setUrn(ExecutableStage.URN)); return pt.build(); } + // TODO: Should this live under ExecutableStageTranslation? Review comment: Regardless of if it should or shouldn't, you should have a JIRA to determine it. For what it's worth, I think the toProto and fromProto methods should be coresident, and I think this is a totally reasonable place for them (as it's already involved with the proto representation of the pipeline, so we don't get significant separability between the java and proto representations.) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 79712) Time Spent: 12h 10m (was: 12h)
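The reviewer argues that the serialization method (`toPTransform`) and the reconstruction method (`fromPayload`) should live in the same place. The round-trip sketch below follows that layout. `Stage`, `Payload`, and the `Map` standing in for `Components` are hypothetical. In this sketch `fromPayload` only checks that every referenced id exists in the components; that is an assumption about what reconstruction must validate, not a description of Beam's method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stage/payload round trip with both directions in one class; not the Beam API. */
public class StageRoundTrip {

  /** Id-only payload, loosely modelled on the fields of ExecutableStagePayload. */
  static final class Payload {
    final String input;
    final List<String> transforms;
    final List<String> outputs;

    Payload(String input, List<String> transforms, List<String> outputs) {
      this.input = input;
      this.transforms = new ArrayList<>(transforms);
      this.outputs = new ArrayList<>(outputs);
    }
  }

  static final class Stage {
    final String input;
    final List<String> transforms;
    final List<String> outputs;

    Stage(String input, List<String> transforms, List<String> outputs) {
      this.input = input;
      this.transforms = new ArrayList<>(transforms);
      this.outputs = new ArrayList<>(outputs);
    }

    /** Serializes the stage to ids only. */
    Payload toPayload() {
      return new Payload(input, transforms, outputs);
    }

    /** Rebuilds a stage from the payload and components alone, rejecting unknown ids. */
    static Stage fromPayload(Payload payload, Map<String, String> components) {
      List<String> ids = new ArrayList<>();
      ids.add(payload.input);
      ids.addAll(payload.transforms);
      ids.addAll(payload.outputs);
      for (String id : ids) {
        if (!components.containsKey(id)) {
          throw new IllegalArgumentException("Unknown component id: " + id);
        }
      }
      return new Stage(payload.input, payload.transforms, payload.outputs);
    }

    @Override
    public boolean equals(Object other) {
      if (!(other instanceof Stage)) {
        return false;
      }
      Stage that = (Stage) other;
      return input.equals(that.input)
          && transforms.equals(that.transforms)
          && outputs.equals(that.outputs);
    }

    @Override
    public int hashCode() {
      return Objects.hash(input, transforms, outputs);
    }
  }

  public static void main(String[] args) {
    // Components: id -> kind of definition (stand-in for RunnerApi.Components).
    Map<String, String> components = new HashMap<>();
    components.put("pc_in", "PCollection");
    components.put("t1", "PTransform");
    components.put("pc_out", "PCollection");

    Stage stage = new Stage("pc_in", Arrays.asList("t1"), Arrays.asList("pc_out"));
    Stage rebuilt = Stage.fromPayload(stage.toPayload(), components);
    System.out.println(rebuilt.equals(stage)); // prints true
  }
}
```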
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=79710=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79710 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 13/Mar/18 02:33 Start Date: 13/Mar/18 02:33 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #4844: [BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction URL: https://github.com/apache/beam/pull/4844#discussion_r174001443 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java ## @@ -84,64 +81,72 @@ * follows: * * - * The {@link PTransform#getSubtransformsList()} contains the result of {@link - * #getTransforms()}. + * The {@link PTransform#getSubtransformsList()} contains no subtransforms. This ensures + * that executable stages are treated as primitive transforms. * The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of * {@link #getInputPCollection()}. * The output {@link PCollection PCollections} in the values of {@link * PTransform#getOutputsMap()} are the {@link PCollectionNode PCollections} returned by * {@link #getOutputPCollections()}. + * The {@link FunctionSpec} contains an {@link ExecutableStagePayload} which has its input + * and output PCollections set to the same values as the outer PTransform itself. It further + * contains the environment set of transforms for this stage. * + * + * The executable stage can be reconstructed from the resulting {@link ExecutableStagePayload} + * and components alone via {@link #fromPayload(ExecutableStagePayload, Components)}. 
*/ default PTransform toPTransform() { +ExecutableStagePayload.Builder payload = ExecutableStagePayload.newBuilder(); + +payload.setEnvironment(getEnvironment()); + +PCollectionNode input = getInputPCollection(); +payload.setInput(input.getId()); + +for (PTransformNode transform : getTransforms()) { + payload.addTransforms(transform.getId()); +} + +for (PCollectionNode output : getOutputPCollections()) { + payload.addOutputs(output.getId()); +} + PTransform.Builder pt = PTransform.newBuilder(); +pt.setSpec(FunctionSpec.newBuilder() +.setUrn(ExecutableStage.URN) +.setPayload(payload.build().toByteString()) +.build()); pt.putInputs("input", getInputPCollection().getId()); -int i = 0; -for (PCollectionNode materializedPCollection : getOutputPCollections()) { - pt.putOutputs(String.format("materialized_%s", i), materializedPCollection.getId()); - i++; -} -for (PTransformNode fusedTransform : getTransforms()) { - pt.addSubtransforms(fusedTransform.getId()); +int outputIndex = 0; +for (PCollectionNode pcNode : getOutputPCollections()) { + // Do something Review comment: What's this comment? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 79710) Time Spent: 12h (was: 11h 50m) > Add utilities for producing a collection of PTransforms that can execute in a > single SDK Harness > > > Key: BEAM-3565 > URL: https://issues.apache.org/jira/browse/BEAM-3565 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Thomas Groh >Assignee: Thomas Groh >Priority: Major > Labels: portability > Fix For: 2.4.0 > > Time Spent: 12h > Remaining Estimate: 0h > > An SDK Harness executes some ("fused") collection of PTransforms. 
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=79709=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79709 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 13/Mar/18 02:33 Start Date: 13/Mar/18 02:33 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #4844: [BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction URL: https://github.com/apache/beam/pull/4844#discussion_r174001107 ## File path: model/pipeline/src/main/proto/beam_runner_api.proto ## @@ -203,6 +203,23 @@ message PCollection { DisplayData display_data = 5; } +// The payload for an executable stage. This will eventually be passed to an SDK in the form of a +// ProcessBundleDescriptor. +message ExecutableStagePayload { + + Environment environment = 1; Review comment: I'd like a comment explaining why this is a value rather than an ID, or replace it with `string environment_id` Issue Time Tracking --- Worklog Id: (was: 79709)
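The review asks whether `ExecutableStagePayload` should embed an `Environment` by value or reference it through a `string environment_id`. The sketch below illustrates the by-id option. The payload holds only an id, and a shared map, standing in for the environments in the pipeline components, supplies the definition, so several stages can point at one environment. `Environment`, `StagePayload`, `resolve`, and the `image` field are hypothetical names, not the proto's fields.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical by-id environment reference; not the Beam Runner API. */
public class EnvironmentRef {

  // Hypothetical environment definition; the real Environment proto has its own fields.
  static final class Environment {
    final String image;

    Environment(String image) {
      this.image = image;
    }
  }

  // Hypothetical payload that refers to its environment by id instead of embedding a copy.
  static final class StagePayload {
    final String environmentId;

    StagePayload(String environmentId) {
      this.environmentId = environmentId;
    }
  }

  /** Resolves the payload's environment id against the shared environments map. */
  static Environment resolve(StagePayload payload, Map<String, Environment> environments) {
    Environment env = environments.get(payload.environmentId);
    if (env == null) {
      throw new IllegalArgumentException("No environment with id " + payload.environmentId);
    }
    return env;
  }

  public static void main(String[] args) {
    Map<String, Environment> environments = new HashMap<>();
    environments.put("java_env", new Environment("example/java-harness"));
    // Two stages share one environment definition instead of carrying two copies.
    StagePayload a = new StagePayload("java_env");
    StagePayload b = new StagePayload("java_env");
    System.out.println(resolve(a, environments) == resolve(b, environments)); // prints true
  }
}
```

A by-value field keeps each payload self-describing, while an id avoids duplicating the environment across stages and keeps one source of truth in the components. The comment the reviewer requests would document whichever trade-off is chosen.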
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=79027=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79027 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 09/Mar/18 21:34 Start Date: 09/Mar/18 21:34 Worklog Time Spent: 10m Work Description: bsidhom opened a new pull request #4844: [BEAM-3565] Add ExecutableStagePayload to make aid runner stage reconstruction URL: https://github.com/apache/beam/pull/4844 Runner code generally only has access to the root pipeline and `AppliedPTransform`s while walking the pipeline to perform translation. This change adds a new `ExecutableStagePayload` message which is embedded in the `FunctionSpec` of serialized `ExecutableStage` `PTransform`s. Doing so avoids the extra pipeline munging necessary to access the proper set of subtransforms. It also means that subtransforms are _not_ added directly to `ExecutableStage` transforms. This effectively makes executable stages native transforms, which they should be as far as the runner is concerned. Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes. - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue. - [ ] Write a pull request description that is detailed enough to understand: - [ ] What the pull request does - [ ] Why it does it - [ ] How it does it - [ ] Why this approach - [ ] Each commit in the pull request should have a meaningful subject line and body. - [ ] Run `mvn clean verify` to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). Issue Time Tracking --- Worklog Id: (was: 79027) Time Spent: 11.5h (was: 11h 20m)
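According to the PR description, the fused subtransforms move into the embedded payload and `ExecutableStage` transforms get no subtransforms, so a runner can treat a stage as a native transform. The sketch below shows how a runner's translation loop might use that. It assumes a hypothetical URN constant in place of `ExecutableStage.URN` and a simplified `Transform` class that carries the already-decoded payload transform ids. It also assumes that a composite's parts appear separately in the input list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical runner translation loop; not Beam's translator API. */
public class RunnerTranslation {

  // Placeholder URN standing in for ExecutableStage.URN.
  static final String STAGE_URN = "example:executable_stage";

  static final class Transform {
    final String id;
    final String urn;
    final List<String> subtransforms;
    // Transform ids as they would be decoded from the stage payload (empty for non-stages).
    final List<String> payloadTransforms;

    Transform(String id, String urn, List<String> subtransforms, List<String> payloadTransforms) {
      this.id = id;
      this.urn = urn;
      this.subtransforms = subtransforms;
      this.payloadTransforms = payloadTransforms;
    }
  }

  /** Emits one plan step per leaf; a stage is a leaf because it has no subtransforms. */
  static List<String> translate(List<Transform> transforms) {
    List<String> plan = new ArrayList<>();
    for (Transform t : transforms) {
      if (!t.subtransforms.isEmpty()) {
        continue; // composite: its parts are assumed to be listed and translated separately
      }
      if (STAGE_URN.equals(t.urn)) {
        // The fused set comes from the payload, not from walking subtransforms.
        plan.add("stage " + t.id + " fusing " + t.payloadTransforms);
      } else {
        plan.add("primitive " + t.id);
      }
    }
    return plan;
  }

  public static void main(String[] args) {
    List<String> none = Collections.emptyList();
    Transform stage = new Transform("s1", STAGE_URN, none, Arrays.asList("t1", "t2"));
    Transform gbk = new Transform("gbk", "example:group_by_key", none, none);
    System.out.println(translate(Arrays.asList(stage, gbk)));
    // prints [stage s1 fusing [t1, t2], primitive gbk]
  }
}
```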
[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
[ https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=79028=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79028 ] ASF GitHub Bot logged work on BEAM-3565: Author: ASF GitHub Bot Created on: 09/Mar/18 21:34 Start Date: 09/Mar/18 21:34 Worklog Time Spent: 10m Work Description: bsidhom commented on issue #4844: [BEAM-3565] Add ExecutableStagePayload to make aid runner stage reconstruction URL: https://github.com/apache/beam/pull/4844#issuecomment-371951519 R: @tgroh Issue Time Tracking --- Worklog Id: (was: 79028) Time Spent: 11h 40m (was: 11.5h)