[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=83709=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83709
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 23/Mar/18 17:35
Start Date: 23/Mar/18 17:35
Worklog Time Spent: 10m 
  Work Description: tgroh closed pull request #4777: [BEAM-3565] Add 
FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of such a pull request once it is merged, the diff is
reproduced below for the sake of provenance:

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
index c371920d47e..68da5c3961b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
@@ -19,54 +19,95 @@
 package org.apache.beam.runners.core.construction.graph;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 
-/**
- * A {@link Pipeline} which has been separated into collections of executable components.
- */
+/** A {@link Pipeline} which has been separated into collections of executable components. */
 @AutoValue
 public abstract class FusedPipeline {
   static FusedPipeline of(
-      Set<ExecutableStage> environmentalStages, Set<PTransformNode> runnerStages) {
-    return new AutoValue_FusedPipeline(environmentalStages, runnerStages);
+      Components components,
+      Set<ExecutableStage> environmentalStages,
+      Set<PTransformNode> runnerStages) {
+    return new AutoValue_FusedPipeline(components, environmentalStages, runnerStages);
   }
 
-  /**
-   * The {@link ExecutableStage executable stages} that are executed by SDK harnesses.
-   */
+  abstract Components getComponents();
+
+  /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. */
   public abstract Set<ExecutableStage> getFusedStages();
 
+  /** The {@link PTransform PTransforms} that a runner is responsible for executing. */
+  public abstract Set<PTransformNode> getRunnerExecutedTransforms();
+
   /**
-   * The {@link PTransform PTransforms} that a runner is responsible for executing.
+   * Returns the {@link RunnerApi.Pipeline} representation of this {@link FusedPipeline}.
+   *
+   * The {@link Components} of the returned pipeline will contain all of the {@link PTransform
+   * PTransforms} present in the original Pipeline that this {@link FusedPipeline} was created from,
+   * plus all of the {@link ExecutableStage ExecutableStages} contained within this {@link
+   * FusedPipeline}. The {@link Pipeline#getRootTransformIdsList()} will contain all of the runner
+   * executed transforms and all of the {@link ExecutableStage executable stages} contained within
+   * the Pipeline.
    */
-  public abstract Set<PTransformNode> getRunnerExecutedTransforms();
+  public RunnerApi.Pipeline toPipeline() {
+    Map<String, PTransform> executableStageTransforms = getEnvironmentExecutedTransforms();
+    Set<String> executableTransformIds =
+        Sets.union(
+            executableStageTransforms.keySet(),
+            getRunnerExecutedTransforms()
+                .stream()
+                .map(PTransformNode::getId)
+                .collect(Collectors.toSet()));
+
+    // Augment the initial transforms with all of the executable transforms.
+    Components fusedComponents =
+        getComponents().toBuilder().putAllTransforms(executableStageTransforms).build();
+    List<String> rootTransformIds =
+        StreamSupport.stream(
+                QueryablePipeline.forTransforms(executableTransformIds, fusedComponents)
+                    .getTopologicallyOrderedTransforms()
+                    .spliterator(),
+                false)
+            .map(PTransformNode::getId)
+            .collect(Collectors.toList());
+    return Pipeline.newBuilder()
+        .setComponents(fusedComponents)
+        .addAllRootTransformIds(rootTransformIds)
+        .build();
+  }
 
   /**
-   * Return a 
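
The root-ID computation in toPipeline() above can be illustrated with a small, self-contained sketch. This is not Beam code: the class RootTransformOrderSketch, the method rootTransformIds, and the "upstream" map are hypothetical stand-ins for QueryablePipeline, and the ordering uses a plain Kahn-style topological sort, which is only assumed to resemble what getTopologicallyOrderedTransforms() produces.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative only: a toy model of root-ID selection, not Beam's QueryablePipeline. */
final class RootTransformOrderSketch {
  /**
   * Returns the union of runner-executed and stage transform IDs, ordered so that every
   * transform appears after the transforms it consumes from.
   *
   * @param upstream hypothetical map from a transform ID to the IDs it consumes from
   */
  static List<String> rootTransformIds(
      Set<String> runnerIds, Set<String> stageIds, Map<String, Set<String>> upstream) {
    // Union of both kinds of executable transform, mirroring Sets.union in the diff.
    Set<String> ids = new LinkedHashSet<>(runnerIds);
    ids.addAll(stageIds);

    // Count, for each ID, how many of its upstream transforms are still unemitted.
    Map<String, Integer> pending = new LinkedHashMap<>();
    for (String id : ids) {
      int count = 0;
      for (String up : upstream.getOrDefault(id, Set.of())) {
        if (ids.contains(up)) {
          count++;
        }
      }
      pending.put(id, count);
    }

    Deque<String> ready = new ArrayDeque<>();
    pending.forEach(
        (id, count) -> {
          if (count == 0) {
            ready.add(id);
          }
        });

    // Kahn's algorithm: emit a ready transform, then release its consumers.
    List<String> ordered = new ArrayList<>();
    while (!ready.isEmpty()) {
      String next = ready.remove();
      ordered.add(next);
      for (String id : ids) {
        if (upstream.getOrDefault(id, Set.of()).contains(next)
            && pending.merge(id, -1, Integer::sum) == 0) {
          ready.add(id);
        }
      }
    }
    if (ordered.size() != ids.size()) {
      throw new IllegalStateException("Transforms contain a cycle");
    }
    return ordered;
  }
}
```

For a linear chain there is exactly one valid order, so the result does not depend on how the input sets iterate.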

[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82996=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82996
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 21/Mar/18 23:46
Start Date: 21/Mar/18 23:46
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4777: [BEAM-3565] Add 
FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#issuecomment-375132231
 
 
   Done to all


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 82996)
Time Spent: 18h  (was: 17h 50m)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> 
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
> Fix For: 2.4.0
>
>  Time Spent: 18h
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The java 
> runner libraries should provide some way to take a Pipeline that executes in 
> both a runner and an environment and construct a collection of transforms 
> which can execute within a single environment.
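
As a rough illustration of the split described in the issue, the sketch below partitions a linear chain of transform names into transforms the runner executes itself and fused stages of consecutive transforms that could run in one environment. It is a deliberately simplified assumption, not Beam's GreedyPipelineFuser: the class LinearFusionSketch, its Fused result type, and the idea of passing the runner-executed names as a set are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Illustrative only: fuses a linear chain of names; not Beam's GreedyPipelineFuser. */
final class LinearFusionSketch {
  /** The transforms the runner executes itself, plus the fused stages. */
  static final class Fused {
    final List<String> runnerTransforms = new ArrayList<>();
    final List<List<String>> stages = new ArrayList<>();
  }

  /**
   * Splits {@code chain} at every runner-executed transform; each maximal run of the
   * remaining transforms becomes one fused stage.
   */
  static Fused fuse(List<String> chain, Set<String> runnerExecuted) {
    Fused result = new Fused();
    List<String> current = new ArrayList<>();
    for (String name : chain) {
      if (runnerExecuted.contains(name)) {
        // A runner-executed transform ends the stage being built, if any.
        if (!current.isEmpty()) {
          result.stages.add(current);
          current = new ArrayList<>();
        }
        result.runnerTransforms.add(name);
      } else {
        current.add(name);
      }
    }
    if (!current.isEmpty()) {
      result.stages.add(current);
    }
    return result;
  }
}
```

Applied to the chain impulse, map, key, gbk, values with impulse and gbk treated as runner-executed, this yields two runner transforms and two stages, the same counts the FusedPipelineTest in this thread checks for.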



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82987=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82987
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 21/Mar/18 23:18
Start Date: 21/Mar/18 23:18
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176269191
 
 

 ##
 File path: 
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/FusedPipelineTest.java
 ##
 @@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction.graph;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link FusedPipeline}. */
+@RunWith(JUnit4.class)
+public class FusedPipelineTest implements Serializable {
+  @Test
+  public void testToProto() {
+    Pipeline p = Pipeline.create();
+    p.apply("impulse", Impulse.create())
+        .apply("map", MapElements.into(TypeDescriptors.integers()).via(bytes -> bytes.length))
+        .apply("key", WithKeys.of("foo"))
+        .apply("gbk", GroupByKey.create())
+        .apply("values", Values.create());
+
+    RunnerApi.Pipeline protoPipeline = PipelineTranslation.toProto(p);
+    checkState(
+        protoPipeline
+            .getRootTransformIdsList()
+            .containsAll(ImmutableList.of("impulse", "map", "key", "gbk", "values")),
+        "Unexpected Root Transform IDs %s",
+        protoPipeline.getRootTransformIdsList());
+
+    FusedPipeline fused = GreedyPipelineFuser.fuse(protoPipeline);
+    checkState(
+        fused.getRunnerExecutedTransforms().size() == 2,
+        "Unexpected number of runner transforms %s",
+        fused.getRunnerExecutedTransforms());
+    checkState(
+        fused.getFusedStages().size() == 2,
+        "Unexpected number of fused stages %s",
+        fused.getFusedStages());
+    RunnerApi.Pipeline fusedProto = fused.toPipeline(protoPipeline.getComponents());
+
+    assertThat(
+        "Root Transforms should all be present in the Pipeline Components",
+        fusedProto.getComponents().getTransformsMap().keySet(),
+        hasItems(fusedProto.getRootTransformIdsList().toArray(new String[0])));
+    assertThat(
+        "Should contain Impulse, GroupByKey, and two Environment Stages",
+        fusedProto.getRootTransformIdsCount(),
+        equalTo(4));
+    assertThat(fusedProto.getRootTransformIdsList(), hasItems("impulse", "gbk"));
+    assertRootsInTopologicalOrder(fusedProto);
+    // Since MapElements, WithKeys, and Values are all composites of a ParDo, we do prefix matching
+    // instead of looking at the inside of their expansions
+    assertThat(
+        "Fused transforms should be present in the components",
+

[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82986=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82986
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 21/Mar/18 23:18
Start Date: 21/Mar/18 23:18
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176267337
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
 ##
 @@ -36,25 +36,45 @@
 @AutoValue
 public abstract class FusedPipeline {
   static FusedPipeline of(
-      Set<ExecutableStage> environmentalStages, Set<PTransformNode> runnerStages) {
-    return new AutoValue_FusedPipeline(environmentalStages, runnerStages);
+      Components components,
+      Set<ExecutableStage> environmentalStages,
+      Set<PTransformNode> runnerStages) {
+    return new AutoValue_FusedPipeline(components, environmentalStages, runnerStages);
   }
 
+  abstract Components getComponents();
+
   /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. */
   public abstract Set<ExecutableStage> getFusedStages();
 
   /** The {@link PTransform PTransforms} that a runner is responsible for executing. */
   public abstract Set<PTransformNode> getRunnerExecutedTransforms();
 
-  public RunnerApi.Pipeline toPipeline(Components initialComponents) {
-    Map<String, PTransform> executableTransforms = getExecutableTransforms(initialComponents);
-    Components fusedComponents = initialComponents.toBuilder()
-        .putAllTransforms(executableTransforms)
-        .putAllTransforms(getFusedTransforms())
-        .build();
+  /**
+   * Returns the {@link RunnerApi.Pipeline} representation of this {@link FusedPipeline}.
+   *
+   * The {@link Components} of the returned pipeline will contain all of the {@link PTransform
+   * PTransforms} present in the original Pipeline that this {@link FusedPipeline} was created from,
+   * plus all of the {@link ExecutableStage ExecutableStages} contained within this {@link
+   * FusedPipeline}. The Root Transform IDs will contain all of the runner executed transforms and
 
 Review comment:
   The capitalization of "Root Transform IDs" is strange. Would you rather link to the 
Pipeline root transform IDs method?




Issue Time Tracking
---

Worklog Id: (was: 82986)
Time Spent: 17h 40m  (was: 17.5h)



[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82984=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82984
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 21/Mar/18 23:18
Start Date: 21/Mar/18 23:18
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176266684
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
 ##
 @@ -36,25 +36,45 @@
 @AutoValue
 public abstract class FusedPipeline {
   static FusedPipeline of(
-      Set<ExecutableStage> environmentalStages, Set<PTransformNode> runnerStages) {
-    return new AutoValue_FusedPipeline(environmentalStages, runnerStages);
+      Components components,
+      Set<ExecutableStage> environmentalStages,
+      Set<PTransformNode> runnerStages) {
+    return new AutoValue_FusedPipeline(components, environmentalStages, runnerStages);
   }
 
+  abstract Components getComponents();
+
   /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. */
   public abstract Set<ExecutableStage> getFusedStages();
 
   /** The {@link PTransform PTransforms} that a runner is responsible for executing. */
   public abstract Set<PTransformNode> getRunnerExecutedTransforms();
 
-  public RunnerApi.Pipeline toPipeline(Components initialComponents) {
-    Map<String, PTransform> executableTransforms = getExecutableTransforms(initialComponents);
-    Components fusedComponents = initialComponents.toBuilder()
-        .putAllTransforms(executableTransforms)
-        .putAllTransforms(getFusedTransforms())
-        .build();
+  /**
+   * Returns the {@link RunnerApi.Pipeline} representation of this {@link FusedPipeline}.
+   *
+   * The {@link Components} of the returned pipeline will contain all of the {@link PTransform
+   * PTransforms} present in the original Pipeline that this {@link FusedPipeline} was created from,
+   * plus all of the {@link ExecutableStage ExecutableStages} contained within this {@link
+   * FusedPipeline}. The Root Transform IDs will contain all of the runner executed transforms and
+   * all of the ExecutableStages contained within the Pipeline.
 
 Review comment:
   `ExecutableStages` -> `{@link ExecutableStage executable stages}`




Issue Time Tracking
---

Worklog Id: (was: 82984)
Time Spent: 17h 20m  (was: 17h 10m)



[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82985=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82985
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 21/Mar/18 23:18
Start Date: 21/Mar/18 23:18
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176265936
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
 ##
 @@ -19,54 +19,83 @@
 package org.apache.beam.runners.core.construction.graph;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 
-/**
- * A {@link Pipeline} which has been separated into collections of executable components.
- */
+/** A {@link Pipeline} which has been separated into collections of executable components. */
 @AutoValue
 public abstract class FusedPipeline {
   static FusedPipeline of(
       Set<ExecutableStage> environmentalStages, Set<PTransformNode> runnerStages) {
     return new AutoValue_FusedPipeline(environmentalStages, runnerStages);
   }
 
-  /**
-   * The {@link ExecutableStage executable stages} that are executed by SDK harnesses.
-   */
+  /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. */
   public abstract Set<ExecutableStage> getFusedStages();
 
-  /**
-   * The {@link PTransform PTransforms} that a runner is responsible for executing.
-   */
+  /** The {@link PTransform PTransforms} that a runner is responsible for executing. */
   public abstract Set<PTransformNode> getRunnerExecutedTransforms();
 
+  public RunnerApi.Pipeline toPipeline(Components initialComponents) {
+    Map<String, PTransform> executableTransforms = getExecutableTransforms(initialComponents);
+    Components fusedComponents = initialComponents.toBuilder()
+        .putAllTransforms(executableTransforms)
+        .putAllTransforms(getFusedTransforms())
+        .build();
+    List<String> rootTransformIds =
+        StreamSupport.stream(
+                QueryablePipeline.forTransforms(executableTransforms.keySet(), fusedComponents)
+                    .getTopologicallyOrderedTransforms()
+                    .spliterator(),
+                false)
+            .map(PTransformNode::getId)
+            .collect(Collectors.toList());
+    return Pipeline.newBuilder()
+        .setComponents(fusedComponents)
+        .addAllRootTransformIds(rootTransformIds)
+        .build();
+  }
+
   /**
-   * Return a {@link Components} like the {@code base} components, but with the only transforms
-   * equal to this fused pipeline.
+   * Return a {@link Components} like the {@code base} components, but with the set of transforms to
 
 Review comment:
   This comment still seems out of date since it refers to `base` components.




Issue Time Tracking
---

Worklog Id: (was: 82985)
Time Spent: 17.5h  (was: 17h 20m)



[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82955=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82955
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 21/Mar/18 22:08
Start Date: 21/Mar/18 22:08
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176251543
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
 ##
 @@ -19,54 +19,83 @@
 package org.apache.beam.runners.core.construction.graph;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 
-/**
- * A {@link Pipeline} which has been separated into collections of executable components.
- */
+/** A {@link Pipeline} which has been separated into collections of executable components. */
 @AutoValue
 public abstract class FusedPipeline {
   static FusedPipeline of(
       Set<ExecutableStage> environmentalStages, Set<PTransformNode> runnerStages) {
     return new AutoValue_FusedPipeline(environmentalStages, runnerStages);
   }
 
-  /**
-   * The {@link ExecutableStage executable stages} that are executed by SDK harnesses.
-   */
+  /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. */
   public abstract Set<ExecutableStage> getFusedStages();
 
-  /**
-   * The {@link PTransform PTransforms} that a runner is responsible for executing.
-   */
+  /** The {@link PTransform PTransforms} that a runner is responsible for executing. */
   public abstract Set<PTransformNode> getRunnerExecutedTransforms();
 
+  public RunnerApi.Pipeline toPipeline(Components initialComponents) {
+    Map<String, PTransform> executableTransforms = getExecutableTransforms(initialComponents);
+    Components fusedComponents = initialComponents.toBuilder()
+        .putAllTransforms(executableTransforms)
+        .putAllTransforms(getFusedTransforms())
+        .build();
+    List<String> rootTransformIds =
+        StreamSupport.stream(
+                QueryablePipeline.forTransforms(executableTransforms.keySet(), fusedComponents)
+                    .getTopologicallyOrderedTransforms()
+                    .spliterator(),
+                false)
+            .map(PTransformNode::getId)
+            .collect(Collectors.toList());
+    return Pipeline.newBuilder()
+        .setComponents(fusedComponents)
+        .addAllRootTransformIds(rootTransformIds)
+        .build();
+  }
+
   /**
-   * Return a {@link Components} like the {@code base} components, but with the only transforms
-   * equal to this fused pipeline.
+   * Return a {@link Components} like the {@code base} components, but with the set of transforms to
+   * be executed by the runner.
    *
-   * The only composites will be the stages returned by {@link #getFusedStages()}.
+   * The transforms that are present in the returned map are the union of the results of {@link
+   * #getRunnerExecutedTransforms()} and {@link #getFusedStages()}, where each {@link
+   * ExecutableStage}.
 
 Review comment:
   RM'd




Issue Time Tracking
---

Worklog Id: (was: 82955)
Time Spent: 16h 50m  (was: 16h 40m)


[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82958=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82958
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 21/Mar/18 22:08
Start Date: 21/Mar/18 22:08
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176251681
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
 ##
 @@ -183,6 +195,19 @@ private static boolean isPrimitiveTransform(PTransform transform) {
         .collect(Collectors.toSet());
   }
 
+  /**
+   * Get the PCollections which are not consumed by any {@link PTransformNode} in this {@link
+   * QueryablePipeline}.
+   */
+  private Set getLeafPCollections() {
 
 Review comment:
   Pulled in accidentally, I think.




Issue Time Tracking
---

Worklog Id: (was: 82958)
Time Spent: 17h 10m  (was: 17h)



[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82953=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82953
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 21/Mar/18 22:08
Start Date: 21/Mar/18 22:08
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176251864
 
 

 ##
 File path: 
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/FusedPipelineTest.java
 ##
 @@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction.graph;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link FusedPipeline}. */
+@RunWith(JUnit4.class)
+public class FusedPipelineTest implements Serializable {
+  @Test
+  public void testToProto() {
+    Pipeline p = Pipeline.create();
+    p.apply("impulse", Impulse.create())
+        .apply("map", MapElements.into(TypeDescriptors.integers()).via(bytes -> bytes.length))
+        .apply("key", WithKeys.of("foo"))
+        .apply("gbk", GroupByKey.create())
+        .apply("values", Values.create());
+
+    RunnerApi.Pipeline protoPipeline = PipelineTranslation.toProto(p);
+    checkState(
+        protoPipeline
+            .getRootTransformIdsList()
+            .containsAll(ImmutableList.of("impulse", "map", "key", "gbk", "values")),
+        "Unexpected Root Transform IDs %s",
+        protoPipeline.getRootTransformIdsList());
+
+    FusedPipeline fused = GreedyPipelineFuser.fuse(protoPipeline);
+    checkState(
+        fused.getRunnerExecutedTransforms().size() == 2,
+        "Unexpected number of runner transforms %s",
+        fused.getRunnerExecutedTransforms());
+    checkState(
+        fused.getFusedStages().size() == 2,
+        "Unexpected number of fused stages %s",
+        fused.getFusedStages());
+    RunnerApi.Pipeline fusedProto = fused.toPipeline(protoPipeline.getComponents());
 
 Review comment:
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 82953)
Time Spent: 16.5h  (was: 16h 20m)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> 
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
>  Issue Type: Bug
>

[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82956=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82956
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 21/Mar/18 22:08
Start Date: 21/Mar/18 22:08
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176251523
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
 ##
 @@ -19,54 +19,83 @@
 package org.apache.beam.runners.core.construction.graph;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 
-/**
- * A {@link Pipeline} which has been separated into collections of executable components.
- */
+/** A {@link Pipeline} which has been separated into collections of executable components. */
 @AutoValue
 public abstract class FusedPipeline {
   static FusedPipeline of(
       Set<ExecutableStage> environmentalStages, Set<PTransformNode> runnerStages) {
     return new AutoValue_FusedPipeline(environmentalStages, runnerStages);
   }
 
-  /**
-   * The {@link ExecutableStage executable stages} that are executed by SDK harnesses.
-   */
+  /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. */
   public abstract Set<ExecutableStage> getFusedStages();
 
-  /**
-   * The {@link PTransform PTransforms} that a runner is responsible for executing.
-   */
+  /** The {@link PTransform PTransforms} that a runner is responsible for executing. */
   public abstract Set<PTransformNode> getRunnerExecutedTransforms();
 
+  public RunnerApi.Pipeline toPipeline(Components initialComponents) {
+    Map<String, PTransform> executableTransforms = getExecutableTransforms(initialComponents);
+    Components fusedComponents = initialComponents.toBuilder()
+        .putAllTransforms(executableTransforms)
+        .putAllTransforms(getFusedTransforms())
+        .build();
+    List<String> rootTransformIds =
+        StreamSupport.stream(
+                QueryablePipeline.forTransforms(executableTransforms.keySet(), fusedComponents)
+                    .getTopologicallyOrderedTransforms()
+                    .spliterator(),
+                false)
+            .map(PTransformNode::getId)
+            .collect(Collectors.toList());
+    return Pipeline.newBuilder()
+        .setComponents(fusedComponents)
+        .addAllRootTransformIds(rootTransformIds)
+        .build();
+  }
+
   /**
-   * Return a {@link Components} like the {@code base} components, but with the only transforms
-   * equal to this fused pipeline.
+   * Return a {@link Components} like the {@code base} components, but with the set of transforms to
 
 Review comment:
   Done.




Issue Time Tracking
---

Worklog Id: (was: 82956)
Time Spent: 17h  (was: 16h 50m)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> 
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
> Fix For: 2.4.0
>
>  Time Spent: 17h
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The java 
> runner libraries should provide some way to take a Pipeline that executes in 
> both a runner and an environment and construct a collection of transforms 
> which can execute within a single environment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82957=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82957
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 21/Mar/18 22:08
Start Date: 21/Mar/18 22:08
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176250895
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
 ##
 @@ -19,54 +19,83 @@
 package org.apache.beam.runners.core.construction.graph;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 
-/**
- * A {@link Pipeline} which has been separated into collections of executable components.
- */
+/** A {@link Pipeline} which has been separated into collections of executable components. */
 @AutoValue
 public abstract class FusedPipeline {
   static FusedPipeline of(
       Set<ExecutableStage> environmentalStages, Set<PTransformNode> runnerStages) {
     return new AutoValue_FusedPipeline(environmentalStages, runnerStages);
   }
 
-  /**
-   * The {@link ExecutableStage executable stages} that are executed by SDK harnesses.
-   */
+  /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. */
   public abstract Set<ExecutableStage> getFusedStages();
 
-  /**
-   * The {@link PTransform PTransforms} that a runner is responsible for executing.
-   */
+  /** The {@link PTransform PTransforms} that a runner is responsible for executing. */
   public abstract Set<PTransformNode> getRunnerExecutedTransforms();
 
+  public RunnerApi.Pipeline toPipeline(Components initialComponents) {
 
 Review comment:
   Made into a property.
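
   For illustration only, a minimal sketch of what keeping the components as a
   property could look like. It assumes a hypothetical plain-Java stand-in
   (`FusedPipelineSketch`, with string maps in place of `Components`); none of
   these names are Beam API, and the change actually made in the pull request
   may differ.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for FusedPipeline: plain maps replace the Beam protos.
final class FusedPipelineSketch {
  // Transform id -> description, captured once when the pipeline is fused.
  private final Map<String, String> components;
  // Fused stage id -> description, as a fuser might produce them.
  private final Map<String, String> fusedStages;

  private FusedPipelineSketch(Map<String, String> components, Map<String, String> fusedStages) {
    // Defensive copies so later changes to the caller's maps are not observed.
    this.components = Collections.unmodifiableMap(new LinkedHashMap<>(components));
    this.fusedStages = Collections.unmodifiableMap(new LinkedHashMap<>(fusedStages));
  }

  static FusedPipelineSketch of(Map<String, String> components, Map<String, String> fusedStages) {
    return new FusedPipelineSketch(components, fusedStages);
  }

  Map<String, String> getComponents() {
    return components;
  }

  // No parameter: the components stored at construction are reused here.
  Map<String, String> toPipeline() {
    Map<String, String> result = new LinkedHashMap<>(components);
    result.putAll(fusedStages);
    return Collections.unmodifiableMap(result);
  }
}
```

   With this shape the caller no longer has to pass the original components
   back in, so they cannot drift from the ones the stages were fused against.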




Issue Time Tracking
---

Worklog Id: (was: 82957)
Time Spent: 17h 10m  (was: 17h)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> 
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
> Fix For: 2.4.0
>
>  Time Spent: 17h 10m
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The java 
> runner libraries should provide some way to take a Pipeline that executes in 
> both a runner and an environment and construct a collection of transforms 
> which can execute within a single environment.





[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82954=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82954
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 21/Mar/18 22:08
Start Date: 21/Mar/18 22:08
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176252057
 
 

 ##
 File path: 
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/FusedPipelineTest.java
 ##
 @@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction.graph;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link FusedPipeline}. */
+@RunWith(JUnit4.class)
+public class FusedPipelineTest implements Serializable {
+  @Test
+  public void testToProto() {
+    Pipeline p = Pipeline.create();
+    p.apply("impulse", Impulse.create())
+        .apply("map", MapElements.into(TypeDescriptors.integers()).via(bytes -> bytes.length))
+        .apply("key", WithKeys.of("foo"))
+        .apply("gbk", GroupByKey.create())
+        .apply("values", Values.create());
+
+    RunnerApi.Pipeline protoPipeline = PipelineTranslation.toProto(p);
+    checkState(
+        protoPipeline
+            .getRootTransformIdsList()
+            .containsAll(ImmutableList.of("impulse", "map", "key", "gbk", "values")),
+        "Unexpected Root Transform IDs %s",
+        protoPipeline.getRootTransformIdsList());
+
+    FusedPipeline fused = GreedyPipelineFuser.fuse(protoPipeline);
+    checkState(
+        fused.getRunnerExecutedTransforms().size() == 2,
+        "Unexpected number of runner transforms %s",
+        fused.getRunnerExecutedTransforms());
+    checkState(
+        fused.getFusedStages().size() == 2,
+        "Unexpected number of fused stages %s",
+        fused.getFusedStages());
+    RunnerApi.Pipeline fusedProto = fused.toPipeline(protoPipeline.getComponents());
+
+    assertThat(
+        "Root Transforms should all be present in the Pipeline Components",
+        fusedProto.getComponents().getTransformsMap().keySet(),
+        hasItems(fusedProto.getRootTransformIdsList().toArray(new String[0])));
+    assertThat(
+        "Should contain Impulse, GroupByKey, and two Environment Stages",
+        fusedProto.getRootTransformIdsCount(),
+        equalTo(4));
+    assertThat(fusedProto.getRootTransformIdsList(), hasItems("impulse", "gbk"));
+    assertRootsInTopologicalOrder(fusedProto);
+    // Since MapElements, WithKeys, and Values are all composites of a ParDo, we do prefix matching
+    // instead of looking at the inside of their expansions
+    assertThat(
+        "Fused transforms should be present in the components",
+

[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82934=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82934
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 21/Mar/18 21:41
Start Date: 21/Mar/18 21:41
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176240754
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
 ##
 @@ -19,54 +19,83 @@
 package org.apache.beam.runners.core.construction.graph;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 
-/**
- * A {@link Pipeline} which has been separated into collections of executable components.
- */
+/** A {@link Pipeline} which has been separated into collections of executable components. */
 @AutoValue
 public abstract class FusedPipeline {
   static FusedPipeline of(
       Set<ExecutableStage> environmentalStages, Set<PTransformNode> runnerStages) {
     return new AutoValue_FusedPipeline(environmentalStages, runnerStages);
   }
 
-  /**
-   * The {@link ExecutableStage executable stages} that are executed by SDK harnesses.
-   */
+  /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. */
   public abstract Set<ExecutableStage> getFusedStages();
 
-  /**
-   * The {@link PTransform PTransforms} that a runner is responsible for executing.
-   */
+  /** The {@link PTransform PTransforms} that a runner is responsible for executing. */
   public abstract Set<PTransformNode> getRunnerExecutedTransforms();
 
+  public RunnerApi.Pipeline toPipeline(Components initialComponents) {
 
 Review comment:
   We were given the Pipeline when we constructed the FusedStage via 
GreedyPipelineFuser, why do we need `initialComponents` to be passed in again 
here?




Issue Time Tracking
---

Worklog Id: (was: 82934)
Time Spent: 16h 10m  (was: 16h)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> 
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
> Fix For: 2.4.0
>
>  Time Spent: 16h 10m
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The java 
> runner libraries should provide some way to take a Pipeline that executes in 
> both a runner and an environment and construct a collection of transforms 
> which can execute within a single environment.





[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82932=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82932
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 21/Mar/18 21:41
Start Date: 21/Mar/18 21:41
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176237015
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
 ##
 @@ -19,54 +19,83 @@
 package org.apache.beam.runners.core.construction.graph;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 
-/**
- * A {@link Pipeline} which has been separated into collections of executable components.
- */
+/** A {@link Pipeline} which has been separated into collections of executable components. */
 @AutoValue
 public abstract class FusedPipeline {
   static FusedPipeline of(
       Set<ExecutableStage> environmentalStages, Set<PTransformNode> runnerStages) {
     return new AutoValue_FusedPipeline(environmentalStages, runnerStages);
   }
 
-  /**
-   * The {@link ExecutableStage executable stages} that are executed by SDK harnesses.
-   */
+  /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. */
   public abstract Set<ExecutableStage> getFusedStages();
 
-  /**
-   * The {@link PTransform PTransforms} that a runner is responsible for executing.
-   */
+  /** The {@link PTransform PTransforms} that a runner is responsible for executing. */
   public abstract Set<PTransformNode> getRunnerExecutedTransforms();
 
+  public RunnerApi.Pipeline toPipeline(Components initialComponents) {
+    Map<String, PTransform> executableTransforms = getExecutableTransforms(initialComponents);
+    Components fusedComponents = initialComponents.toBuilder()
+        .putAllTransforms(executableTransforms)
+        .putAllTransforms(getFusedTransforms())
+        .build();
+    List<String> rootTransformIds =
+        StreamSupport.stream(
+                QueryablePipeline.forTransforms(executableTransforms.keySet(), fusedComponents)
+                    .getTopologicallyOrderedTransforms()
+                    .spliterator(),
+                false)
+            .map(PTransformNode::getId)
+            .collect(Collectors.toList());
+    return Pipeline.newBuilder()
+        .setComponents(fusedComponents)
+        .addAllRootTransformIds(rootTransformIds)
+        .build();
+  }
+
   /**
-   * Return a {@link Components} like the {@code base} components, but with the only transforms
-   * equal to this fused pipeline.
+   * Return a {@link Components} like the {@code base} components, but with the set of transforms to
+   * be executed by the runner.
    *
-   * The only composites will be the stages returned by {@link #getFusedStages()}.
+   * The transforms that are present in the returned map are the union of the results of {@link
+   * #getRunnerExecutedTransforms()} and {@link #getFusedStages()}, where each {@link
+   * ExecutableStage}.
 
 Review comment:
   where each executable stage?




Issue Time Tracking
---

Worklog Id: (was: 82932)
Time Spent: 15h 50m  (was: 15h 40m)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> 
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
> Fix For: 2.4.0
>
>  Time Spent: 15h 50m
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The java 

[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82933=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82933
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 21/Mar/18 21:41
Start Date: 21/Mar/18 21:41
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176246992
 
 

 ##
 File path: 
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/FusedPipelineTest.java
 ##
 @@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction.graph;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link FusedPipeline}. */
+@RunWith(JUnit4.class)
+public class FusedPipelineTest implements Serializable {
+  @Test
+  public void testToProto() {
+    Pipeline p = Pipeline.create();
+    p.apply("impulse", Impulse.create())
+        .apply("map", MapElements.into(TypeDescriptors.integers()).via(bytes -> bytes.length))
+        .apply("key", WithKeys.of("foo"))
+        .apply("gbk", GroupByKey.create())
+        .apply("values", Values.create());
+
+    RunnerApi.Pipeline protoPipeline = PipelineTranslation.toProto(p);
+    checkState(
+        protoPipeline
+            .getRootTransformIdsList()
+            .containsAll(ImmutableList.of("impulse", "map", "key", "gbk", "values")),
+        "Unexpected Root Transform IDs %s",
+        protoPipeline.getRootTransformIdsList());
+
+    FusedPipeline fused = GreedyPipelineFuser.fuse(protoPipeline);
+    checkState(
+        fused.getRunnerExecutedTransforms().size() == 2,
+        "Unexpected number of runner transforms %s",
+        fused.getRunnerExecutedTransforms());
+    checkState(
+        fused.getFusedStages().size() == 2,
+        "Unexpected number of fused stages %s",
+        fused.getFusedStages());
+    RunnerApi.Pipeline fusedProto = fused.toPipeline(protoPipeline.getComponents());
+
+    assertThat(
+        "Root Transforms should all be present in the Pipeline Components",
+        fusedProto.getComponents().getTransformsMap().keySet(),
+        hasItems(fusedProto.getRootTransformIdsList().toArray(new String[0])));
+    assertThat(
+        "Should contain Impulse, GroupByKey, and two Environment Stages",
+        fusedProto.getRootTransformIdsCount(),
+        equalTo(4));
+    assertThat(fusedProto.getRootTransformIdsList(), hasItems("impulse", "gbk"));
+    assertRootsInTopologicalOrder(fusedProto);
+    // Since MapElements, WithKeys, and Values are all composites of a ParDo, we do prefix matching
+    // instead of looking at the inside of their expansions
+    assertThat(
+        "Fused transforms should be present in the components",
+

[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82937=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82937
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 21/Mar/18 21:41
Start Date: 21/Mar/18 21:41
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176244335
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
 ##
 @@ -183,6 +195,19 @@ private static boolean isPrimitiveTransform(PTransform transform) {
         .collect(Collectors.toSet());
   }
 
+  /**
+   * Get the PCollections which are not consumed by any {@link PTransformNode} in this {@link
+   * QueryablePipeline}.
+   */
+  private Set<PCollectionNode> getLeafPCollections() {
 
 Review comment:
   This isn't used anywhere, what is the intent of adding it right now?
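
   As a point of reference for what such a helper would compute, here is a
   minimal sketch under stated assumptions: string ids stand in for
   PCollection and PTransform nodes, and `LeafPCollectionsSketch` and its
   method are hypothetical names, not part of `QueryablePipeline`. A leaf is
   taken to be any PCollection that some transform produces and no transform
   consumes.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper; string ids stand in for PCollection and PTransform nodes.
final class LeafPCollectionsSketch {
  // Returns the PCollection ids that are produced by some transform
  // but never appear as an input of any transform.
  static Set<String> leafPCollections(
      Map<String, List<String>> inputsByTransform,
      Map<String, List<String>> outputsByTransform) {
    Set<String> consumed = new HashSet<>();
    for (List<String> inputs : inputsByTransform.values()) {
      consumed.addAll(inputs);
    }
    // TreeSet only to give a stable iteration order in the result.
    Set<String> leaves = new TreeSet<>();
    for (List<String> outputs : outputsByTransform.values()) {
      leaves.addAll(outputs);
    }
    leaves.removeAll(consumed);
    return leaves;
  }
}
```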




Issue Time Tracking
---

Worklog Id: (was: 82937)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> 
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
> Fix For: 2.4.0
>
>  Time Spent: 16h 20m
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The java 
> runner libraries should provide some way to take a Pipeline that executes in 
> both a runner and an environment and construct a collection of transforms 
> which can execute within a single environment.





[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82935=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82935
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 21/Mar/18 21:41
Start Date: 21/Mar/18 21:41
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176245384
 
 

 ##
 File path: 
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/FusedPipelineTest.java
 ##
 @@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction.graph;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link FusedPipeline}. */
+@RunWith(JUnit4.class)
+public class FusedPipelineTest implements Serializable {
+  @Test
+  public void testToProto() {
+    Pipeline p = Pipeline.create();
+    p.apply("impulse", Impulse.create())
+        .apply("map", MapElements.into(TypeDescriptors.integers()).via(bytes -> bytes.length))
+        .apply("key", WithKeys.of("foo"))
+        .apply("gbk", GroupByKey.create())
+        .apply("values", Values.create());
+
+    RunnerApi.Pipeline protoPipeline = PipelineTranslation.toProto(p);
+    checkState(
+        protoPipeline
+            .getRootTransformIdsList()
+            .containsAll(ImmutableList.of("impulse", "map", "key", "gbk", "values")),
+        "Unexpected Root Transform IDs %s",
+        protoPipeline.getRootTransformIdsList());
+
+    FusedPipeline fused = GreedyPipelineFuser.fuse(protoPipeline);
+    checkState(
+        fused.getRunnerExecutedTransforms().size() == 2,
+        "Unexpected number of runner transforms %s",
+        fused.getRunnerExecutedTransforms());
+    checkState(
+        fused.getFusedStages().size() == 2,
+        "Unexpected number of fused stages %s",
+        fused.getFusedStages());
+    RunnerApi.Pipeline fusedProto = fused.toPipeline(protoPipeline.getComponents());
 
 Review comment:
   `fusedProto` -> `fusedProtoPipeline`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 82935)
Time Spent: 16h 10m  (was: 16h)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> 
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
>  

[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82936&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82936
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 21/Mar/18 21:41
Start Date: 21/Mar/18 21:41
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r176238126
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
 ##
 @@ -19,54 +19,83 @@
 package org.apache.beam.runners.core.construction.graph;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 
-/**
- * A {@link Pipeline} which has been separated into collections of executable components.
- */
+/** A {@link Pipeline} which has been separated into collections of executable components. */
 @AutoValue
 public abstract class FusedPipeline {
   static FusedPipeline of(
       Set<ExecutableStage> environmentalStages, Set<PTransformNode> runnerStages) {
     return new AutoValue_FusedPipeline(environmentalStages, runnerStages);
   }
 
-  /**
-   * The {@link ExecutableStage executable stages} that are executed by SDK harnesses.
-   */
+  /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. */
   public abstract Set<ExecutableStage> getFusedStages();
 
-  /**
-   * The {@link PTransform PTransforms} that a runner is responsible for executing.
-   */
+  /** The {@link PTransform PTransforms} that a runner is responsible for executing. */
   public abstract Set<PTransformNode> getRunnerExecutedTransforms();
 
+  public RunnerApi.Pipeline toPipeline(Components initialComponents) {
+    Map<String, PTransform> executableTransforms = getExecutableTransforms(initialComponents);
+    Components fusedComponents = initialComponents.toBuilder()
+        .putAllTransforms(executableTransforms)
+        .putAllTransforms(getFusedTransforms())
+        .build();
+    List<String> rootTransformIds =
+        StreamSupport.stream(
+                QueryablePipeline.forTransforms(executableTransforms.keySet(), fusedComponents)
+                    .getTopologicallyOrderedTransforms()
+                    .spliterator(),
+                false)
+            .map(PTransformNode::getId)
+            .collect(Collectors.toList());
+    return Pipeline.newBuilder()
+        .setComponents(fusedComponents)
+        .addAllRootTransformIds(rootTransformIds)
+        .build();
+  }
+
   /**
-   * Return a {@link Components} like the {@code base} components, but with the only transforms
-   * equal to this fused pipeline.
+   * Return a {@link Components} like the {@code base} components, but with the set of transforms to
 
 Review comment:
   This comment seems like it should apply to toPipeline and not the private method getExecutableStages.
   
   Also, it should read: Return a {@link Pipeline}




Issue Time Tracking
---

Worklog Id: (was: 82936)
Time Spent: 16h 20m  (was: 16h 10m)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> 
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
> Fix For: 2.4.0
>
>  Time Spent: 16h 20m
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The java 
> runner libraries should provide some way to take a Pipeline that executes in 
> both a runner and an environment and construct a collection of transforms 
> which can execute within a single environment.




[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82551&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82551
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 21/Mar/18 00:01
Start Date: 21/Mar/18 00:01
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4777: [BEAM-3565] Add 
FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#issuecomment-374797898
 
 
   PTAL




Issue Time Tracking
---

Worklog Id: (was: 82551)
Time Spent: 15h 40m  (was: 15.5h)



[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82485&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82485
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 20/Mar/18 21:24
Start Date: 20/Mar/18 21:24
Worklog Time Spent: 10m 
  Work Description: tgroh closed pull request #4898: [BEAM-3565] Clean up 
ExecutableStage
URL: https://github.com/apache/beam/pull/4898
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 3ed90dd036e..9fa301460fb 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -207,6 +207,8 @@ message PCollection {
 // ProcessBundleDescriptor.
 message ExecutableStagePayload {
 
+  // Environment in which this stage executes. We use an environment rather than environment id
+  // because ExecutableStages use environments directly. This may change in the future.
   Environment environment = 1;
 
   // Input PCollection id.
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
index 27bfed87553..e66148421fc 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
@@ -81,53 +81,53 @@
    * follows:
    *
    *
-   *   The {@link PTransform#getSubtransformsList()} contains no subtransforms. This ensures
+   *   The {@link PTransform#getSubtransformsList()} is empty. This ensures
    *   that executable stages are treated as primitive transforms.
    *   The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of
    *   {@link #getInputPCollection()}.
    *   The output {@link PCollection PCollections} in the values of {@link
    *   PTransform#getOutputsMap()} are the {@link PCollectionNode PCollections} returned by
    *   {@link #getOutputPCollections()}.
-   *   The {@link FunctionSpec} contains an {@link ExecutableStagePayload} which has its input
-   *   and output PCollections set to the same values as the outer PTransform itself. It further
-   *   contains the environment set of transforms for this stage.
+   *   The {@link PTransform#getSpec()} contains an {@link ExecutableStagePayload} with inputs
+   *   and outputs equal to the PTransform's inputs and outputs, and transforms equal to the
+   *   result of {@link #getTransforms}.
    *
    *
    * The executable stage can be reconstructed from the resulting {@link ExecutableStagePayload}
    * and components alone via {@link #fromPayload(ExecutableStagePayload, Components)}.
    */
   default PTransform toPTransform() {
+    PTransform.Builder pt = PTransform.newBuilder();
     ExecutableStagePayload.Builder payload = ExecutableStagePayload.newBuilder();
 
     payload.setEnvironment(getEnvironment());
 
+    // Populate inputs and outputs of the stage payload and outer PTransform simultaneously.
     PCollectionNode input = getInputPCollection();
+    pt.putInputs("input", getInputPCollection().getId());
     payload.setInput(input.getId());
 
-    for (PTransformNode transform : getTransforms()) {
-      payload.addTransforms(transform.getId());
-    }
-
+    int outputIndex = 0;
     for (PCollectionNode output : getOutputPCollections()) {
+      pt.putOutputs(String.format("materialized_%d", outputIndex), output.getId());
       payload.addOutputs(output.getId());
+      outputIndex++;
+    }
+
+    // Inner PTransforms of this stage are hidden from the outer pipeline and only belong in the
+    // stage payload.
+    for (PTransformNode transform : getTransforms()) {
+      payload.addTransforms(transform.getId());
     }
 
-    PTransform.Builder pt = PTransform.newBuilder();
     pt.setSpec(FunctionSpec.newBuilder()
         .setUrn(ExecutableStage.URN)
         .setPayload(payload.build().toByteString())
         .build());
-    pt.putInputs("input", getInputPCollection().getId());
-    int outputIndex = 0;
-    for (PCollectionNode pcNode : getOutputPCollections()) {
-      // Do something
-      pt.putOutputs(String.format("materialized_%d", outputIndex), pcNode.getId());
-      outputIndex++;
-    }
+
     return pt.build();
   }
 
-  // TODO: Should this live under 

[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82440&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82440
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 20/Mar/18 19:53
Start Date: 20/Mar/18 19:53
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4898: [BEAM-3565] Clean up 
ExecutableStage
URL: https://github.com/apache/beam/pull/4898#issuecomment-374734491
 
 
   run gradle java precommit




Issue Time Tracking
---

Worklog Id: (was: 82440)
Time Spent: 15h  (was: 14h 50m)



[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82441&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82441
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 20/Mar/18 19:53
Start Date: 20/Mar/18 19:53
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4898: [BEAM-3565] Clean up 
ExecutableStage
URL: https://github.com/apache/beam/pull/4898#issuecomment-374734566
 
 
   run java gradle precommit




Issue Time Tracking
---

Worklog Id: (was: 82441)
Time Spent: 15h 10m  (was: 15h)



[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82439&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82439
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 20/Mar/18 19:53
Start Date: 20/Mar/18 19:53
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4898: [BEAM-3565] Clean up 
ExecutableStage
URL: https://github.com/apache/beam/pull/4898#issuecomment-374734491
 
 
   run gradle java precommit




Issue Time Tracking
---

Worklog Id: (was: 82439)
Time Spent: 14h 50m  (was: 14h 40m)



[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82442&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82442
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 20/Mar/18 19:53
Start Date: 20/Mar/18 19:53
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4898: [BEAM-3565] Clean up 
ExecutableStage
URL: https://github.com/apache/beam/pull/4898#issuecomment-374734566
 
 
   run java gradle precommit




Issue Time Tracking
---

Worklog Id: (was: 82442)
Time Spent: 15h 20m  (was: 15h 10m)



[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82385&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82385
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 20/Mar/18 18:07
Start Date: 20/Mar/18 18:07
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #4898: 
[BEAM-3565] Clean up ExecutableStage
URL: https://github.com/apache/beam/pull/4898#discussion_r175870104
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
 ##
 @@ -81,53 +81,53 @@
    * follows:
    *
    *
-   *   The {@link PTransform#getSubtransformsList()} contains no subtransforms. This ensures
+   *   The {@link PTransform#getSubtransformsList()} is empty. This ensures
    *   that executable stages are treated as primitive transforms.
    *   The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of
    *   {@link #getInputPCollection()}.
    *   The output {@link PCollection PCollections} in the values of {@link
    *   PTransform#getOutputsMap()} are the {@link PCollectionNode PCollections} returned by
    *   {@link #getOutputPCollections()}.
-   *   The {@link FunctionSpec} contains an {@link ExecutableStagePayload} which has its input
-   *   and output PCollections set to the same values as the outer PTransform itself. It further
-   *   contains the environment set of transforms for this stage.
+   *   The {@link PTransform#getSpec()} contains an {@link ExecutableStagePayload} which has its
+   *   input and output PCollections set to the same values as the outer PTransform itself. It
+   *   further contains the environment set of transforms for this stage.
 
 Review comment:
   Done
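
For readers following the Javadoc under review, the shape it describes can be sketched without Beam on the classpath. This is only an illustration under stated assumptions, not the Beam implementation: `StageShapeSketch`, `StageShape`, and `describe` are hypothetical names, and plain maps and lists stand in for the `PTransform` and `ExecutableStagePayload` builders. It mirrors the bookkeeping visible in the PR: a single `"input"` entry, outputs keyed `materialized_<index>`, and inner transform IDs recorded only in the payload.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for an executable stage's outer transform and payload; not a Beam type. */
final class StageShapeSketch {
  static final class StageShape {
    // What the outer transform exposes to the rest of the pipeline.
    final Map<String, String> inputs = new LinkedHashMap<>();
    final Map<String, String> outputs = new LinkedHashMap<>();
    // What the payload carries for the SDK harness.
    String payloadInput;
    final List<String> payloadOutputs = new ArrayList<>();
    final List<String> payloadTransforms = new ArrayList<>();
  }

  static StageShape describe(String inputId, List<String> outputIds, List<String> transformIds) {
    StageShape shape = new StageShape();
    // Populate the outer transform and the payload together so they cannot drift apart.
    shape.inputs.put("input", inputId);
    shape.payloadInput = inputId;
    int outputIndex = 0;
    for (String outputId : outputIds) {
      shape.outputs.put(String.format("materialized_%d", outputIndex), outputId);
      shape.payloadOutputs.add(outputId);
      outputIndex++;
    }
    // Inner transforms are recorded in the payload only; the outer transform has no subtransforms.
    shape.payloadTransforms.addAll(transformIds);
    return shape;
  }

  public static void main(String[] args) {
    StageShape shape =
        describe("impulse.out", Arrays.asList("key.out", "side.out"), Arrays.asList("map", "key"));
    System.out.println(shape.inputs);
    System.out.println(shape.outputs);
    System.out.println(shape.payloadTransforms);
  }
}
```

Keeping both views in one loop is the design point: the outer transform's outputs and the payload's outputs are derived from the same iteration, so their contents always agree.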




Issue Time Tracking
---

Worklog Id: (was: 82385)
Time Spent: 14h 40m  (was: 14.5h)



[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82381&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82381
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 20/Mar/18 18:02
Start Date: 20/Mar/18 18:02
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r175868383
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
 ##
 @@ -19,54 +19,84 @@
 package org.apache.beam.runners.core.construction.graph;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 
-/**
- * A {@link Pipeline} which has been separated into collections of executable components.
- */
+/** A {@link Pipeline} which has been separated into collections of executable components. */
 @AutoValue
 public abstract class FusedPipeline {
   static FusedPipeline of(
       Set<ExecutableStage> environmentalStages, Set<PTransformNode> runnerStages) {
     return new AutoValue_FusedPipeline(environmentalStages, runnerStages);
   }
 
-  /**
-   * The {@link ExecutableStage executable stages} that are executed by SDK harnesses.
-   */
+  /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. */
   public abstract Set<ExecutableStage> getFusedStages();
 
-  /**
-   * The {@link PTransform PTransforms} that a runner is responsible for executing.
-   */
+  /** The {@link PTransform PTransforms} that a runner is responsible for executing. */
   public abstract Set<PTransformNode> getRunnerExecutedTransforms();
 
+  public RunnerApi.Pipeline toPipeline(Components initialComponents) {
+    Components executableComponents =
+        initialComponents
+            .toBuilder()
+            .clearTransforms()
+            .putAllTransforms(getTopLevelTransforms(initialComponents))
+            .build();
+    List<String> rootTransformIds =
+        StreamSupport.stream(
+                QueryablePipeline.forComponents(executableComponents)
+                    .getTopologicallyOrderedTransforms()
+                    .spliterator(),
+                false)
+            .map(PTransformNode::getId)
+            .collect(Collectors.toList());
+    return Pipeline.newBuilder()
+        .setComponents(executableComponents.toBuilder().putAllTransforms(getFusedTransforms()))
+        .addAllRootTransformIds(rootTransformIds)
+        .build();
+  }
+
   /**
    * Return a {@link Components} like the {@code base} components, but with the only transforms
    * equal to this fused pipeline.
 
 Review comment:
   Done.




Issue Time Tracking
---

Worklog Id: (was: 82381)
Time Spent: 14.5h  (was: 14h 20m)



[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82380&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82380
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 20/Mar/18 18:01
Start Date: 20/Mar/18 18:01
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r175868128
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
 ##
 @@ -19,54 +19,84 @@
 package org.apache.beam.runners.core.construction.graph;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Components.Builder;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 
-/**
- * A {@link Pipeline} which has been separated into collections of executable components.
- */
+/** A {@link Pipeline} which has been separated into collections of executable components. */
 @AutoValue
 public abstract class FusedPipeline {
   static FusedPipeline of(
       Set<ExecutableStage> environmentalStages, Set<PTransformNode> runnerStages) {
     return new AutoValue_FusedPipeline(environmentalStages, runnerStages);
   }
 
-  /**
-   * The {@link ExecutableStage executable stages} that are executed by SDK harnesses.
-   */
+  /** The {@link ExecutableStage executable stages} that are executed by SDK harnesses. */
   public abstract Set<ExecutableStage> getFusedStages();
 
-  /**
-   * The {@link PTransform PTransforms} that a runner is responsible for executing.
-   */
+  /** The {@link PTransform PTransforms} that a runner is responsible for executing. */
   public abstract Set<PTransformNode> getRunnerExecutedTransforms();
 
+  public RunnerApi.Pipeline toPipeline(Components initialComponents) {
+    Components executableComponents =
+        initialComponents
+            .toBuilder()
+            .clearTransforms()
+            .putAllTransforms(getTopLevelTransforms(initialComponents))
+            .build();
+    List<String> rootTransformIds =
+        StreamSupport.stream(
+                QueryablePipeline.forComponents(executableComponents)
+                    .getTopologicallyOrderedTransforms()
+                    .spliterator(),
+                false)
+            .map(PTransformNode::getId)
+            .collect(Collectors.toList());
+    return Pipeline.newBuilder()
+        .setComponents(executableComponents.toBuilder().putAllTransforms(getFusedTransforms()))
+        .addAllRootTransformIds(rootTransformIds)
+        .build();
+  }
+
   /**
    * Return a {@link Components} like the {@code base} components, but with the only transforms
    * equal to this fused pipeline.
    *
-   * The only composites will be the stages returned by {@link #getFusedStages()}.
+   * The only composites will be the stages returned by {@link #getFusedStages()}, and the only
+   * primitives will be the {@link PTransformNode transforms} returned by {@link
+   * #getRunnerExecutedTransforms()}.
    */
-  public Components asComponents(Components base) {
-    Builder newComponents = base.toBuilder().clearTransforms();
+  private Map<String, PTransform> getTopLevelTransforms(Components base) {
 
 Review comment:
   `getExecutableTransforms` is what I am calling this now.
   
   All of the transforms are primitives, but there are non-executable 
transforms (within a stage) that get merged into the components as well.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 82380)
Time Spent: 14h 20m  (was: 14h 10m)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> 
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>

[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82372=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82372
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 20/Mar/18 17:59
Start Date: 20/Mar/18 17:59
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4898: 
[BEAM-3565] Clean up ExecutableStage
URL: https://github.com/apache/beam/pull/4898#discussion_r175867418
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
 ##
 @@ -81,53 +81,53 @@
    * follows:
    *
    *
-   *   The {@link PTransform#getSubtransformsList()} contains no subtransforms. This ensures
+   *   The {@link PTransform#getSubtransformsList()} is empty. This ensures
    *   that executable stages are treated as primitive transforms.
    *   The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of
    *   {@link #getInputPCollection()}.
    *   The output {@link PCollection PCollections} in the values of {@link
    *   PTransform#getOutputsMap()} are the {@link PCollectionNode PCollections} returned by
    *   {@link #getOutputPCollections()}.
-   *   The {@link FunctionSpec} contains an {@link ExecutableStagePayload} which has its input
-   *   and output PCollections set to the same values as the outer PTransform itself. It further
-   *   contains the environment set of transforms for this stage.
+   *   The {@link PTransform#getSpec()} contains an {@link ExecutableStagePayload} which has its
+   *   input and output PCollections set to the same values as the outer PTransform itself. It
+   *   further contains the environment set of transforms for this stage.
 
 Review comment:
   "The {@link PTransform#getSpec()} contains an {@link ExecutableStagePayload} 
with inputs and outputs equal to the PTransform's inputs and outputs, and 
transforms equal to the result of {@link #getTransforms}"




Issue Time Tracking
---

Worklog Id: (was: 82372)
Time Spent: 14h 10m  (was: 14h)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> 
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
> Fix For: 2.4.0
>
>  Time Spent: 14h 10m
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The java 
> runner libraries should provide some way to take a Pipeline that executes in 
> both a runner and an environment and construct a collection of transforms 
> which can execute within a single environment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82364=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82364
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 20/Mar/18 17:40
Start Date: 20/Mar/18 17:40
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r175238617
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
 ##
 @@ -161,6 +179,40 @@ private static boolean isPrimitiveTransform(PTransform transform) {
     return network;
   }
 
+  // This enables a naive implementation of topological sort, instead of doing something clever.
+  // Nodes with lower weight precede nodes with higher weight, and are unrelated to nodes with
+  // equal weight
+  private final LoadingCache<PTransformNode, Long> nodeWeights =
+      CacheBuilder.newBuilder()
+          .build(
+              new CacheLoader<PTransformNode, Long>() {
+                @Override
+                public Long load(@Nonnull PTransformNode transformNode) {
+                  long parentWeight = 0L;
+                  for (String inputPCollectionId :
+                      transformNode.getTransform().getInputsMap().values()) {
+                    PTransformNode upstream =
+                        getProducer(
+                            PipelineNode.pCollection(
+                                inputPCollectionId,
+                                components.getPcollectionsOrThrow(inputPCollectionId)));
+                    parentWeight += nodeWeights.getUnchecked(upstream);
+                  }
+                  return 1 + parentWeight;
+                }
+              });
+
+  public Iterable<PTransformNode> getTopologicallyOrderedTransforms() {
+    return pipelineNetwork
+        .nodes()
+        .stream()
+        .filter(node -> node instanceof PTransformNode)
+        .map(PTransformNode.class::cast)
+        .collect(
+            Collectors.toCollection(
+                () -> new TreeSet<>(Comparator.comparingLong(nodeWeights::getUnchecked))));
 
 Review comment:
   This comparator is badly inconsistent with equals: a TreeSet treats elements 
that compare as equal as duplicates, so transforms with the same weight collapse 
into one and it will look like there is only one root transform.
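
The behaviour described in this comment can be reproduced outside Beam. What follows is a minimal sketch, not the code from the PR: `Node`, `BY_WEIGHT`, `BY_WEIGHT_THEN_ID`, and `retained` are hypothetical names introduced only for illustration, and the id tie-breaker is just one possible way to make the ordering consistent with equals, not necessarily the fix that was merged.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

public class TreeSetComparatorDemo {
  /** Hypothetical stand-in for a transform node: an id and a topological weight. */
  static final class Node {
    final String id;
    final long weight;

    Node(String id, long weight) {
      this.id = id;
      this.weight = weight;
    }
  }

  /** Orders by weight only, so two distinct nodes with equal weight compare as 0. */
  static final Comparator<Node> BY_WEIGHT = Comparator.comparingLong((Node n) -> n.weight);

  /** Orders by weight, then by id, so nodes with different ids never compare as 0. */
  static final Comparator<Node> BY_WEIGHT_THEN_ID = BY_WEIGHT.thenComparing((Node n) -> n.id);

  /** Returns how many nodes remain after inserting all of them into a TreeSet. */
  static int retained(Comparator<Node> order, List<Node> nodes) {
    TreeSet<Node> sorted = new TreeSet<>(order);
    sorted.addAll(nodes);
    return sorted.size();
  }

  public static void main(String[] args) {
    // Two roots (weight 1 each) feeding one downstream node (weight 1 + 1 + 1 = 3).
    List<Node> nodes =
        Arrays.asList(new Node("rootA", 1L), new Node("rootB", 1L), new Node("join", 3L));
    System.out.println(retained(BY_WEIGHT, nodes)); // prints 2: one root is dropped
    System.out.println(retained(BY_WEIGHT_THEN_ID, nodes)); // prints 3
  }
}
```

`TreeSet` decides membership with the comparator rather than with `equals`, which is why the weight-only ordering silently drops one of the two roots.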




Issue Time Tracking
---

Worklog Id: (was: 82364)
Time Spent: 14h  (was: 13h 50m)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> 
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
> Fix For: 2.4.0
>
>  Time Spent: 14h
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The java 
> runner libraries should provide some way to take a Pipeline that executes in 
> both a runner and an environment and construct a collection of transforms 
> which can execute within a single environment.





[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82075=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82075
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 19/Mar/18 22:41
Start Date: 19/Mar/18 22:41
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #4844: 
[BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction
URL: https://github.com/apache/beam/pull/4844#discussion_r175603054
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
 ##
 @@ -84,64 +81,72 @@
    * follows:
    *
    *
-   *   The {@link PTransform#getSubtransformsList()} contains the result of {@link
-   *   #getTransforms()}.
+   *   The {@link PTransform#getSubtransformsList()} contains no subtransforms. This ensures
+   *   that executable stages are treated as primitive transforms.
    *   The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of
    *   {@link #getInputPCollection()}.
    *   The output {@link PCollection PCollections} in the values of {@link
    *   PTransform#getOutputsMap()} are the {@link PCollectionNode PCollections} returned by
    *   {@link #getOutputPCollections()}.
+   *   The {@link FunctionSpec} contains an {@link ExecutableStagePayload} which has its input
 
 Review comment:
   Done.




Issue Time Tracking
---

Worklog Id: (was: 82075)
Time Spent: 13.5h  (was: 13h 20m)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> 
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
> Fix For: 2.4.0
>
>  Time Spent: 13.5h
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The java 
> runner libraries should provide some way to take a Pipeline that executes in 
> both a runner and an environment and construct a collection of transforms 
> which can execute within a single environment.





[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82079=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82079
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 19/Mar/18 22:41
Start Date: 19/Mar/18 22:41
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #4844: 
[BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction
URL: https://github.com/apache/beam/pull/4844#discussion_r175595281
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
 ##
 @@ -84,64 +81,72 @@
    * follows:
    *
    *
-   *   The {@link PTransform#getSubtransformsList()} contains the result of {@link
-   *   #getTransforms()}.
+   *   The {@link PTransform#getSubtransformsList()} contains no subtransforms. This ensures
+   *   that executable stages are treated as primitive transforms.
    *   The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of
    *   {@link #getInputPCollection()}.
    *   The output {@link PCollection PCollections} in the values of {@link
    *   PTransform#getOutputsMap()} are the {@link PCollectionNode PCollections} returned by
    *   {@link #getOutputPCollections()}.
+   *   The {@link FunctionSpec} contains an {@link ExecutableStagePayload} which has its input
+   *   and output PCollections set to the same values as the outer PTransform itself. It further
+   *   contains the environment set of transforms for this stage.
    *
+   *
+   * The executable stage can be reconstructed from the resulting {@link ExecutableStagePayload}
+   * and components alone via {@link #fromPayload(ExecutableStagePayload, Components)}.
    */
   default PTransform toPTransform() {
+    ExecutableStagePayload.Builder payload = ExecutableStagePayload.newBuilder();
+
+    payload.setEnvironment(getEnvironment());
+
+    PCollectionNode input = getInputPCollection();
+    payload.setInput(input.getId());
+
+    for (PTransformNode transform : getTransforms()) {
+      payload.addTransforms(transform.getId());
+    }
+
+    for (PCollectionNode output : getOutputPCollections()) {
+      payload.addOutputs(output.getId());
+    }
+
     PTransform.Builder pt = PTransform.newBuilder();
+    pt.setSpec(FunctionSpec.newBuilder()
+        .setUrn(ExecutableStage.URN)
+        .setPayload(payload.build().toByteString())
+        .build());
     pt.putInputs("input", getInputPCollection().getId());
-    int i = 0;
-    for (PCollectionNode materializedPCollection : getOutputPCollections()) {
-      pt.putOutputs(String.format("materialized_%s", i), materializedPCollection.getId());
-      i++;
-    }
-    for (PTransformNode fusedTransform : getTransforms()) {
-      pt.addSubtransforms(fusedTransform.getId());
+    int outputIndex = 0;
+    for (PCollectionNode pcNode : getOutputPCollections()) {
+      // Do something
 
 Review comment:
   Meaningless.




Issue Time Tracking
---

Worklog Id: (was: 82079)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> 
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
> Fix For: 2.4.0
>
>  Time Spent: 13h 40m
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The java 
> runner libraries should provide some way to take a Pipeline that executes in 
> both a runner and an environment and construct a collection of transforms 
> which can execute within a single environment.





[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82077=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82077
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 19/Mar/18 22:41
Start Date: 19/Mar/18 22:41
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #4844: 
[BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction
URL: https://github.com/apache/beam/pull/4844#discussion_r175594386
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
 ##
 @@ -84,64 +81,72 @@
    * follows:
    *
    *
-   *   The {@link PTransform#getSubtransformsList()} contains the result of {@link
-   *   #getTransforms()}.
+   *   The {@link PTransform#getSubtransformsList()} contains no subtransforms. This ensures
+   *   that executable stages are treated as primitive transforms.
    *   The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of
    *   {@link #getInputPCollection()}.
    *   The output {@link PCollection PCollections} in the values of {@link
    *   PTransform#getOutputsMap()} are the {@link PCollectionNode PCollections} returned by
    *   {@link #getOutputPCollections()}.
+   *   The {@link FunctionSpec} contains an {@link ExecutableStagePayload} which has its input
+   *   and output PCollections set to the same values as the outer PTransform itself. It further
+   *   contains the environment set of transforms for this stage.
    *
+   *
+   * The executable stage can be reconstructed from the resulting {@link ExecutableStagePayload}
+   * and components alone via {@link #fromPayload(ExecutableStagePayload, Components)}.
    */
   default PTransform toPTransform() {
+    ExecutableStagePayload.Builder payload = ExecutableStagePayload.newBuilder();
+
+    payload.setEnvironment(getEnvironment());
+
+    PCollectionNode input = getInputPCollection();
+    payload.setInput(input.getId());
+
+    for (PTransformNode transform : getTransforms()) {
+      payload.addTransforms(transform.getId());
+    }
+
+    for (PCollectionNode output : getOutputPCollections()) {
+      payload.addOutputs(output.getId());
+    }
+
     PTransform.Builder pt = PTransform.newBuilder();
+    pt.setSpec(FunctionSpec.newBuilder()
+        .setUrn(ExecutableStage.URN)
+        .setPayload(payload.build().toByteString())
+        .build());
     pt.putInputs("input", getInputPCollection().getId());
-    int i = 0;
-    for (PCollectionNode materializedPCollection : getOutputPCollections()) {
-      pt.putOutputs(String.format("materialized_%s", i), materializedPCollection.getId());
-      i++;
-    }
-    for (PTransformNode fusedTransform : getTransforms()) {
-      pt.addSubtransforms(fusedTransform.getId());
+    int outputIndex = 0;
+    for (PCollectionNode pcNode : getOutputPCollections()) {
+      // Do something
+      pt.putOutputs(String.format("materialized_%d", outputIndex), pcNode.getId());
+      outputIndex++;
     }
-    pt.setSpec(FunctionSpec.newBuilder().setUrn(ExecutableStage.URN));
     return pt.build();
   }
 
+  // TODO: Should this live under ExecutableStageTranslation?
 
 Review comment:
   Ack. I meant this to be a question to the reviewer rather than an actual 
checked-in TODO.




Issue Time Tracking
---

Worklog Id: (was: 82077)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> 
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
> Fix For: 2.4.0
>
>  Time Spent: 13.5h
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The java 
> runner libraries should provide some way to take a Pipeline that executes in 
> both a runner and an environment and construct a collection of transforms 
> which can execute within a single environment.





[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82080=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82080
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 19/Mar/18 22:41
Start Date: 19/Mar/18 22:41
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on issue #4898: [BEAM-3565] Clean up 
ExecutableStage
URL: https://github.com/apache/beam/pull/4898#issuecomment-374408951
 
 
   R: @tgroh 




Issue Time Tracking
---

Worklog Id: (was: 82080)
Time Spent: 13h 50m  (was: 13h 40m)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> 
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
> Fix For: 2.4.0
>
>  Time Spent: 13h 50m
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The java 
> runner libraries should provide some way to take a Pipeline that executes in 
> both a runner and an environment and construct a collection of transforms 
> which can execute within a single environment.





[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82078=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82078
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 19/Mar/18 22:41
Start Date: 19/Mar/18 22:41
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #4844: 
[BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction
URL: https://github.com/apache/beam/pull/4844#discussion_r175602123
 
 

 ##
 File path: model/pipeline/src/main/proto/beam_runner_api.proto
 ##
 @@ -203,6 +203,23 @@ message PCollection {
   DisplayData display_data = 5;
 }
 
+// The payload for an executable stage. This will eventually be passed to an SDK in the form of a
+// ProcessBundleDescriptor.
+message ExecutableStagePayload {
+
+  Environment environment = 1;
 
 Review comment:
   Added a comment.




Issue Time Tracking
---

Worklog Id: (was: 82078)
Time Spent: 13h 40m  (was: 13.5h)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> 
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
> Fix For: 2.4.0
>
>  Time Spent: 13h 40m
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The java 
> runner libraries should provide some way to take a Pipeline that executes in 
> both a runner and an environment and construct a collection of transforms 
> which can execute within a single environment.





[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82076=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82076
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 19/Mar/18 22:41
Start Date: 19/Mar/18 22:41
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #4844: 
[BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction
URL: https://github.com/apache/beam/pull/4844#discussion_r175602637
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
 ##
 @@ -84,64 +81,72 @@
    * follows:
    *
    *
-   *   The {@link PTransform#getSubtransformsList()} contains the result of {@link
-   *   #getTransforms()}.
+   *   The {@link PTransform#getSubtransformsList()} contains no subtransforms. This ensures
 
 Review comment:
   Done.




Issue Time Tracking
---

Worklog Id: (was: 82076)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> 
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
> Fix For: 2.4.0
>
>  Time Spent: 13.5h
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The java 
> runner libraries should provide some way to take a Pipeline that executes in 
> both a runner and an environment and construct a collection of transforms 
> which can execute within a single environment.





[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82074=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82074
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 19/Mar/18 22:40
Start Date: 19/Mar/18 22:40
Worklog Time Spent: 10m 
  Work Description: bsidhom opened a new pull request #4898: [BEAM-3565] 
Clean up ExecutableStage
URL: https://github.com/apache/beam/pull/4898
 
 
   Addressing minor issues from #4844.
   
   
   
   
   




Issue Time Tracking
---

Worklog Id: (was: 82074)
Time Spent: 13h 20m  (was: 13h 10m)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> 
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
> Fix For: 2.4.0
>
>  Time Spent: 13h 20m
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The java 
> runner libraries should provide some way to take a Pipeline that executes in 
> both a runner and an environment and construct a collection of transforms 
> which can execute within a single environment.





[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82057=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82057
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 19/Mar/18 21:32
Start Date: 19/Mar/18 21:32
Worklog Time Spent: 10m 
  Work Description: tgroh closed pull request #4844: [BEAM-3565] Add 
ExecutableStagePayload to simplify runner stage reconstruction
URL: https://github.com/apache/beam/pull/4844
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index b45be09efb6..3ed90dd036e 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -203,6 +203,23 @@ message PCollection {
   DisplayData display_data = 5;
 }
 
+// The payload for an executable stage. This will eventually be passed to an SDK in the form of a
+// ProcessBundleDescriptor.
+message ExecutableStagePayload {
+
+  Environment environment = 1;
+
+  // Input PCollection id.
+  string input = 2;
+
+  // PTransform ids contained within this executable stage.
+  repeated string transforms = 3;
+
+  // Output PCollection ids.
+  repeated string outputs = 4;
+
+}
+
 // The payload for the primitive ParDo transform.
 message ParDoPayload {
 
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
new file mode 100644
index 000..1200dc621a7
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
@@ -0,0 +1,25 @@
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+
+/**
+ * Utilities for converting {@link ExecutableStage}s to and from {@link RunnerApi} protocol buffers.
+ */
+public class ExecutableStageTranslation {
+
+  /** Extracts an {@link ExecutableStagePayload} from the given transform. */
+  public static ExecutableStagePayload getExecutableStagePayload(
+      AppliedPTransform<?, ?, ?> appliedTransform) throws IOException {
+    RunnerApi.PTransform transform =
+        PTransformTranslation.toProto(appliedTransform, SdkComponents.create());
+    checkArgument(ExecutableStage.URN.equals(transform.getSpec().getUrn()));
+    return ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
+  }
+
+}
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
index 766ce0d7136..27bfed87553 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
@@ -18,19 +18,16 @@
 
 package org.apache.beam.runners.core.construction.graph;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.collect.Iterables.getOnlyElement;
-
 import java.util.Collection;
-import java.util.Optional;
+import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
-import org.apache.beam.runners.core.construction.Environments;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 
@@ -84,64 +81,72 @@
* follows:
*
* 
-   *   The {@link PTransform#getSubtransformsList()} contains the result 
of {@link
-   *   #getTransforms()}.
+   *   The {@link 

[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=81441=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81441
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 16/Mar/18 23:57
Start Date: 16/Mar/18 23:57
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r175238617
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
 ##
 @@ -161,6 +179,40 @@ private static boolean isPrimitiveTransform(PTransform 
transform) {
 return network;
   }
 
+  // This enables a naive implementation of topological sort, instead of doing something clever.
+  // Nodes with lower weight precede nodes with higher weight, and are unrelated to nodes with
+  // equal weight
+  private final LoadingCache<PTransformNode, Long> nodeWeights =
+      CacheBuilder.newBuilder()
+          .build(
+              new CacheLoader<PTransformNode, Long>() {
+                @Override
+                public Long load(@Nonnull PTransformNode transformNode) {
+                  long parentWeight = 0L;
+                  for (String inputPCollectionId :
+                      transformNode.getTransform().getInputsMap().values()) {
+                    PTransformNode upstream =
+                        getProducer(
+                            PipelineNode.pCollection(
+                                inputPCollectionId,
+                                components.getPcollectionsOrThrow(inputPCollectionId)));
+                    parentWeight += nodeWeights.getUnchecked(upstream);
+                  }
+                  return 1 + parentWeight;
+                }
+              });
+
+  public Iterable<PTransformNode> getTopologicallyOrderedTransforms() {
+    return pipelineNetwork
+        .nodes()
+        .stream()
+        .filter(node -> node instanceof PTransformNode)
+        .map(PTransformNode.class::cast)
+        .collect(
+            Collectors.toCollection(
+                () -> new TreeSet<>(Comparator.comparingLong(nodeWeights::getUnchecked))));
 
 Review comment:
   This comparator is really extremely inconsistent with equals, and will make 
it so there only looks like there's one root transform.
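
To illustrate the concern in this comment: a `TreeSet` treats two elements as the same whenever its comparator returns 0, so transforms with equal weight collapse into a single entry. The sketch below is a minimal, self-contained illustration and not the code from the pull request. `WeightOrderingDemo`, the string ids, and the three-node graph are hypothetical stand-ins for `PTransformNode` and the pipeline network, and breaking ties on the id is only one possible remedy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical stand-in: each transform id maps to the ids of its upstream transforms. */
final class WeightOrderingDemo {

  // Weight is 1 plus the sum of the parents' weights, mirroring the diff above.
  static long weight(String id, Map<String, List<String>> parents, Map<String, Long> memo) {
    Long cached = memo.get(id);
    if (cached != null) {
      return cached;
    }
    long result = 1L;
    for (String parent : parents.get(id)) {
      result += weight(parent, parents, memo);
    }
    memo.put(id, result);
    return result;
  }

  // Two roots (weight 1 each) feeding one downstream transform (weight 3).
  static Map<String, List<String>> twoRootGraph() {
    Map<String, List<String>> parents = new LinkedHashMap<>();
    parents.put("readA", Collections.emptyList());
    parents.put("readB", Collections.emptyList());
    parents.put("flatten", Arrays.asList("readA", "readB"));
    return parents;
  }

  static List<String> order(Map<String, List<String>> parents, boolean tieBreakOnId) {
    Map<String, Long> memo = new HashMap<>();
    Comparator<String> cmp = Comparator.comparingLong(id -> weight(id, parents, memo));
    if (tieBreakOnId) {
      // Assumed remedy: distinct ids never compare as equal.
      cmp = cmp.thenComparing(Comparator.naturalOrder());
    }
    TreeSet<String> sorted = new TreeSet<>(cmp);
    sorted.addAll(parents.keySet());
    return new ArrayList<>(sorted);
  }

  public static void main(String[] args) {
    // Weight-only comparator: readB compares equal to readA and is dropped.
    System.out.println(order(twoRootGraph(), false)); // [readA, flatten]
    // With the tie-breaker, both roots survive.
    System.out.println(order(twoRootGraph(), true)); // [readA, readB, flatten]
  }
}
```

With the weight-only comparator the set silently loses `readB`, which is the "only one root transform" effect described above.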


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 81441)
Time Spent: 13h  (was: 12h 50m)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> 
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
> Fix For: 2.4.0
>
>  Time Spent: 13h
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The java 
> runner libraries should provide some way to take a Pipeline that executes in 
> both a runner and an environment and construct a collection of transforms 
> which can execute within a single environment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-14 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=80630=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80630
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 14/Mar/18 23:28
Start Date: 14/Mar/18 23:28
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #4777: [BEAM-3565] Add 
FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#issuecomment-373208733
 
 
   Please address all the comments.




Issue Time Tracking
---

Worklog Id: (was: 80630)
Time Spent: 12h 50m  (was: 12h 40m)



[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-12 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=79774=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79774
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 13/Mar/18 04:52
Start Date: 13/Mar/18 04:52
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r174016261
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
 ##
 @@ -61,12 +66,23 @@
* The returned {@link QueryablePipeline} will contain only the primitive 
transforms present
* within the provided components.
*/
-  public static QueryablePipeline fromComponents(Components components) {
+  public static QueryablePipeline forPrimitivesIn(Components components) {
+return forComponents(retainOnlyPrimitives(components));
+  }
+
+  /**
+   * Create a new {@link QueryablePipeline} based on the provided components.
+   *
+   * Relationships between composite transforms and their subtransforms, 
and producer
+   * relationships between {@link PTransformNode transforms} and {@link 
PCollectionNode
+   * PCollections} are not yet modelled by {@link QueryablePipeline}, so the 
input {@link
+   * Components} should be treatable as though each node is a primitive.
+   */
+  static QueryablePipeline forComponents(Components components) {
 
 Review comment:
   https://github.com/apache/beam/pull/4844 performs a lot of this change, 
without embedding the components within the stage; it will still require us to 
create a partial `Components`, but should ultimately cause this construction to 
go through the same path as the original `queryablePipeline`.




Issue Time Tracking
---

Worklog Id: (was: 79774)
Time Spent: 12h 40m  (was: 12.5h)



[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-12 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=79773=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79773
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 13/Mar/18 04:51
Start Date: 13/Mar/18 04:51
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4777: 
[BEAM-3565] Add FusedPipeline#toPipeline
URL: https://github.com/apache/beam/pull/4777#discussion_r174016261
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
 ##
 @@ -61,12 +66,23 @@
* The returned {@link QueryablePipeline} will contain only the primitive 
transforms present
* within the provided components.
*/
-  public static QueryablePipeline fromComponents(Components components) {
+  public static QueryablePipeline forPrimitivesIn(Components components) {
+return forComponents(retainOnlyPrimitives(components));
+  }
+
+  /**
+   * Create a new {@link QueryablePipeline} based on the provided components.
+   *
+   * Relationships between composite transforms and their subtransforms, 
and producer
+   * relationships between {@link PTransformNode transforms} and {@link 
PCollectionNode
+   * PCollections} are not yet modelled by {@link QueryablePipeline}, so the 
input {@link
+   * Components} should be treatable as though each node is a primitive.
+   */
+  static QueryablePipeline forComponents(Components components) {
 
 Review comment:
   https://github.com/apache/beam/pull/4844 performs a lot of this change, 
without embedding the components within the stage.




Issue Time Tracking
---

Worklog Id: (was: 79773)
Time Spent: 12.5h  (was: 12h 20m)



[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-12 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=79708=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79708
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 13/Mar/18 02:33
Start Date: 13/Mar/18 02:33
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4844: 
[BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction
URL: https://github.com/apache/beam/pull/4844#discussion_r174001324
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
 ##
 @@ -84,64 +81,72 @@
* follows:
*
* 
-   *   The {@link PTransform#getSubtransformsList()} contains the result 
of {@link
-   *   #getTransforms()}.
+   *   The {@link PTransform#getSubtransformsList()} contains no 
subtransforms. This ensures
+   *   that executable stages are treated as primitive transforms.
*   The only {@link PCollection} in the {@link 
PTransform#getInputsMap()} is the result of
*   {@link #getInputPCollection()}.
*   The output {@link PCollection PCollections} in the values of {@link
*   PTransform#getOutputsMap()} are the {@link PCollectionNode 
PCollections} returned by
*   {@link #getOutputPCollections()}.
+   *   The {@link FunctionSpec} contains an {@link ExecutableStagePayload} 
which has its input
 
 Review comment:
   The `PTransform#getSpec()` contains an...




Issue Time Tracking
---

Worklog Id: (was: 79708)
Time Spent: 11h 50m  (was: 11h 40m)



[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-12 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=79713=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79713
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 13/Mar/18 02:33
Start Date: 13/Mar/18 02:33
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4844: 
[BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction
URL: https://github.com/apache/beam/pull/4844#discussion_r174001198
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
 ##
 @@ -84,64 +81,72 @@
* follows:
*
* 
-   *   The {@link PTransform#getSubtransformsList()} contains the result 
of {@link
-   *   #getTransforms()}.
+   *   The {@link PTransform#getSubtransformsList()} contains no 
subtransforms. This ensures
 
 Review comment:
   s/contains no subtransforms/is empty/
   
   Just because it scans better




Issue Time Tracking
---

Worklog Id: (was: 79713)
Time Spent: 12h 20m  (was: 12h 10m)



[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-12 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=79711=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79711
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 13/Mar/18 02:33
Start Date: 13/Mar/18 02:33
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4844: 
[BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction
URL: https://github.com/apache/beam/pull/4844#discussion_r174001847
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
 ##
 @@ -84,64 +81,72 @@
* follows:
*
* 
-   *   The {@link PTransform#getSubtransformsList()} contains the result 
of {@link
-   *   #getTransforms()}.
+   *   The {@link PTransform#getSubtransformsList()} contains no 
subtransforms. This ensures
+   *   that executable stages are treated as primitive transforms.
*   The only {@link PCollection} in the {@link 
PTransform#getInputsMap()} is the result of
*   {@link #getInputPCollection()}.
*   The output {@link PCollection PCollections} in the values of {@link
*   PTransform#getOutputsMap()} are the {@link PCollectionNode 
PCollections} returned by
*   {@link #getOutputPCollections()}.
+   *   The {@link FunctionSpec} contains an {@link ExecutableStagePayload} 
which has its input
+   *   and output PCollections set to the same values as the outer 
PTransform itself. It further
+   *   contains the environment set of transforms for this stage.
* 
+   *
+   * The executable stage can be reconstructed from the resulting {@link 
ExecutableStagePayload}
+   * and components alone via {@link #fromPayload(ExecutableStagePayload, 
Components)}.
*/
   default PTransform toPTransform() {
+ExecutableStagePayload.Builder payload = 
ExecutableStagePayload.newBuilder();
+
+payload.setEnvironment(getEnvironment());
+
+PCollectionNode input = getInputPCollection();
+payload.setInput(input.getId());
+
+for (PTransformNode transform : getTransforms()) {
+  payload.addTransforms(transform.getId());
 
 Review comment:
   I'm not huge on the double iteration, given that I think you can populate 
both the payload and the transform builder in the same places to maximize 
mental locality (e.g. populate `payload.setInput` and then 
`pt.putInputs("input", `)
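
As a rough sketch of what this suggestion could look like, the example below fills the payload and the outer transform side by side, one concern at a time. It is not the code from the pull request: `SinglePassStageDemo`, `Payload`, and `Transform` are hypothetical plain-Java stand-ins for `ExecutableStagePayload.Builder` and `PTransform.Builder`, and the ids are invented for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SinglePassStageDemo {

  /** Hypothetical stand-in for ExecutableStagePayload.Builder. */
  static final class Payload {
    String input;
    final List<String> transforms = new ArrayList<>();
    final List<String> outputs = new ArrayList<>();
  }

  /** Hypothetical stand-in for PTransform.Builder. */
  static final class Transform {
    final Map<String, String> inputs = new LinkedHashMap<>();
    final Map<String, String> outputs = new LinkedHashMap<>();
    Payload payload;
  }

  static Transform toTransform(
      String inputId, List<String> transformIds, List<String> outputIds) {
    Payload payload = new Payload();
    Transform pt = new Transform();

    // Input: recorded on the payload and on the outer transform in the same place.
    payload.input = inputId;
    pt.inputs.put("input", inputId);

    // Fused transforms live only in the payload; the outer transform keeps no subtransforms.
    payload.transforms.addAll(transformIds);

    // Outputs: a single loop feeds both builders.
    int outputIndex = 0;
    for (String outputId : outputIds) {
      payload.outputs.add(outputId);
      pt.outputs.put(String.format("materialized_%d", outputIndex), outputId);
      outputIndex++;
    }

    pt.payload = payload;
    return pt;
  }
}
```

Keeping each pair of writes adjacent makes it harder for the payload and the outer transform to drift apart when one of them changes.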




Issue Time Tracking
---

Worklog Id: (was: 79711)
Time Spent: 12h 10m  (was: 12h)



[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-12 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=79712=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79712
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 13/Mar/18 02:33
Start Date: 13/Mar/18 02:33
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4844: 
[BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction
URL: https://github.com/apache/beam/pull/4844#discussion_r174001680
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
 ##
 @@ -84,64 +81,72 @@
* follows:
*
* 
-   *   The {@link PTransform#getSubtransformsList()} contains the result 
of {@link
-   *   #getTransforms()}.
+   *   The {@link PTransform#getSubtransformsList()} contains no 
subtransforms. This ensures
+   *   that executable stages are treated as primitive transforms.
*   The only {@link PCollection} in the {@link 
PTransform#getInputsMap()} is the result of
*   {@link #getInputPCollection()}.
*   The output {@link PCollection PCollections} in the values of {@link
*   PTransform#getOutputsMap()} are the {@link PCollectionNode 
PCollections} returned by
*   {@link #getOutputPCollections()}.
+   *   The {@link FunctionSpec} contains an {@link ExecutableStagePayload} 
which has its input
+   *   and output PCollections set to the same values as the outer 
PTransform itself. It further
+   *   contains the environment set of transforms for this stage.
* 
+   *
+   * The executable stage can be reconstructed from the resulting {@link 
ExecutableStagePayload}
+   * and components alone via {@link #fromPayload(ExecutableStagePayload, 
Components)}.
*/
   default PTransform toPTransform() {
+ExecutableStagePayload.Builder payload = 
ExecutableStagePayload.newBuilder();
+
+payload.setEnvironment(getEnvironment());
+
+PCollectionNode input = getInputPCollection();
+payload.setInput(input.getId());
+
+for (PTransformNode transform : getTransforms()) {
+  payload.addTransforms(transform.getId());
+}
+
+for (PCollectionNode output : getOutputPCollections()) {
+  payload.addOutputs(output.getId());
+}
+
 PTransform.Builder pt = PTransform.newBuilder();
+pt.setSpec(FunctionSpec.newBuilder()
+.setUrn(ExecutableStage.URN)
+.setPayload(payload.build().toByteString())
+.build());
 pt.putInputs("input", getInputPCollection().getId());
-int i = 0;
-for (PCollectionNode materializedPCollection : getOutputPCollections()) {
-  pt.putOutputs(String.format("materialized_%s", i), 
materializedPCollection.getId());
-  i++;
-}
-for (PTransformNode fusedTransform : getTransforms()) {
-  pt.addSubtransforms(fusedTransform.getId());
+int outputIndex = 0;
+for (PCollectionNode pcNode : getOutputPCollections()) {
+  // Do something
+  pt.putOutputs(String.format("materialized_%d", outputIndex), 
pcNode.getId());
+  outputIndex++;
 }
-pt.setSpec(FunctionSpec.newBuilder().setUrn(ExecutableStage.URN));
 return pt.build();
   }
 
+  // TODO: Should this live under ExecutableStageTranslation?
 
 Review comment:
   Regardless of whether it should, you should file a JIRA issue to decide 
it.
   
   For what it's worth, I think the toProto and fromProto methods should live 
side by side, and I think this is a totally reasonable place for them: this 
class is already involved with the proto representation of the pipeline, so 
moving them would not meaningfully separate the Java and proto representations.
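
A minimal sketch of keeping both directions in one place, assuming a payload that carries only ids and a components map that resolves them. `StageRoundTripDemo`, `Payload`, `Stage`, and `node` are hypothetical stand-ins, not the real `ExecutableStage`, `ExecutableStagePayload`, or `Components` types.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class StageRoundTripDemo {

  /** Hypothetical stand-in for the payload: ids only, no definitions. */
  static final class Payload {
    final String inputId;
    final List<String> transformIds;

    Payload(String inputId, List<String> transformIds) {
      this.inputId = inputId;
      this.transformIds = transformIds;
    }
  }

  /** Hypothetical stand-in for a stage: each node is an (id, definition) pair. */
  static final class Stage {
    final Map.Entry<String, String> input;
    final List<Map.Entry<String, String>> transforms;

    Stage(Map.Entry<String, String> input, List<Map.Entry<String, String>> transforms) {
      this.input = input;
      this.transforms = transforms;
    }

    // "toProto" direction: keep only the ids.
    Payload toPayload() {
      List<String> ids = new ArrayList<>();
      for (Map.Entry<String, String> transform : transforms) {
        ids.add(transform.getKey());
      }
      return new Payload(input.getKey(), ids);
    }

    // "fromProto" direction: resolve every id against the components.
    static Stage fromPayload(Payload payload, Map<String, String> components) {
      List<Map.Entry<String, String>> transforms = new ArrayList<>();
      for (String id : payload.transformIds) {
        transforms.add(node(id, components));
      }
      return new Stage(node(payload.inputId, components), transforms);
    }
  }

  static Map.Entry<String, String> node(String id, Map<String, String> components) {
    String definition = components.get(id);
    if (definition == null) {
      throw new IllegalArgumentException("Unknown id: " + id);
    }
    return new SimpleImmutableEntry<>(id, definition);
  }
}
```

With the two methods next to each other, a round trip through `toPayload` and `fromPayload` is easy to check in a single test.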




Issue Time Tracking
---

Worklog Id: (was: 79712)
Time Spent: 12h 10m  (was: 12h)


[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-12 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=79710=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79710
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 13/Mar/18 02:33
Start Date: 13/Mar/18 02:33
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4844: 
[BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction
URL: https://github.com/apache/beam/pull/4844#discussion_r174001443
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
 ##
 @@ -84,64 +81,72 @@
* follows:
*
* 
-   *   The {@link PTransform#getSubtransformsList()} contains the result 
of {@link
-   *   #getTransforms()}.
+   *   The {@link PTransform#getSubtransformsList()} contains no 
subtransforms. This ensures
+   *   that executable stages are treated as primitive transforms.
*   The only {@link PCollection} in the {@link 
PTransform#getInputsMap()} is the result of
*   {@link #getInputPCollection()}.
*   The output {@link PCollection PCollections} in the values of {@link
*   PTransform#getOutputsMap()} are the {@link PCollectionNode 
PCollections} returned by
*   {@link #getOutputPCollections()}.
+   *   The {@link FunctionSpec} contains an {@link ExecutableStagePayload} 
which has its input
+   *   and output PCollections set to the same values as the outer 
PTransform itself. It further
+   *   contains the environment set of transforms for this stage.
* 
+   *
+   * The executable stage can be reconstructed from the resulting {@link 
ExecutableStagePayload}
+   * and components alone via {@link #fromPayload(ExecutableStagePayload, 
Components)}.
*/
   default PTransform toPTransform() {
+ExecutableStagePayload.Builder payload = 
ExecutableStagePayload.newBuilder();
+
+payload.setEnvironment(getEnvironment());
+
+PCollectionNode input = getInputPCollection();
+payload.setInput(input.getId());
+
+for (PTransformNode transform : getTransforms()) {
+  payload.addTransforms(transform.getId());
+}
+
+for (PCollectionNode output : getOutputPCollections()) {
+  payload.addOutputs(output.getId());
+}
+
 PTransform.Builder pt = PTransform.newBuilder();
+pt.setSpec(FunctionSpec.newBuilder()
+.setUrn(ExecutableStage.URN)
+.setPayload(payload.build().toByteString())
+.build());
 pt.putInputs("input", getInputPCollection().getId());
-int i = 0;
-for (PCollectionNode materializedPCollection : getOutputPCollections()) {
-  pt.putOutputs(String.format("materialized_%s", i), 
materializedPCollection.getId());
-  i++;
-}
-for (PTransformNode fusedTransform : getTransforms()) {
-  pt.addSubtransforms(fusedTransform.getId());
+int outputIndex = 0;
+for (PCollectionNode pcNode : getOutputPCollections()) {
+  // Do something
 
 Review comment:
   What's this comment?




Issue Time Tracking
---

Worklog Id: (was: 79710)
Time Spent: 12h  (was: 11h 50m)



[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-12 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=79709=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79709
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 13/Mar/18 02:33
Start Date: 13/Mar/18 02:33
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4844: 
[BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction
URL: https://github.com/apache/beam/pull/4844#discussion_r174001107
 
 

 ##
 File path: model/pipeline/src/main/proto/beam_runner_api.proto
 ##
 @@ -203,6 +203,23 @@ message PCollection {
   DisplayData display_data = 5;
 }
 
+// The payload for an executable stage. This will eventually be passed to an 
SDK in the form of a
+// ProcessBundleDescriptor.
+message ExecutableStagePayload {
+
+  Environment environment = 1;
 
 Review comment:
   I'd like a comment explaining why this is a value rather than an ID, or 
replace it with `string environment_id`




Issue Time Tracking
---

Worklog Id: (was: 79709)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> 
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
> Fix For: 2.4.0
>
>  Time Spent: 11h 50m
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The java 
> runner libraries should provide some way to take a Pipeline that executes in 
> both a runner and an environment and construct a collection of transforms 
> which can execute within a single environment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-09 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=79027=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79027
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 09/Mar/18 21:34
Start Date: 09/Mar/18 21:34
Worklog Time Spent: 10m 
  Work Description: bsidhom opened a new pull request #4844: [BEAM-3565] 
Add ExecutableStagePayload to aid runner stage reconstruction
URL: https://github.com/apache/beam/pull/4844
 
 
   Runner code generally only has access to the root pipeline and 
`AppliedPTransform`s while walking the pipeline to perform translation. This 
change adds a new `ExecutableStagePayload` message which is embedded in the 
`FunctionSpec` of serialized `ExecutableStage` `PTransform`s. Doing so avoids 
the extra pipeline munging necessary to access the proper set of subtransforms.
   
   It also means that subtransforms are _not_ added directly to 
`ExecutableStage` transforms. This effectively makes executable stages native 
transforms, which they should be as far as the runner is concerned.
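
   The idea described above can be illustrated with a minimal sketch. Everything 
in it is a hypothetical stand-in written for illustration: `Spec`, `StagePayload`, 
`Transform`, the URN string, and the line-based encoding are assumptions and are 
not the Beam `RunnerApi` classes or the real protobuf serialization. It only 
shows how a runner could recover a stage's members from the transform's spec 
payload while the transform itself carries no subtransforms.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for illustration; NOT the Beam RunnerApi classes.
final class StagePayloadSketch {

  // Made-up URN used to mark a transform as an executable stage.
  static final String STAGE_URN = "example:executable_stage";

  /** Stand-in for a FunctionSpec: a URN plus an opaque payload. */
  static final class Spec {
    final String urn;
    final byte[] payload;

    Spec(String urn, byte[] payload) {
      this.urn = urn;
      this.payload = payload;
    }
  }

  /** Stand-in for the stage payload: the input collection and the fused transform ids. */
  static final class StagePayload {
    final String inputId;
    final List<String> transformIds;

    StagePayload(String inputId, List<String> transformIds) {
      this.inputId = inputId;
      this.transformIds = Collections.unmodifiableList(new ArrayList<>(transformIds));
    }

    // Toy line-based encoding (assumes ids contain no newline).
    byte[] toBytes() {
      List<String> lines = new ArrayList<>();
      lines.add(inputId);
      lines.addAll(transformIds);
      return String.join("\n", lines).getBytes(StandardCharsets.UTF_8);
    }

    static StagePayload fromBytes(byte[] bytes) {
      String text = new String(bytes, StandardCharsets.UTF_8);
      List<String> lines = Arrays.asList(text.split("\n", -1));
      return new StagePayload(lines.get(0), lines.subList(1, lines.size()));
    }
  }

  /** Stand-in for a PTransform: a spec plus the ids of any subtransforms. */
  static final class Transform {
    final Spec spec;
    final List<String> subtransforms;

    Transform(Spec spec, List<String> subtransforms) {
      this.spec = spec;
      this.subtransforms = subtransforms;
    }
  }

  // The stage's members live only in the payload; the subtransform list stays empty.
  static Transform toStageTransform(StagePayload payload) {
    return new Transform(new Spec(STAGE_URN, payload.toBytes()), Collections.emptyList());
  }

  // A runner holding only the transform can rebuild the stage from its spec.
  static StagePayload stageOf(Transform transform) {
    if (!STAGE_URN.equals(transform.spec.urn)) {
      throw new IllegalArgumentException("Not a stage transform: " + transform.spec.urn);
    }
    return StagePayload.fromBytes(transform.spec.payload);
  }

  public static void main(String[] args) {
    StagePayload original = new StagePayload("pc.in", Arrays.asList("parse", "filter"));
    Transform stage = toStageTransform(original);
    StagePayload recovered = stageOf(stage);
    System.out.println(recovered.inputId + " " + recovered.transformIds);
  }
}
```

   Because the stage transform has no subtransforms in this sketch, a runner 
walking the pipeline would treat it as a leaf, which matches the "native 
transform" behavior described above.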
   
   
   
   
   




Issue Time Tracking
---

Worklog Id: (was: 79027)
Time Spent: 11.5h  (was: 11h 20m)



[jira] [Work logged] (BEAM-3565) Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness

2018-03-09 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=79028&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79028
 ]

ASF GitHub Bot logged work on BEAM-3565:


Author: ASF GitHub Bot
Created on: 09/Mar/18 21:34
Start Date: 09/Mar/18 21:34
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on issue #4844: [BEAM-3565] Add 
ExecutableStagePayload to make aid runner stage reconstruction
URL: https://github.com/apache/beam/pull/4844#issuecomment-371951519
 
 
   R: @tgroh 




Issue Time Tracking
---

Worklog Id: (was: 79028)
Time Spent: 11h 40m  (was: 11.5h)
