[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-30 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=107345=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-107345
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 30/May/18 19:48
Start Date: 30/May/18 19:48
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r191900128
 
 

 ##
 File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java
 ##
 @@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.beam.fn.harness.FnHarness;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC
+ * channel.
+ *
+ * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a
+ * provided dependency.
+ */
+class InProcessEnvironmentFactory implements EnvironmentFactory {
+
+  private final GrpcFnServer loggingServer;
+  private final GrpcFnServer controlServer;
+
+  private final ControlClientPool.Source clientSource;
+
+  InProcessEnvironmentFactory(
+      GrpcFnServer loggingServer,
+      GrpcFnServer controlServer,
+      ControlClientPool.Source clientSource) {
+    this.loggingServer = loggingServer;
+    this.controlServer = controlServer;
+    checkArgument(
+        loggingServer.getApiServiceDescriptor() != null,
+        "Logging Server cannot have a null %s",
+        ApiServiceDescriptor.class.getSimpleName());
+    checkArgument(
+        controlServer.getApiServiceDescriptor() != null,
+        "Control Server cannot have a null %s",
+        ApiServiceDescriptor.class.getSimpleName());
+    this.clientSource = clientSource;
+  }
+
+  @Override
+  public RemoteEnvironment createEnvironment(Environment container) throws Exception {
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future fnHarness =
+        executor.submit(
+            () ->
+                FnHarness.main(
+                    PipelineOptionsFactory.create(),
+                    loggingServer.getApiServiceDescriptor(),
+                    controlServer.getApiServiceDescriptor(),
+                    InProcessManagedChannelFactory.create(),
+                    StreamObserverFactory.direct()));
+    executor.submit(
+        () -> {
+          try {
+            fnHarness.get();
+          } catch (Throwable t) {
+            executor.shutdownNow();
+          }
+        });
+
+    // TODO: find some way to populate the actual ID in FnHarness.main()
 
 Review comment:
   +1, we should enhance `ApiServiceDescriptor` to hold connection information. 
I am in favor of adding `headers` in `ApiServiceDescriptor` instead of having a 
`client_id`.
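The quoted `createEnvironment` starts `FnHarness.main` on an executor and submits a second task that waits on the harness `Future` and calls `shutdownNow()` if it fails. Below is a standalone sketch of that watcher pattern, with a plain `Runnable` standing in for the harness; `HarnessWatcherSketch` and `startWithWatcher` are hypothetical names, not Beam APIs. One detail worth checking: `Executors.newSingleThreadExecutor()` runs submitted tasks one after another, so the watcher is queued behind the harness task and only starts once the harness has already returned or thrown. The sketch assumes a two-thread pool so the watcher is already blocked in `get()` while the harness runs, and it also shuts down after a normal return, which may or may not be the intended behavior.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class HarnessWatcherSketch {

  /**
   * Runs {@code harness} on its own thread plus a watcher that shuts the executor down once the
   * harness finishes, whether it returns normally or throws. Hypothetical helper.
   */
  static ExecutorService startWithWatcher(Runnable harness) {
    // Two threads: one for the harness stand-in, one for the watcher, so the watcher
    // waits concurrently instead of being queued behind the harness task.
    ExecutorService executor = Executors.newFixedThreadPool(2);
    Future<?> harnessFuture = executor.submit(harness);
    executor.submit(
        () -> {
          try {
            harnessFuture.get();
          } catch (ExecutionException e) {
            System.err.println("Harness failed: " + e.getCause());
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } finally {
            // Stop accepting work and interrupt anything still running.
            executor.shutdownNow();
          }
        });
    return executor;
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService executor =
        startWithWatcher(
            () -> {
              throw new IllegalStateException("simulated harness crash");
            });
    boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
    System.out.println("terminated=" + terminated);
  }
}
```

Running `main` should print the simulated failure to stderr, followed by `terminated=true`.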


This is an automated message from the Apache Git Service.
To 

[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-24 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=105653=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105653
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 24/May/18 16:56
Start Date: 24/May/18 16:56
Worklog Time Spent: 10m 
  Work Description: tgroh closed pull request #5349: [BEAM-3326] Remote 
stage evaluator
URL: https://github.com/apache/beam/pull/5349
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle
index 5fc4e159520..af99b5b5bc2 100644
--- a/runners/direct-java/build.gradle
+++ b/runners/direct-java/build.gradle
@@ -64,6 +64,7 @@ dependencies {
   shadow library.java.slf4j_api
   shadow library.java.hamcrest_core
   shadow library.java.junit
+  testRuntime project(path: ":beam-sdks-java-harness")
   shadowTest project(path: ":beam-sdks-java-core", configuration: "shadowTest")
   shadowTest project(path: ":beam-runners-core-java", configuration: "shadowTest")
   shadowTest library.java.guava_testlib
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 1d3d790a747..d06b074246c 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
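@@ -226,6 +226,12 @@
       <artifactId>beam-sdks-java-fn-execution</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-harness</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-java-fn-execution</artifactId>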
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
new file mode 100644
index 000..f34c4d54d49
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * The {@link TransformEvaluatorFactory} which produces {@link TransformEvaluator evaluators} for
+ * stages which execute on an SDK harness via the Fn Execution APIs.
+ */
+class RemoteStageEvaluatorFactory implements TransformEvaluatorFactory {
+  private final BundleFactory bundleFactory;
+
+  private final JobBundleFactory jobFactory;
+
+  RemoteStageEvaluatorFactory(BundleFactory bundleFactory, JobBundleFactory jobFactory) {
+    this.bundleFactory = bundleFactory;
+    this.jobFactory = jobFactory;
+  }
+
+  @Nullable
+  @Override
+  public  TransformEvaluator forApplication(
+      PTransformNode application, CommittedBundle inputBundle) throws Exception {
+    return new RemoteStageEvaluator<>(application);
+  }
+
+  @Override
+  public void cleanup() throws Exception {
+    jobFactory.close();
+  }
+
+  private class RemoteStageEvaluator implements TransformEvaluator {
+    private final PTransformNode transform;
+    private final RemoteBundle bundle;
+    private final Collection outputs;
+
+    private RemoteStageEvaluator(PTransformNode transform) throws Exception {
+      this.transform = transform;
+      ExecutableStage stage =
+          ExecutableStage.fromPayload(
+ 

[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-24 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=105616=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105616
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 24/May/18 15:15
Start Date: 24/May/18 15:15
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190624823
 
 

 ##
 File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java
 ##
 @@ -0,0 +1,97 @@
+// TODO: find some way to populate the actual ID in FnHarness.main()
 
 Review comment:
   I would be in favor of dropping how we currently provide the client id in 
exchange for this approach, unless you're referring to something else when you 
say `the same value in two different ways`.



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=105413=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105413
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 23/May/18 23:47
Start Date: 23/May/18 23:47
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190431619
 
 

 ##
 File path: runners/java-fn-execution/build.gradle
 ##
 @@ -24,6 +24,7 @@ dependencies {
   compile library.java.guava
   compile library.java.findbugs_annotations
   compile project(path: ":beam-runners-core-construction-java", configuration: "shadow")
+  compileOnly project(path: ":beam-sdks-java-harness")
 
 Review comment:
   Done.
   
   I can't find any documentation at all about the `provided` dependency scope 
in Gradle; every time I google it, it sends me to `compileOnly`. Is this a 
result of our `build_rules.gradle`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 105413)
Time Spent: 15h 20m  (was: 15h 10m)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 15h 20m
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=105406=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105406
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 23/May/18 23:20
Start Date: 23/May/18 23:20
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190427412
 
 

 ##
 File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java
 ##
 @@ -0,0 +1,97 @@
+// TODO: find some way to populate the actual ID in FnHarness.main()
 
 Review comment:
   I see what you're saying. It makes sense, but I'm not in favor of providing 
the same value in two different ways.
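The proposal in this thread is to carry connection details such as the client id as headers on the endpoint description, so that the id is not also passed through a separate channel. A minimal sketch of that single-source idea follows. `EndpointWithHeaders`, `WORKER_ID_HEADER`, and the `worker_id` key are hypothetical; this is not the generated `ApiServiceDescriptor` proto, and the proto is not assumed to have a headers field.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical endpoint description whose only copy of the worker id lives in its headers. */
public final class EndpointWithHeaders {
  // Hypothetical header key; the discussion does not name one.
  static final String WORKER_ID_HEADER = "worker_id";

  private final String url;
  private final Map<String, String> headers;

  EndpointWithHeaders(String url, Map<String, String> headers) {
    this.url = url;
    // Defensive, immutable copy so callers cannot change the headers afterwards.
    this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
  }

  String url() {
    return url;
  }

  Map<String, String> headers() {
    return headers;
  }

  /** Reads the id from the headers, so there is exactly one place it is stored. */
  Optional<String> workerId() {
    return Optional.ofNullable(headers.get(WORKER_ID_HEADER));
  }

  /** Returns a copy with the worker id header set; this instance is left unchanged. */
  EndpointWithHeaders withWorkerId(String id) {
    Map<String, String> copy = new HashMap<>(headers);
    copy.put(WORKER_ID_HEADER, id);
    return new EndpointWithHeaders(url, copy);
  }

  public static void main(String[] args) {
    EndpointWithHeaders control =
        new EndpointWithHeaders("localhost:50051", Collections.<String, String>emptyMap())
            .withWorkerId("worker-1");
    System.out.println(control.url() + " " + control.workerId().orElse("<none>"));
  }
}
```

Because `workerId()` derives the value from `headers()`, a harness that sends the headers on connect and a runner that reads them back would agree on the id without it being supplied twice.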



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=105403=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105403
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 23/May/18 23:11
Start Date: 23/May/18 23:11
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190425499
 
 

 ##
 File path: runners/direct-java/build.gradle
 ##
 @@ -64,6 +64,7 @@ dependencies {
   shadow library.java.slf4j_api
 
 Review comment:
   I still think we want to use `provided` for `beam-sdks-java-harness` so that 
it is correctly added to the generated maven pom.




Issue Time Tracking
---

Worklog Id: (was: 105403)
Time Spent: 14h 50m  (was: 14h 40m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=105404=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105404
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 23/May/18 23:11
Start Date: 23/May/18 23:11
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190425600
 
 

 ##
 File path: 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
 ##
 @@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.InProcessServerFactory;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.data.GrpcDataService;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link RemoteStageEvaluatorFactory}. */
+@RunWith(JUnit4.class)
+public class RemoteStageEvaluatorFactoryTest implements Serializable {
+
+  private transient RemoteStageEvaluatorFactory factory;
+  private transient ExecutorService executor;
+  private transient GrpcFnServer dataServer;
 
 Review comment:
   Got it




Issue Time Tracking
---

Worklog Id: (was: 105404)
Time Spent: 14h 50m  (was: 14h 40m)


[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=105405=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105405
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 23/May/18 23:11
Start Date: 23/May/18 23:11
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190425759
 
 

 ##
 File path: runners/java-fn-execution/build.gradle
 ##
 @@ -24,6 +24,7 @@ dependencies {
   compile library.java.guava
   compile library.java.findbugs_annotations
   compile project(path: ":beam-runners-core-construction-java", configuration: "shadow")
+  compileOnly project(path: ":beam-sdks-java-harness")
 
 Review comment:
   Please use `provided`.
   
   Don't we need a corresponding change to the pom.xml?




Issue Time Tracking
---

Worklog Id: (was: 105405)
Time Spent: 15h  (was: 14h 50m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=105402=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105402
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 23/May/18 23:06
Start Date: 23/May/18 23:06
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190425270
 
 

 ##
 File path: runners/direct-java/build.gradle
 ##
 @@ -58,12 +58,14 @@ dependencies {
   compile project(path: ":beam-runners-local-java-core", configuration: "shadow")
   compile project(path: ":beam-runners-java-fn-execution", configuration: "shadow")
   compile project(path: ":beam-sdks-java-fn-execution", configuration: "shadow")
+  compileOnly project(path: ":beam-sdks-java-harness")
 
 Review comment:
   We have been using the `provided` scope in other places, and it is correctly 
translated when producing the Maven jar.
   
   You're correct that we should migrate to `compileOnly` for the provided 
scope, but until that happens, please stick with `provided`. I filed 
https://issues.apache.org/jira/browse/BEAM-4395 for the migration to 
`compileOnly`.
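Under Maven's `provided` scope (and Gradle's `compileOnly`), `beam-sdks-java-harness` is available when compiling the runner but is neither packaged with it nor pulled in transitively for users, so code that reaches `FnHarness` can fail later with `NoClassDefFoundError` if nobody supplies the jar. A minimal sketch of a fail-fast check under that assumption; `ProvidedDependencyCheck`, `isOnClasspath`, and `requireOnClasspath` are hypothetical helpers, not Beam APIs.

```java
public final class ProvidedDependencyCheck {

  /** Returns true if {@code className} can be loaded, without running its static initializers. */
  static boolean isOnClasspath(String className) {
    try {
      Class.forName(className, false, ProvidedDependencyCheck.class.getClassLoader());
      return true;
    } catch (ClassNotFoundException | LinkageError e) {
      return false;
    }
  }

  /** Throws with an actionable message instead of a later NoClassDefFoundError. */
  static void requireOnClasspath(String className, String artifact) {
    if (!isOnClasspath(className)) {
      throw new IllegalStateException(
          String.format(
              "%s was not found. It comes from %s, a provided/compileOnly dependency;"
                  + " add it to the runtime classpath.",
              className, artifact));
    }
  }

  public static void main(String[] args) {
    // A JDK class is always present.
    System.out.println(isOnClasspath("java.util.concurrent.ExecutorService"));
    // Present only if the harness jar was supplied at runtime.
    System.out.println(isOnClasspath("org.apache.beam.fn.harness.FnHarness"));
  }
}
```

Passing `initialize = false` to `Class.forName` keeps the probe from triggering the harness's static initialization just to test for its presence.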




Issue Time Tracking
---

Worklog Id: (was: 105402)
Time Spent: 14h 40m  (was: 14.5h)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=105400=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105400
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 23/May/18 23:01
Start Date: 23/May/18 23:01
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190424525
 
 

 ##
 File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java
 ##
 @@ -0,0 +1,97 @@
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.beam.fn.harness.FnHarness;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC
+ * channel.
+ *
+ * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a
+ * provided dependency.
+ */
+class InProcessEnvironmentFactory implements EnvironmentFactory {
+
+  private final GrpcFnServer loggingServer;
+  private final GrpcFnServer controlServer;
+
+  private final ControlClientPool.Source clientSource;
+
+  InProcessEnvironmentFactory(
+      GrpcFnServer loggingServer,
+      GrpcFnServer controlServer,
+      ControlClientPool.Source clientSource) {
+    this.loggingServer = loggingServer;
+    this.controlServer = controlServer;
+    checkArgument(
+        loggingServer.getApiServiceDescriptor() != null,
+        "Logging Server cannot have a null %s",
+        ApiServiceDescriptor.class.getSimpleName());
+    checkArgument(
+        controlServer.getApiServiceDescriptor() != null,
+        "Control Server cannot have a null %s",
+        ApiServiceDescriptor.class.getSimpleName());
+    this.clientSource = clientSource;
+  }
+
+  @Override
+  public RemoteEnvironment createEnvironment(Environment container) throws Exception {
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future fnHarness =
+        executor.submit(
+            () ->
+                FnHarness.main(
+                    PipelineOptionsFactory.create(),
+                    loggingServer.getApiServiceDescriptor(),
+                    controlServer.getApiServiceDescriptor(),
+                    InProcessManagedChannelFactory.create(),
+                    StreamObserverFactory.direct()));
+    executor.submit(
+        () -> {
+          try {
+            fnHarness.get();
+          } catch (Throwable t) {
+            executor.shutdownNow();
+          }
+        });
+
+    // TODO: find some way to populate the actual ID in FnHarness.main()
 
 Review comment:
   The point I'm trying to make is that the ApiServiceDescriptor is meant to describe who and how you're connecting to an endpoint. I can foresee that in the future you would specify things like credentials inside this ApiServiceDescriptor. Since talking to the service requires a client id, I felt as 

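The `createEnvironment` diff context above does two things. It submits `FnHarness.main` to an executor. It then submits a second task that waits on the harness future and calls `shutdownNow()` if waiting on that future throws.

The sketch below is a self-contained version of that launch-and-watch pattern. `BlockingTask` and `HarnessLauncher` are hypothetical names, and the task passed in stands in for `FnHarness.main`.

One detail matters here. On `Executors.newSingleThreadExecutor()`, the watcher is queued behind the harness task. If the harness call blocks, the watcher only runs after it returns. This sketch therefore uses a two-thread pool so the watcher can wait concurrently:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Sketch of launching a long-lived task and tearing the executor down if it fails. */
final class HarnessLauncher {
  private HarnessLauncher() {}

  /** Hypothetical stand-in for a blocking entry point such as a harness main method. */
  interface BlockingTask {
    void run() throws Exception;
  }

  static ExecutorService launch(BlockingTask harness) {
    // Two threads: one runs the harness, the other waits on it at the same time.
    ExecutorService executor = Executors.newFixedThreadPool(2);
    Future<?> harnessFuture =
        executor.submit(
            () -> {
              harness.run();
              return null; // Callable form, so checked exceptions surface via the Future.
            });
    executor.submit(
        () -> {
          try {
            harnessFuture.get();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } catch (ExecutionException e) {
            // The harness failed: stop everything running on this executor.
            executor.shutdownNow();
          }
        });
    return executor;
  }

  /** Waits up to {@code seconds} for the executor to terminate; returns whether it did. */
  static boolean awaitShutdown(ExecutorService executor, long seconds)
      throws InterruptedException {
    return executor.awaitTermination(seconds, TimeUnit.SECONDS);
  }
}
```

If the harness completes normally, this sketch leaves the executor running. The caller remains responsible for shutting it down when the environment is closed.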
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104927=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104927
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 23/May/18 01:47
Start Date: 23/May/18 01:47
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190102635
 
 

 ##
 File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java
 ##
 @@ -0,0 +1,97 @@
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.beam.fn.harness.FnHarness;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC
+ * channel.
+ *
+ * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a
+ * provided dependency.
+ */
+class InProcessEnvironmentFactory implements EnvironmentFactory {
+
+  private final GrpcFnServer loggingServer;
+  private final GrpcFnServer controlServer;
+
+  private final ControlClientPool.Source clientSource;
+
+  InProcessEnvironmentFactory(
+      GrpcFnServer loggingServer,
+      GrpcFnServer controlServer,
+      ControlClientPool.Source clientSource) {
+    this.loggingServer = loggingServer;
+    this.controlServer = controlServer;
+    checkArgument(
+        loggingServer.getApiServiceDescriptor() != null,
+        "Logging Server cannot have a null %s",
+        ApiServiceDescriptor.class.getSimpleName());
+    checkArgument(
+        controlServer.getApiServiceDescriptor() != null,
+        "Control Server cannot have a null %s",
+        ApiServiceDescriptor.class.getSimpleName());
+    this.clientSource = clientSource;
+  }
+
+  @Override
+  public RemoteEnvironment createEnvironment(Environment container) throws Exception {
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future fnHarness =
+        executor.submit(
+            () ->
+                FnHarness.main(
+                    PipelineOptionsFactory.create(),
+                    loggingServer.getApiServiceDescriptor(),
+                    controlServer.getApiServiceDescriptor(),
+                    InProcessManagedChannelFactory.create(),
+                    StreamObserverFactory.direct()));
+    executor.submit(
+        () -> {
+          try {
+            fnHarness.get();
+          } catch (Throwable t) {
+            executor.shutdownNow();
+          }
+        });
+
+    // TODO: find some way to populate the actual ID in FnHarness.main()
 
 Review comment:
   I'm not sure I understand the context for this question. This ID is passed 
as part of the container contract and used for all portable API calls from that 
container. I think it's desirable that it's a single ID for simplicity and 
debuggability. Python's use of an environment variable to pass it around 

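The reply above makes two points. It argues for one ID per container, passed through the container contract and reused on every portable API call. It also notes that the Python SDK passes that ID through an environment variable.

Below is a minimal Java sketch of that idea. It assumes a hypothetical variable name, `WORKER_ID`, which is not taken from the thread, and a generated fallback when the variable is unset:

```java
import java.util.Map;
import java.util.UUID;

/**
 * Sketch: resolve one worker ID per container from its environment, falling back to a
 * generated ID. The variable name WORKER_ID is a hypothetical choice, not a Beam contract.
 */
final class WorkerIds {
  private WorkerIds() {}

  /** Hypothetical environment variable through which a runner would pass the worker ID. */
  static final String WORKER_ID_VAR = "WORKER_ID";

  /** Returns the configured worker ID, or a freshly generated one if none is set. */
  static String resolveWorkerId(Map<String, String> env) {
    String id = env.get(WORKER_ID_VAR);
    if (id != null && !id.isEmpty()) {
      return id;
    }
    // No ID supplied: generate one. Callers should resolve once at startup and reuse the
    // same value for every API call, so requests and logs can be correlated.
    return "worker-" + UUID.randomUUID();
  }

  /** Reads the ID from the real process environment. */
  static String resolveWorkerId() {
    return resolveWorkerId(System.getenv());
  }
}
```

Taking the environment as a `Map` parameter keeps the lookup testable without modifying the process environment.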
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104907=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104907
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 23/May/18 00:41
Start Date: 23/May/18 00:41
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190094767
 
 

 ##
 File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java
 ##
 @@ -0,0 +1,97 @@
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.beam.fn.harness.FnHarness;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC
 
 Review comment:
   Updated.




Issue Time Tracking
---

Worklog Id: (was: 104907)
Time Spent: 14h  (was: 13h 50m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104906=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104906
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 23/May/18 00:41
Start Date: 23/May/18 00:41
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190094759
 
 

 ##
 File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java
 ##
 @@ -0,0 +1,97 @@
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.beam.fn.harness.FnHarness;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC
+ * channel.
+ *
+ * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a
+ * provided dependency.
+ */
+class InProcessEnvironmentFactory implements EnvironmentFactory {
+
+  private final GrpcFnServer loggingServer;
+  private final GrpcFnServer controlServer;
+
+  private final ControlClientPool.Source clientSource;
+
+  InProcessEnvironmentFactory(
+      GrpcFnServer loggingServer,
+      GrpcFnServer controlServer,
+      ControlClientPool.Source clientSource) {
+    this.loggingServer = loggingServer;
+    this.controlServer = controlServer;
+    checkArgument(
+        loggingServer.getApiServiceDescriptor() != null,
+        "Logging Server cannot have a null %s",
+        ApiServiceDescriptor.class.getSimpleName());
+    checkArgument(
+        controlServer.getApiServiceDescriptor() != null,
+        "Control Server cannot have a null %s",
+        ApiServiceDescriptor.class.getSimpleName());
+    this.clientSource = clientSource;
+  }
+
+  @Override
+  public RemoteEnvironment createEnvironment(Environment container) throws Exception {
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future fnHarness =
+        executor.submit(
+            () ->
+                FnHarness.main(
+                    PipelineOptionsFactory.create(),
 Review comment:
   Done.




Issue Time Tracking
---

Worklog Id: (was: 104906)
Time Spent: 13h 50m  (was: 13h 40m)


[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104908=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104908
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 23/May/18 00:41
Start Date: 23/May/18 00:41
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190094789
 
 

 ##
 File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java
 ##
 @@ -0,0 +1,97 @@
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.beam.fn.harness.FnHarness;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC
+ * channel.
+ *
+ * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a
 
 Review comment:
   Nope, done




Issue Time Tracking
---

Worklog Id: (was: 104908)
Time Spent: 14h 10m  (was: 14h)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104905=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104905
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 23/May/18 00:40
Start Date: 23/May/18 00:40
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190094746
 
 

 ##
 File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java
 ##
 @@ -0,0 +1,97 @@
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.beam.fn.harness.FnHarness;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC
+ * channel.
+ *
+ * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a
+ * provided dependency.
+ */
+class InProcessEnvironmentFactory implements EnvironmentFactory {
+
+  private final GrpcFnServer loggingServer;
+  private final GrpcFnServer controlServer;
+
+  private final ControlClientPool.Source clientSource;
+
+  InProcessEnvironmentFactory(
+      GrpcFnServer loggingServer,
+      GrpcFnServer controlServer,
+      ControlClientPool.Source clientSource) {
+    this.loggingServer = loggingServer;
+    this.controlServer = controlServer;
+    checkArgument(
+        loggingServer.getApiServiceDescriptor() != null,
+        "Logging Server cannot have a null %s",
+        ApiServiceDescriptor.class.getSimpleName());
+    checkArgument(
+        controlServer.getApiServiceDescriptor() != null,
+        "Control Server cannot have a null %s",
+        ApiServiceDescriptor.class.getSimpleName());
+    this.clientSource = clientSource;
+  }
+
+  @Override
+  public RemoteEnvironment createEnvironment(Environment container) throws Exception {
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future fnHarness =
+        executor.submit(
+            () ->
+                FnHarness.main(
 
 Review comment:
   I've added BEAM-4384.
   
   I don't think this is particularly high priority; I don't expect most users to use this directly.




Issue Time Tracking
---

Worklog Id: (was: 104905)
Time Spent: 13h 40m  (was: 13.5h)


[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104901=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104901
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 23/May/18 00:31
Start Date: 23/May/18 00:31
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190093623
 
 

 ##
 File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java
 ##
 @@ -0,0 +1,97 @@
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.beam.fn.harness.FnHarness;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC
+ * channel.
+ *
+ * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a
+ * provided dependency.
+ */
+class InProcessEnvironmentFactory implements EnvironmentFactory {
+
+  private final GrpcFnServer loggingServer;
+  private final GrpcFnServer controlServer;
+
+  private final ControlClientPool.Source clientSource;
+
+  InProcessEnvironmentFactory(
 
 Review comment:
   It's fine, but also gets us very little. Done.
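The `InProcessEnvironmentFactory` constructor validates both servers up front. The `checkArgument` calls in the diff contexts above use Guava's `Preconditions.checkArgument`, which throws `IllegalArgumentException` with a `%s`-templated message when the condition is false.

Below is a dependency-free sketch of the same fail-fast pattern. `Endpoint`, `Factory`, and the local `checkArgument` are hypothetical stand-ins, not Beam classes. The local helper only approximates Guava's behavior for templates that use `%s` placeholders:

```java
/**
 * Dependency-free sketch of fail-fast constructor validation. Endpoint and Factory are
 * hypothetical stand-ins for GrpcFnServer and InProcessEnvironmentFactory.
 */
final class ConstructorValidation {
  private ConstructorValidation() {}

  /** Hypothetical stand-in for a server that exposes a descriptor URL, which may be null. */
  interface Endpoint {
    String descriptorUrl();
  }

  /**
   * Approximates Guava's Preconditions.checkArgument(boolean, String, Object...) for templates
   * that only use %s placeholders.
   */
  static void checkArgument(boolean condition, String template, Object... args) {
    if (!condition) {
      throw new IllegalArgumentException(String.format(template, args));
    }
  }

  /** Validates both endpoints before storing them, so bad wiring fails at construction. */
  static final class Factory {
    private final Endpoint loggingServer;
    private final Endpoint controlServer;

    Factory(Endpoint loggingServer, Endpoint controlServer) {
      checkArgument(
          loggingServer.descriptorUrl() != null, "Logging Server cannot have a null %s", "descriptor");
      checkArgument(
          controlServer.descriptorUrl() != null, "Control Server cannot have a null %s", "descriptor");
      this.loggingServer = loggingServer;
      this.controlServer = controlServer;
    }

    String loggingUrl() {
      return loggingServer.descriptorUrl();
    }

    String controlUrl() {
      return controlServer.descriptorUrl();
    }
  }
}
```

Checking in the constructor surfaces a missing descriptor when the factory is built, rather than later when `createEnvironment` hands the descriptor to the harness.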




Issue Time Tracking
---

Worklog Id: (was: 104901)
Time Spent: 13.5h  (was: 13h 20m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104900=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104900
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 23/May/18 00:31
Start Date: 23/May/18 00:31
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190093615
 
 

 ##
 File path: runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
 ##
 @@ -0,0 +1,167 @@
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.InProcessServerFactory;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.data.GrpcDataService;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link RemoteStageEvaluatorFactory}. */
+@RunWith(JUnit4.class)
+public class RemoteStageEvaluatorFactoryTest implements Serializable {
+
+  private transient RemoteStageEvaluatorFactory factory;
+  private transient ExecutorService executor;
+  private transient GrpcFnServer dataServer;
 
 Review comment:
   We can't: we don't create the harness ourselves; we rely on the 
`EnvironmentFactory` to do so.
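A minimal sketch of the indirection described in this reply, using simplified, hypothetical types (`EnvironmentHandle`, `HarnessEnvironmentFactory`, `StageEvaluatorSketch`, `RecordingFactory`) rather than Beam's real `EnvironmentFactory` and `RemoteEnvironment` signatures: the evaluator only asks the factory for an environment, so a test controls how (and whether) a harness starts by supplying the factory, not by starting a harness itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical handle to a running environment; close() tears it down without checked exceptions.
interface EnvironmentHandle extends AutoCloseable {
  @Override
  void close();
}

// Hypothetical stand-in for an environment factory: whoever supplies it decides how the harness starts.
interface HarnessEnvironmentFactory {
  EnvironmentHandle createEnvironment(String environmentId);
}

// Hypothetical evaluator: it never builds a harness itself, it only asks the factory for one.
final class StageEvaluatorSketch {
  private final HarnessEnvironmentFactory factory;

  StageEvaluatorSketch(HarnessEnvironmentFactory factory) {
    this.factory = factory;
  }

  void runStage(String environmentId) {
    try (EnvironmentHandle environment = factory.createEnvironment(environmentId)) {
      // Bundles would be sent to `environment` here; it is closed when the stage finishes.
    }
  }
}

// Test double that records environment lifecycle events instead of starting a real harness.
final class RecordingFactory implements HarnessEnvironmentFactory {
  final List<String> events = new ArrayList<>();

  @Override
  public EnvironmentHandle createEnvironment(String environmentId) {
    events.add("create:" + environmentId);
    return () -> events.add("close:" + environmentId);
  }
}
```

Because the evaluator in this sketch depends only on the factory interface, an in-process factory could be swapped for a process- or container-based one without touching the evaluator.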


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 104900)
Time Spent: 13h 20m  (was: 13h 10m)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: 

[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104889=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104889
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 23/May/18 00:26
Start Date: 23/May/18 00:26
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190092945
 
 

 ##
 File path: runners/direct-java/build.gradle
 ##
 @@ -58,12 +58,14 @@ dependencies {
  compile project(path: ":beam-runners-local-java-core", configuration: "shadow")
  compile project(path: ":beam-runners-java-fn-execution", configuration: "shadow")
  compile project(path: ":beam-sdks-java-fn-execution", configuration: "shadow")
+  compileOnly project(path: ":beam-sdks-java-harness")
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
   shadow library.java.joda_time
   shadow library.java.findbugs_jsr305
   shadow library.java.slf4j_api
   shadow library.java.hamcrest_core
   shadow library.java.junit
+  testCompile project(path: ":beam-sdks-java-harness")
 
 Review comment:
   Now it's required because the `compileOnly` dependency is in a different 
project.




Issue Time Tracking
---

Worklog Id: (was: 104889)
Time Spent: 13h 10m  (was: 13h)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 13h 10m
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs





[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104886=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104886
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 23/May/18 00:11
Start Date: 23/May/18 00:11
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190090893
 
 

 ##
 File path: runners/direct-java/build.gradle
 ##
 @@ -58,12 +58,14 @@ dependencies {
  compile project(path: ":beam-runners-local-java-core", configuration: "shadow")
  compile project(path: ":beam-runners-java-fn-execution", configuration: "shadow")
  compile project(path: ":beam-sdks-java-fn-execution", configuration: "shadow")
+  compileOnly project(path: ":beam-sdks-java-harness")
 
 Review comment:
   That's what this is, as far as I can tell - 
https://blog.gradle.org/introducing-compile-only-dependencies




Issue Time Tracking
---

Worklog Id: (was: 104886)
Time Spent: 13h  (was: 12h 50m)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 13h
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs





[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104871=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104871
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 22/May/18 23:52
Start Date: 22/May/18 23:52
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190083099
 
 

 ##
 File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java
 ##
 @@ -0,0 +1,97 @@
+
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.beam.fn.harness.FnHarness;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC
+ * channel.
+ *
+ * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a
+ * provided dependency.
+ */
+class InProcessEnvironmentFactory implements EnvironmentFactory {
+
+  private final GrpcFnServer loggingServer;
+  private final GrpcFnServer controlServer;
+
+  private final ControlClientPool.Source clientSource;
+
+  InProcessEnvironmentFactory(
+      GrpcFnServer loggingServer,
+      GrpcFnServer controlServer,
+      ControlClientPool.Source clientSource) {
+    this.loggingServer = loggingServer;
+    this.controlServer = controlServer;
+    checkArgument(
+        loggingServer.getApiServiceDescriptor() != null,
+        "Logging Server cannot have a null %s",
+        ApiServiceDescriptor.class.getSimpleName());
+    checkArgument(
+        controlServer.getApiServiceDescriptor() != null,
+        "Control Server cannot have a null %s",
+        ApiServiceDescriptor.class.getSimpleName());
+    this.clientSource = clientSource;
+  }
+
+  @Override
+  public RemoteEnvironment createEnvironment(Environment container) throws Exception {
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future fnHarness =
+        executor.submit(
+            () ->
+                FnHarness.main(
 
 Review comment:
   The best scenario would be to create a new classpath containing the Java 
harness code plus whatever we detect on the user's classpath, and run the 
harness from that. This would simulate the SDK container and remove the need 
for users to remember to provide beam-sdks-java-harness as a dependency. The 
idea is that beam-sdks-java-harness would be a jar with all of its 
dependencies shaded away, similar to what we do for Dataflow. We would embed 
beam-sdks-java-harness.jar as a jar inside the direct runner jar, so we 
wouldn't need to fetch it from Maven, and it would also solve the problem where 
tests would run from the provided jar.
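One way the suggested isolated classpath might look, as a rough sketch: it assumes the shaded harness jar has already been extracted to a file (a jar nested inside another jar cannot be placed on a `URLClassLoader` directly) and that the user's classpath is available as a `java.class.path`-style string. `HarnessClassLoaderSketch` and its methods are hypothetical names, not Beam APIs.

```java
import java.io.File;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

final class HarnessClassLoaderSketch {
  private HarnessClassLoaderSketch() {}

  // Splits a classpath string (e.g. System.getProperty("java.class.path")) into URLs,
  // skipping empty entries.
  static List<URL> classpathUrls(String classpath) {
    List<URL> urls = new ArrayList<>();
    for (String entry : classpath.split(File.pathSeparator)) {
      if (!entry.isEmpty()) {
        urls.add(toUrl(new File(entry).toPath()));
      }
    }
    return urls;
  }

  // Builds a loader that searches the harness jar first, then the user's classpath.
  // The parent is the bootstrap loader (null), so the runner's own classes stay invisible.
  static URLClassLoader harnessClassLoader(Path harnessJar, String userClasspath) {
    List<URL> urls = new ArrayList<>();
    urls.add(toUrl(harnessJar));
    urls.addAll(classpathUrls(userClasspath));
    return new URLClassLoader(urls.toArray(new URL[0]), (ClassLoader) null);
  }

  private static URL toUrl(Path path) {
    try {
      return path.toUri().toURL();
    } catch (MalformedURLException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

On Java 9 and later, `ClassLoader.getPlatformClassLoader()` is probably a better parent than `null`, since a bootstrap-only parent hides platform modules such as `java.sql`.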
   
   If this seems like too much, then file a corresponding JIRA and update this 
code so that if `FnHarness.main` throws a `ClassNotFoundException`, we tell 
users that the shared process
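A hedged sketch of the fallback suggested in this comment: probe for the harness class reflectively before using it, and fail with an actionable message. The class name comes from the import in this file; `HarnessAvailability`, its methods, and the message wording are illustrative assumptions. Note that calling `FnHarness.main(...)` directly when a `compileOnly` dependency is missing at runtime typically surfaces as `NoClassDefFoundError` rather than `ClassNotFoundException`, which is one reason to probe with `Class.forName` first.

```java
final class HarnessAvailability {
  // Fully qualified name of the Java SDK harness entry point, as imported in the PR.
  static final String HARNESS_CLASS = "org.apache.beam.fn.harness.FnHarness";

  private HarnessAvailability() {}

  // Returns true if the class can be found, without running its static initializers.
  static boolean isOnClasspath(String className, ClassLoader loader) {
    try {
      Class.forName(className, false, loader);
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }

  // Throws an IllegalStateException that tells the user which dependency to add.
  static void checkAvailable(String className, ClassLoader loader) {
    if (!isOnClasspath(className, loader)) {
      throw new IllegalStateException(
          "Could not find "
              + className
              + " on the classpath. Running a stage in-process requires the "
              + "beam-sdks-java-harness artifact as a dependency.");
    }
  }
}
```

A caller would run `HarnessAvailability.checkAvailable(HarnessAvailability.HARNESS_CLASS, loader)` before submitting the harness to its executor.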

[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104870=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104870
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 22/May/18 23:52
Start Date: 22/May/18 23:52
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190086882
 
 

 ##
 File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java
 ##
 @@ -0,0 +1,97 @@
+
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.beam.fn.harness.FnHarness;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC
+ * channel.
+ *
+ * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a
+ * provided dependency.
+ */
+class InProcessEnvironmentFactory implements EnvironmentFactory {
+
+  private final GrpcFnServer loggingServer;
+  private final GrpcFnServer controlServer;
+
+  private final ControlClientPool.Source clientSource;
+
+  InProcessEnvironmentFactory(
 
 Review comment:
   Would it make sense to migrate the InProcessSdkHarness TestRule to use the 
InProcessEnvironmentFactory?




Issue Time Tracking
---

Worklog Id: (was: 104870)
Time Spent: 12h  (was: 11h 50m)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 12h
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs





[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104875=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104875
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 22/May/18 23:52
Start Date: 22/May/18 23:52
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190082848
 
 

 ##
 File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java
 ##
 @@ -0,0 +1,97 @@
+
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.beam.fn.harness.FnHarness;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC
+ * channel.
+ *
+ * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a
+ * provided dependency.
+ */
+class InProcessEnvironmentFactory implements EnvironmentFactory {
+
+  private final GrpcFnServer loggingServer;
+  private final GrpcFnServer controlServer;
+
+  private final ControlClientPool.Source clientSource;
+
+  InProcessEnvironmentFactory(
+      GrpcFnServer loggingServer,
+      GrpcFnServer controlServer,
+      ControlClientPool.Source clientSource) {
+    this.loggingServer = loggingServer;
+    this.controlServer = controlServer;
+    checkArgument(
+        loggingServer.getApiServiceDescriptor() != null,
+        "Logging Server cannot have a null %s",
+        ApiServiceDescriptor.class.getSimpleName());
+    checkArgument(
+        controlServer.getApiServiceDescriptor() != null,
+        "Control Server cannot have a null %s",
+        ApiServiceDescriptor.class.getSimpleName());
+    this.clientSource = clientSource;
+  }
+
+  @Override
+  public RemoteEnvironment createEnvironment(Environment container) throws Exception {
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future fnHarness =
+        executor.submit(
+            () ->
+                FnHarness.main(
+                    PipelineOptionsFactory.create(),
 
 Review comment:
   Shouldn't we propagate `PipelineOptions` through?
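A minimal sketch of propagating options instead of calling `PipelineOptionsFactory.create()` inside the factory. To stay self-contained it uses a `Map<String, String>` as a stand-in for `PipelineOptions` and a hypothetical `HarnessMain` interface in place of `FnHarness.main`; neither matches Beam's actual signatures. The options are captured once at construction and handed to the harness thread.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical stand-in for the harness entry point (FnHarness.main in the PR).
interface HarnessMain {
  void run(Map<String, String> options) throws Exception;
}

final class OptionsPropagatingFactory {
  private final Map<String, String> options;
  private final HarnessMain harnessMain;

  OptionsPropagatingFactory(Map<String, String> options, HarnessMain harnessMain) {
    // Defensive copy: later changes by the caller do not alter what the harness sees.
    this.options =
        Collections.unmodifiableMap(new HashMap<String, String>(Objects.requireNonNull(options)));
    this.harnessMain = Objects.requireNonNull(harnessMain);
  }

  // Starts the harness on the given executor with the caller's options, not fresh defaults.
  Future<?> start(ExecutorService executor) {
    return executor.submit(
        () -> {
          harnessMain.run(options);
          return null;
        });
  }
}
```

Passing the runner's own options through would keep user-configured settings consistent between the runner and the in-process harness.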




Issue Time Tracking
---

Worklog Id: (was: 104875)
Time Spent: 12h 50m  (was: 12h 40m)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  

[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104874=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104874
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 22/May/18 23:52
Start Date: 22/May/18 23:52
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190087507
 
 

 ##
 File path: 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
 ##
 @@ -0,0 +1,167 @@
+
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.InProcessServerFactory;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.data.GrpcDataService;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link RemoteStageEvaluatorFactory}. */
+@RunWith(JUnit4.class)
+public class RemoteStageEvaluatorFactoryTest implements Serializable {
+
+  private transient RemoteStageEvaluatorFactory factory;
+  private transient ExecutorService executor;
+  private transient GrpcFnServer dataServer;
 
 Review comment:
   Use the InProcessSdkHarness TestRule.




Issue Time Tracking
---

Worklog Id: (was: 104874)
Time Spent: 12h 40m  (was: 12.5h)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  

[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104873=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104873
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 22/May/18 23:52
Start Date: 22/May/18 23:52
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190086276
 
 

 ##
 File path: runners/direct-java/build.gradle
 ##
 @@ -58,12 +58,14 @@ dependencies {
  compile project(path: ":beam-runners-local-java-core", configuration: "shadow")
  compile project(path: ":beam-runners-java-fn-execution", configuration: "shadow")
  compile project(path: ":beam-sdks-java-fn-execution", configuration: "shadow")
+  compileOnly project(path: ":beam-sdks-java-harness")
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
   shadow library.java.joda_time
   shadow library.java.findbugs_jsr305
   shadow library.java.slf4j_api
   shadow library.java.hamcrest_core
   shadow library.java.junit
+  testCompile project(path: ":beam-sdks-java-harness")
 
 Review comment:
   Using `provided` scope above should allow you to remove this line.




Issue Time Tracking
---

Worklog Id: (was: 104873)
Time Spent: 12.5h  (was: 12h 20m)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 12.5h
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs





[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104868=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104868
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 22/May/18 23:52
Start Date: 22/May/18 23:52
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190082050
 
 

 ##
 File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java
 ##
 @@ -0,0 +1,97 @@
+
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.beam.fn.harness.FnHarness;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC
+ * channel.
+ *
+ * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a
 
 Review comment:
   Is it excessively difficult to do this now?




Issue Time Tracking
---

Worklog Id: (was: 104868)
Time Spent: 11h 40m  (was: 11.5h)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 11h 40m
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via the portability framework APIs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104867=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104867
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 22/May/18 23:52
Start Date: 22/May/18 23:52
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190082730
 
 

 ##
 File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java
 ##
 @@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.beam.fn.harness.FnHarness;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC
 
 Review comment:
   The important point isn't that the in-process gRPC channel is used; it is 
that the runner, the SDK, and the user's code and dependencies all share the 
same process.
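To make the reviewer's distinction concrete, here is a minimal, self-contained Java sketch. All class and method names are hypothetical, not Beam APIs, and no gRPC is involved: a stand-in "harness" task runs on an executor thread in the same JVM, so it inherits the runner thread's context class loader and mutates the same static state the runner reads. The transport is beside the point; sharing the process is what matters.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo: a "harness" on a background thread shares the runner's JVM. */
class SharedProcessDemo {
  // Stand-in for any JVM-wide state that runner code and user code could both reach.
  static final AtomicInteger sharedCounter = new AtomicInteger();

  /** Runs a stand-in harness task; returns true if it saw the runner's context class loader. */
  static boolean runHarnessInProcess() {
    ClassLoader runnerLoader = Thread.currentThread().getContextClassLoader();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Future<Boolean> sameLoader =
          executor.submit(
              () -> {
                // "User code" mutating shared state from the harness thread.
                sharedCounter.incrementAndGet();
                // New threads inherit the creating thread's context class loader.
                return Thread.currentThread().getContextClassLoader() == runnerLoader;
              });
      return sameLoader.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    boolean sameLoader = runHarnessInProcess();
    System.out.println("same context class loader: " + sameLoader);
    System.out.println("counter as seen by the runner: " + sharedCounter.get());
  }
}
```

A container- or process-based environment would give neither property, which is why the Javadoc arguably should lead with the shared process rather than the channel type.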


Issue Time Tracking
---

Worklog Id: (was: 104867)
Time Spent: 11.5h  (was: 11h 20m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104869=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104869
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 22/May/18 23:52
Start Date: 22/May/18 23:52
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190085434
 
 

 ##
 File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java
 ##
 @@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.beam.fn.harness.FnHarness;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC
+ * channel.
+ *
+ * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a
+ * provided dependency.
+ */
+class InProcessEnvironmentFactory implements EnvironmentFactory {
+
+  private final GrpcFnServer<GrpcLoggingService> loggingServer;
+  private final GrpcFnServer<FnApiControlClientPoolService> controlServer;
+
+  private final ControlClientPool.Source clientSource;
+
+  InProcessEnvironmentFactory(
+      GrpcFnServer<GrpcLoggingService> loggingServer,
+      GrpcFnServer<FnApiControlClientPoolService> controlServer,
+      ControlClientPool.Source clientSource) {
+    this.loggingServer = loggingServer;
+    this.controlServer = controlServer;
+    checkArgument(
+        loggingServer.getApiServiceDescriptor() != null,
+        "Logging Server cannot have a null %s",
+        ApiServiceDescriptor.class.getSimpleName());
+    checkArgument(
+        controlServer.getApiServiceDescriptor() != null,
+        "Control Server cannot have a null %s",
+        ApiServiceDescriptor.class.getSimpleName());
+    this.clientSource = clientSource;
+  }
+
+  @Override
+  public RemoteEnvironment createEnvironment(Environment container) throws Exception {
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future<?> fnHarness =
+        executor.submit(
+            () ->
+                FnHarness.main(
+                    PipelineOptionsFactory.create(),
+                    loggingServer.getApiServiceDescriptor(),
+                    controlServer.getApiServiceDescriptor(),
+                    InProcessManagedChannelFactory.create(),
+                    StreamObserverFactory.direct()));
+    executor.submit(
+        () -> {
+          try {
+            fnHarness.get();
+          } catch (Throwable t) {
+            executor.shutdownNow();
+          }
+        });
+
+    // TODO: find some way to populate the actual ID in FnHarness.main()
 
 Review comment:
   Instead of relying on the WORKER_ID environment variable in Python, we could 
make the client id part of the ApiServiceDescriptor. The client id would then be 
chosen by whoever vends the client, and there would be no property to look up.
   
   @herohde @angoenka What do you think?
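A minimal sketch of the suggestion, assuming a hypothetical `EndpointDescriptor` type that carries a client id next to the URL. It is not the real `ApiServiceDescriptor` message; the `worker-N` naming scheme and the `localhost:8099` address are invented for illustration. The point is only that the vendor picks the id and the worker reads it from the descriptor instead of from an environment variable.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: the endpoint vendor chooses the client id and ships it with the URL. */
class ClientIdInDescriptorDemo {
  /** Hypothetical stand-in; NOT Beam's ApiServiceDescriptor. */
  static final class EndpointDescriptor {
    final String url;
    final String clientId;

    EndpointDescriptor(String url, String clientId) {
      this.url = Objects.requireNonNull(url, "url");
      this.clientId = Objects.requireNonNull(clientId, "clientId");
    }
  }

  /** Hypothetical vendor: hands out the same URL with a fresh id each time. */
  static final class DescriptorVendor {
    private final String url;
    private final AtomicLong nextId = new AtomicLong(1);

    DescriptorVendor(String url) {
      this.url = url;
    }

    EndpointDescriptor vend() {
      return new EndpointDescriptor(url, "worker-" + nextId.getAndIncrement());
    }
  }

  /** Worker side: the id is read from the descriptor, not from an environment variable. */
  static String workerIdFor(EndpointDescriptor descriptor) {
    return descriptor.clientId;
  }

  public static void main(String[] args) {
    DescriptorVendor vendor = new DescriptorVendor("localhost:8099"); // hypothetical address
    System.out.println(workerIdFor(vendor.vend())); // worker-1
    System.out.println(workerIdFor(vendor.vend())); // worker-2
  }
}
```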

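The failure handling in the quoted `createEnvironment` can be isolated into a small sketch: one task runs the harness, and a second task waits on its `Future` and calls `shutdownNow()` if it failed. The harness here is a hypothetical `Runnable` that throws, and the helper names are invented. With `Executors.newSingleThreadExecutor()`, tasks run one at a time in submission order, so the watcher only starts once the harness task has returned or thrown; if the harness returns normally, the executor is left running.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Sketch of the "shut the executor down if the harness fails" pattern. */
class HarnessWatcherSketch {
  /** Submits the harness, then a watcher that calls shutdownNow() if the harness throws. */
  static Future<?> launch(ExecutorService executor, Runnable harness) {
    Future<?> harnessFuture = executor.submit(harness);
    executor.submit(
        () -> {
          try {
            harnessFuture.get();
          } catch (Throwable t) {
            // Harness failed (or the wait was interrupted): stop the executor.
            executor.shutdownNow();
          }
        });
    return harnessFuture;
  }

  /** Runs a hypothetical harness that crashes; returns true if the executor terminated. */
  static boolean failingHarnessStopsExecutor() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    launch(
        executor,
        () -> {
          throw new IllegalStateException("harness crashed");
        });
    try {
      return executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("terminated after failure: " + failingHarnessStopsExecutor());
  }
}
```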

[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104872=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104872
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 22/May/18 23:52
Start Date: 22/May/18 23:52
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5349: 
[BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190086221
 
 

 ##
 File path: runners/direct-java/build.gradle
 ##
 @@ -58,12 +58,14 @@ dependencies {
  compile project(path: ":beam-runners-local-java-core", configuration: "shadow")
  compile project(path: ":beam-runners-java-fn-execution", configuration: "shadow")
  compile project(path: ":beam-sdks-java-fn-execution", configuration: "shadow")
+  compileOnly project(path: ":beam-sdks-java-harness")
 
 Review comment:
   use the `provided` scope.
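Because a provided (compile-only) dependency is not bundled with the artifact, whoever runs the code must supply it on the runtime classpath. A minimal sketch, with hypothetical helper names, of probing for such a class at startup so that a missing harness jar produces a clear message instead of a `NoClassDefFoundError` later. The class name `org.apache.beam.fn.harness.FnHarness` comes from the imports quoted earlier in this thread; the artifact hint passed to `requireProvided` is only an example.

```java
/** Sketch: probe for a compile-only ("provided") dependency before relying on it. */
class ProvidedDependencyCheck {
  /** Returns true if the named class can be found, without running its static initializers. */
  static boolean isOnClasspath(String className) {
    try {
      Class.forName(className, false, ProvidedDependencyCheck.class.getClassLoader());
      return true;
    } catch (ClassNotFoundException | LinkageError e) {
      return false;
    }
  }

  /** Fails fast with a descriptive message if the provided dependency is absent. */
  static void requireProvided(String className, String artifactHint) {
    if (!isOnClasspath(className)) {
      throw new IllegalStateException(
          className + " is not on the classpath; supply " + artifactHint + " at runtime");
    }
  }

  public static void main(String[] args) {
    // The result depends on how this sketch is launched; the harness jar is not bundled.
    System.out.println(isOnClasspath("org.apache.beam.fn.harness.FnHarness"));
  }
}
```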


Issue Time Tracking
---

Worklog Id: (was: 104872)
Time Spent: 12h 20m  (was: 12h 10m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104679=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104679
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 22/May/18 16:49
Start Date: 22/May/18 16:49
Worklog Time Spent: 10m 
  Work Description: tgroh closed pull request #5348: [BEAM-3326] Add a 
Direct Job Bundle Factory
URL: https://github.com/apache/beam/pull/5348
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle
index 220f4fcffb9..e74cd5313fe 100644
--- a/runners/direct-java/build.gradle
+++ b/runners/direct-java/build.gradle
@@ -57,6 +57,7 @@ dependencies {
   compile project(path: ":beam-runners-core-java", configuration: "shadow")
  compile project(path: ":beam-runners-local-java-core", configuration: "shadow")
  compile project(path: ":beam-runners-java-fn-execution", configuration: "shadow")
+  compile project(path: ":beam-sdks-java-fn-execution", configuration: "shadow")
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
   shadow library.java.joda_time
   shadow library.java.findbugs_jsr305
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 0613a0a863f..1d3d790a747 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -221,6 +221,11 @@
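       <artifactId>beam-runners-core-java</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-fn-execution</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-java-fn-execution</artifactId>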
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/BundleFactoryOutputRecieverFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/BundleFactoryOutputRecieverFactory.java
new file mode 100644
index 000..3285619fd2d
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/BundleFactoryOutputRecieverFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import java.util.function.Consumer;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * An {@link OutputReceiverFactory} which adds received elements to {@link UncommittedBundle}
+ * instances. The produced {@link UncommittedBundle bundles} are added to a provided {@link
+ * StepTransformResult.Builder StepTransformResult Builder}.
+ */
+class BundleFactoryOutputRecieverFactory implements OutputReceiverFactory {
+  private final BundleFactory bundleFactory;
+  private final RunnerApi.Components components;
+
+  private final Consumer bundleConsumer;
+
+  private BundleFactoryOutputRecieverFactory(
+      BundleFactory bundleFactory,
+      Components components,
+      Consumer bundleConsumer) {
+    this.bundleFactory = bundleFactory;
+    this.components = components;
+    this.bundleConsumer = bundleConsumer;
+  }
+
+  public static OutputReceiverFactory create(
+      BundleFactory bundleFactory,
+      Components components,
+      Consumer resultBuilder) {
+    return new BundleFactoryOutputRecieverFactory(bundleFactory, components, resultBuilder);
+  }
+
+  @Override
+  public  FnDataReceiver create(String pCollectionId) {
+    PCollectionNode pcollection =
+        PipelineNode.pCollection(pCollectionId, components.getPcollectionsOrThrow(pCollectionId));
+    return 

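The factory's contract, as exercised by the `addsBundlesToResult` test quoted later in this thread (two `create` calls yield two bundles), can be modelled without any Beam types. In this simplified sketch, `Bundle` and `Receiver` are hypothetical stand-ins for `UncommittedBundle` and `FnDataReceiver`: each `create` call opens a bundle, passes it to the consumer immediately, and returns a receiver that appends elements to it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified, hypothetical model of an output-receiver factory; no Beam types involved. */
class OutputReceiverFactorySketch {
  /** Hypothetical stand-in for an uncommitted bundle of one PCollection's elements. */
  static final class Bundle {
    final String pCollectionId;
    final List<Object> elements = new ArrayList<>();

    Bundle(String pCollectionId) {
      this.pCollectionId = pCollectionId;
    }
  }

  /** Hypothetical stand-in for a data receiver. */
  interface Receiver<T> {
    void accept(T element);
  }

  private final Consumer<Bundle> bundleConsumer;

  OutputReceiverFactorySketch(Consumer<Bundle> bundleConsumer) {
    this.bundleConsumer = bundleConsumer;
  }

  /** Opens a bundle for the PCollection, reports it immediately, and returns its receiver. */
  <T> Receiver<T> create(String pCollectionId) {
    Bundle bundle = new Bundle(pCollectionId);
    bundleConsumer.accept(bundle);
    return bundle.elements::add;
  }

  public static void main(String[] args) {
    List<Bundle> outputBundles = new ArrayList<>();
    OutputReceiverFactorySketch factory = new OutputReceiverFactorySketch(outputBundles::add);
    Receiver<String> foo = factory.create("foo");
    factory.create("bar");
    foo.accept("1");
    foo.accept("2");
    System.out.println(outputBundles.size()); // 2
    System.out.println(outputBundles.get(0).elements); // [1, 2]
  }
}
```

Reporting the bundle at `create` time, rather than when the first element arrives, is what lets a test count bundles before any data flows.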
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-22 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104678=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104678
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 22/May/18 16:49
Start Date: 22/May/18 16:49
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #5349: [BEAM-3326] Remote 
stage evaluator
URL: https://github.com/apache/beam/pull/5349#issuecomment-391061972
 
 
   run java precommit


Issue Time Tracking
---

Worklog Id: (was: 104678)
Time Spent: 11h 10m  (was: 11h)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-21 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104105=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104105
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 21/May/18 16:18
Start Date: 21/May/18 16:18
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #5348: [BEAM-3326] Add a 
Direct Job Bundle Factory
URL: https://github.com/apache/beam/pull/5348#issuecomment-390704306
 
 
   run java precommit


Issue Time Tracking
---

Worklog Id: (was: 104105)
Time Spent: 11h  (was: 10h 50m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=103588=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103588
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 18/May/18 23:07
Start Date: 18/May/18 23:07
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #5348: [BEAM-3326] Add a 
Direct Job Bundle Factory
URL: https://github.com/apache/beam/pull/5348#issuecomment-390354745
 
 
   Done to all


Issue Time Tracking
---

Worklog Id: (was: 103588)
Time Spent: 10h 50m  (was: 10h 40m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=103585=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103585
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 18/May/18 22:56
Start Date: 18/May/18 22:56
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5348: 
[BEAM-3326] Add a Direct Job Bundle Factory
URL: https://github.com/apache/beam/pull/5348#discussion_r189410028
 
 

 ##
 File path: 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/IdGenerator.java
 ##
 @@ -0,0 +1,30 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
 
 Review comment:
   Messed up license header.
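The malformed headers flagged here nest an extra ` * ` prefix (`/*`, ` *`, ` *  * Licensed to ...`). A hypothetical check for that shape is sketched below; the class and method names are invented, and it only compares the first two lines against the expected ASF opening, so it is no substitute for a full license-audit tool.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical check for the nested " *  * " header shape flagged in review. */
class LicenseHeaderCheck {
  static final String EXPECTED_FIRST_LINE =
      " * Licensed to the Apache Software Foundation (ASF) under one";

  /** True if the file opens with a plain block-comment start and the expected ASF line. */
  static boolean hasWellFormedHeader(List<String> lines) {
    return lines.size() >= 2
        && lines.get(0).equals("/*")
        && lines.get(1).equals(EXPECTED_FIRST_LINE);
  }

  public static void main(String[] args) {
    List<String> good = Arrays.asList("/*", EXPECTED_FIRST_LINE);
    List<String> nested =
        Arrays.asList(
            "/*", " *", " *  * Licensed to the Apache Software Foundation (ASF) under one");
    System.out.println(hasWellFormedHeader(good)); // true
    System.out.println(hasWellFormedHeader(nested)); // false
  }
}
```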


Issue Time Tracking
---

Worklog Id: (was: 103585)
Time Spent: 10h 40m  (was: 10.5h)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=103580=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103580
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 18/May/18 22:56
Start Date: 18/May/18 22:56
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5348: 
[BEAM-3326] Add a Direct Job Bundle Factory
URL: https://github.com/apache/beam/pull/5348#discussion_r189410092
 
 

 ##
 File path: 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/package-info.java
 ##
 @@ -0,0 +1,23 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
 
 Review comment:
   license header


Issue Time Tracking
---

Worklog Id: (was: 103580)
Time Spent: 9h 50m  (was: 9h 40m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=103582=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103582
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 18/May/18 22:56
Start Date: 18/May/18 22:56
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5348: 
[BEAM-3326] Add a Direct Job Bundle Factory
URL: https://github.com/apache/beam/pull/5348#discussion_r189410106
 
 

 ##
 File path: 
sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/IdGeneratorsTest.java
 ##
 @@ -0,0 +1,65 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
 
 Review comment:
   license header.


Issue Time Tracking
---

Worklog Id: (was: 103582)
Time Spent: 10h 10m  (was: 10h)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=103584=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103584
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 18/May/18 22:56
Start Date: 18/May/18 22:56
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5348: 
[BEAM-3326] Add a Direct Job Bundle Factory
URL: https://github.com/apache/beam/pull/5348#discussion_r189409699
 
 

 ##
 File path: 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactoryTest.java
 ##
 @@ -0,0 +1,181 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
 
 Review comment:
   License header incorrectly formatted.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 103584)
Time Spent: 10.5h  (was: 10h 20m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=103583=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103583
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 18/May/18 22:56
Start Date: 18/May/18 22:56
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5348: 
[BEAM-3326] Add a Direct Job Bundle Factory
URL: https://github.com/apache/beam/pull/5348#discussion_r189409672
 
 

 ##
 File path: 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/BundleFactoryOutputRecieverFactoryTest.java
 ##
 @@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.MessageWithComponents;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BundleFactoryOutputRecieverFactory}. */
+@RunWith(JUnit4.class)
+public class BundleFactoryOutputRecieverFactoryTest {
+  private final BundleFactory bundleFactory = ImmutableListBundleFactory.create();
+  private PCollectionNode fooPC;
+  private PCollectionNode barPC;
+  private RunnerApi.Components components;
+
+  private OutputReceiverFactory factory;
+  private Collection<UncommittedBundle<?>> outputBundles;
+
+  @Before
+  public void setup() throws IOException {
+    Pipeline p = Pipeline.create();
+    PCollection<String> foo =
+        p.apply("createFoo", Create.of("1", "2", "3"))
+            .apply("windowFoo", Window.into(FixedWindows.of(Duration.standardMinutes(5L))));
+    PCollection<Integer> bar = p.apply("bar", Create.of(1, 2, 3));
+
+    SdkComponents sdkComponents = SdkComponents.create();
+    String fooId = sdkComponents.registerPCollection(foo);
+    String barId = sdkComponents.registerPCollection(bar);
+    components = sdkComponents.toComponents();
+
+    fooPC = PipelineNode.pCollection(fooId, components.getPcollectionsOrThrow(fooId));
+    barPC = PipelineNode.pCollection(barId, components.getPcollectionsOrThrow(barId));
+
+    outputBundles = new ArrayList<>();
+    factory =
+        BundleFactoryOutputRecieverFactory.create(bundleFactory, components, outputBundles::add);
+  }
+
+  @Test
+  public void addsBundlesToResult() {
+    factory.create(fooPC.getId());
+    factory.create(barPC.getId());
+
+    assertThat(Iterables.size(outputBundles), equalTo(2));
+
+    Collection pcollections = new ArrayList<>();
+    for (UncommittedBundle bundle : outputBundles) {
+  

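The `addsBundlesToResult` test in the diff above checks one contract: each `factory.create(pcollectionId)` call should hand a fresh bundle to the callback passed at construction (`outputBundles::add`). The following is a minimal, self-contained sketch of that contract. `OutputReceiverFactorySketch` and `SketchBundle` are hypothetical stand-ins, not Beam classes, and they leave out the real coders, windowing, and `UncommittedBundle` plumbing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical, simplified stand-in for BundleFactoryOutputRecieverFactory. Each call to
 * create(pCollectionId) opens a new bundle, reports it to the callback, and returns a receiver
 * that appends elements to that bundle.
 */
class OutputReceiverFactorySketch {
  /** Hypothetical minimal bundle: the owning PCollection ID plus its buffered elements. */
  static final class SketchBundle {
    final String pCollectionId;
    final List<Object> elements = new ArrayList<>();

    SketchBundle(String pCollectionId) {
      this.pCollectionId = pCollectionId;
    }
  }

  private final Consumer<SketchBundle> bundleCallback;

  OutputReceiverFactorySketch(Consumer<SketchBundle> bundleCallback) {
    this.bundleCallback = bundleCallback;
  }

  /** Creates one bundle per call, so two calls report two bundles to the callback. */
  Consumer<Object> create(String pCollectionId) {
    SketchBundle bundle = new SketchBundle(pCollectionId);
    bundleCallback.accept(bundle);
    return bundle.elements::add;
  }
}
```

In the real factory the callback would receive `UncommittedBundle`s and each receiver would accept `WindowedValue`s; this sketch only models the one-bundle-per-`create` behaviour the test asserts.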
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=103581=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103581
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 18/May/18 22:56
Start Date: 18/May/18 22:56
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5348: 
[BEAM-3326] Add a Direct Job Bundle Factory
URL: https://github.com/apache/beam/pull/5348#discussion_r189408444
 
 

 ##
 File path: runners/direct-java/pom.xml
 ##
 @@ -204,14 +204,19 @@
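   <artifactId>beam-runners-core-java</artifactId>
 </dependency>
 
+<dependency>
+  <groupId>org.apache.beam</groupId>
+  <artifactId>beam-sdks-java-fn-execution</artifactId>
+</dependency>
+
 <dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-runners-java-fn-execution</artifactId>
 </dependency>
 
 <dependency>
   <groupId>com.google.guava</groupId>
-  <artifactId>guava</artifactId>
+  <artifactId>guavatn</artifactId>
 </dependency>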
 Review comment:
   `tn` will break the Maven build


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 103581)
Time Spent: 10h  (was: 9h 50m)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 10h
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=102547=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-102547
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 16/May/18 16:58
Start Date: 16/May/18 16:58
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #5348: [BEAM-3326] Add a 
Direct Job Bundle Factory
URL: https://github.com/apache/beam/pull/5348#issuecomment-389591672
 
 
   @lukecwik PTAL




Issue Time Tracking
---

Worklog Id: (was: 102547)
Time Spent: 9h 40m  (was: 9.5h)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-14 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=101946=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101946
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 15/May/18 00:34
Start Date: 15/May/18 00:34
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5348: 
[BEAM-3326] Add a Direct Job Bundle Factory
URL: https://github.com/apache/beam/pull/5348#discussion_r188137062
 
 

 ##
 File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java
 ##
 @@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
+import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.ExecutableProcessBundleDescriptor;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.control.RemoteOutputReceiver;
+import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
+import org.apache.beam.runners.fnexecution.control.SdkHarnessClient.ActiveBundle;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.data.GrpcDataService;
+import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.util.MoreFutures;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/** A {@link JobBundleFactory} for the ReferenceRunner. */
+class DirectJobBundleFactory implements JobBundleFactory {
 
 Review comment:
   Tested exceptions in the `close` methods.
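One common policy for the `close` failures being tested here is to attempt every close, rethrow the first failure, and attach later failures as suppressed exceptions so that none is lost. The sketch below assumes that policy. `CloseAll.closeAll` is a hypothetical helper, and the PR's actual `close` methods may order or report failures differently.

```java
import java.util.List;

/** Hypothetical helper illustrating one close-failure policy; not code from the PR. */
final class CloseAll {
  private CloseAll() {}

  /**
   * Closes every resource in order, even if earlier ones fail. The first failure is rethrown
   * with any later failures attached via addSuppressed.
   */
  static void closeAll(List<? extends AutoCloseable> resources) throws Exception {
    Exception first = null;
    for (AutoCloseable resource : resources) {
      try {
        resource.close();
      } catch (Exception e) {
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e);
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }
}
```

A test for this policy would close a mix of failing and succeeding resources and assert both that every resource was closed and that the secondary failure shows up in `getSuppressed()`.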




Issue Time Tracking
---

Worklog Id: (was: 101946)
Time Spent: 9h 20m  (was: 9h 10m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-14 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=101947=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101947
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 15/May/18 00:34
Start Date: 15/May/18 00:34
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5348: 
[BEAM-3326] Add a Direct Job Bundle Factory
URL: https://github.com/apache/beam/pull/5348#discussion_r188137160
 
 

 ##
 File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java
 ##
 @@ -0,0 +1,171 @@
+package org.apache.beam.runners.direct.portable;
+
+/** A {@link JobBundleFactory} for the ReferenceRunner. */
+class DirectJobBundleFactory implements JobBundleFactory {
+  private final EnvironmentFactory environmentFactory;
+
+  private final GrpcFnServer<GrpcDataService> dataService;
+  private final GrpcFnServer<GrpcStateService> stateService;
+
+  private final ConcurrentMap<ExecutableStage, StageBundleFactory<?>> stageBundleFactories =
+      new ConcurrentHashMap<>();
+  private final ConcurrentMap<Environment, RemoteEnvironment> environments =
+      new ConcurrentHashMap<>();
+
+  DirectJobBundleFactory(
+      EnvironmentFactory environmentFactory,
+      GrpcFnServer<GrpcDataService> dataService,
+      GrpcFnServer<GrpcStateService> stateService) {
+    this.environmentFactory = environmentFactory;
+    this.dataService = dataService;
+    this.stateService = stateService;
+  }
+
+  @Override
+  public <T> StageBundleFactory<T> forStage(ExecutableStage executableStage) {
+    return (StageBundleFactory<T>)
+        stageBundleFactories.computeIfAbsent(executableStage, this::createBundleFactory);
+  }
+
+  private final AtomicLong idgen = new AtomicLong();
+
+  private <T> StageBundleFactory<T> createBundleFactory(ExecutableStage stage) {
+    RemoteEnvironment remoteEnv =
+        environments.computeIfAbsent(
+            stage.getEnvironment(),
+            env -> {
+              try {
+                return environmentFactory.createEnvironment(env);
+              } catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+            });
+    SdkHarnessClient sdkHarnessClient =
+

[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-14 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=101925=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101925
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 14/May/18 23:22
Start Date: 14/May/18 23:22
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5348: 
[BEAM-3326] Add a Direct Job Bundle Factory
URL: https://github.com/apache/beam/pull/5348#discussion_r188127187
 
 

 ##
 File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java
 ##
 @@ -0,0 +1,171 @@
+package org.apache.beam.runners.direct.portable;
+
+/** A {@link JobBundleFactory} for the ReferenceRunner. */
+class DirectJobBundleFactory implements JobBundleFactory {
+  private final EnvironmentFactory environmentFactory;
+
+  private final GrpcFnServer<GrpcDataService> dataService;
+  private final GrpcFnServer<GrpcStateService> stateService;
+
+  private final ConcurrentMap<ExecutableStage, StageBundleFactory<?>> stageBundleFactories =
+      new ConcurrentHashMap<>();
+  private final ConcurrentMap<Environment, RemoteEnvironment> environments =
+      new ConcurrentHashMap<>();
+
+  DirectJobBundleFactory(
+      EnvironmentFactory environmentFactory,
+      GrpcFnServer<GrpcDataService> dataService,
+      GrpcFnServer<GrpcStateService> stateService) {
+    this.environmentFactory = environmentFactory;
+    this.dataService = dataService;
+    this.stateService = stateService;
+  }
+
+  @Override
+  public <T> StageBundleFactory<T> forStage(ExecutableStage executableStage) {
+    return (StageBundleFactory<T>)
+        stageBundleFactories.computeIfAbsent(executableStage, this::createBundleFactory);
+  }
+
+  private final AtomicLong idgen = new AtomicLong();
 
 Review comment:
   Done.
   
   Done everywhere, specifically.




Issue Time Tracking
---

Worklog Id: (was: 101925)
Time Spent: 

[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-14 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=101813=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101813
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 14/May/18 18:17
Start Date: 14/May/18 18:17
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5348: 
[BEAM-3326] Add a Direct Job Bundle Factory
URL: https://github.com/apache/beam/pull/5348#discussion_r188044479
 
 

 ##
 File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java
 ##
 @@ -0,0 +1,171 @@
+package org.apache.beam.runners.direct.portable;
+
+/** A {@link JobBundleFactory} for the ReferenceRunner. */
+class DirectJobBundleFactory implements JobBundleFactory {
+  private final EnvironmentFactory environmentFactory;
+
+  private final GrpcFnServer<GrpcDataService> dataService;
+  private final GrpcFnServer<GrpcStateService> stateService;
+
+  private final ConcurrentMap<ExecutableStage, StageBundleFactory<?>> stageBundleFactories =
+      new ConcurrentHashMap<>();
+  private final ConcurrentMap<Environment, RemoteEnvironment> environments =
+      new ConcurrentHashMap<>();
+
+  DirectJobBundleFactory(
+      EnvironmentFactory environmentFactory,
+      GrpcFnServer<GrpcDataService> dataService,
+      GrpcFnServer<GrpcStateService> stateService) {
+    this.environmentFactory = environmentFactory;
+    this.dataService = dataService;
+    this.stateService = stateService;
+  }
+
+  @Override
+  public <T> StageBundleFactory<T> forStage(ExecutableStage executableStage) {
+    return (StageBundleFactory<T>)
+        stageBundleFactories.computeIfAbsent(executableStage, this::createBundleFactory);
+  }
+
+  private final AtomicLong idgen = new AtomicLong();
 
 Review comment:
   Use 
https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/IdGenerator.java
   
   Move it to a shared location if needed.
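A sketch of what a shared ID generator could look like, assuming callers only need unique, thread-safe string IDs. `IdGeneratorSketch` and `incrementing()` are hypothetical names chosen for illustration; the linked `IdGenerator` class in the harness may expose a different API.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical shared ID generator; the name and factory method are illustrative and do not
 * reproduce the API of the linked IdGenerator class.
 */
@FunctionalInterface
interface IdGeneratorSketch {
  /** Returns an ID that no earlier call on this generator has returned. */
  String getId();

  /** A thread-safe generator yielding "1", "2", "3", ... backed by an AtomicLong. */
  static IdGeneratorSketch incrementing() {
    AtomicLong counter = new AtomicLong();
    return () -> Long.toString(counter.incrementAndGet());
  }
}
```

Under that assumption, the `private final AtomicLong idgen` field would become `private final IdGeneratorSketch idGenerator = IdGeneratorSketch.incrementing();`. Keeping the interface in a module that both the harness and the runner depend on avoids duplicating the counter logic in each.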



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-14 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=101810=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101810
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 14/May/18 18:16
Start Date: 14/May/18 18:16
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5348: 
[BEAM-3326] Add a Direct Job Bundle Factory
URL: https://github.com/apache/beam/pull/5348#discussion_r188048988
 
 

 ##
 File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java
 ##
 @@ -0,0 +1,171 @@
+package org.apache.beam.runners.direct.portable;
+
+/** A {@link JobBundleFactory} for the ReferenceRunner. */
+class DirectJobBundleFactory implements JobBundleFactory {
 
 Review comment:
   Are you planning to have NeedsRunner/ValidatesRunner cover this and the 
StageBundleFactory testing?
   
   If so, it might be worthwhile to add error handling tests since 
NeedsRunner/ValidatesRunner typically don't handle edge cases.
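One edge case that NeedsRunner/ValidatesRunner suites are unlikely to hit is environment creation failing. In `DirectJobBundleFactory.createBundleFactory`, the checked exception is wrapped in a `RuntimeException` inside `computeIfAbsent`; `ConcurrentHashMap.computeIfAbsent` rethrows it and records no mapping, so a later call retries creation. The hypothetical `EnvironmentCacheSketch` below isolates that caching logic so an error-handling test could assert both properties without starting an SDK harness; it is an illustration, not code from the PR.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Hypothetical stand-in for the environment cache in DirectJobBundleFactory: values are created
 * lazily per key, and checked creation failures are rethrown unchecked.
 */
final class EnvironmentCacheSketch<K, V> {
  /** Hypothetical creation hook that may throw, in the spirit of EnvironmentFactory. */
  @FunctionalInterface
  interface Creator<KeyT, ValueT> {
    ValueT create(KeyT key) throws Exception;
  }

  private final ConcurrentMap<K, V> cache = new ConcurrentHashMap<>();
  private final Creator<K, V> creator;

  EnvironmentCacheSketch(Creator<K, V> creator) {
    this.creator = creator;
  }

  V get(K key) {
    return cache.computeIfAbsent(
        key,
        k -> {
          try {
            return creator.create(k);
          } catch (Exception e) {
            // computeIfAbsent rethrows this and leaves no mapping, so the next get() retries.
            throw new RuntimeException(e);
          }
        });
  }

  int size() {
    return cache.size();
  }
}
```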




Issue Time Tracking
---

Worklog Id: (was: 101810)
Time Spent: 8h 40m  (was: 8.5h)


[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-14 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=101811=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101811
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 14/May/18 18:16
Start Date: 14/May/18 18:16
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5348: 
[BEAM-3326] Add a Direct Job Bundle Factory
URL: https://github.com/apache/beam/pull/5348#discussion_r188046059
 
 

 ##
 File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java
 ##
 @@ -0,0 +1,171 @@
+package org.apache.beam.runners.direct.portable;
+
+/** A {@link JobBundleFactory} for the ReferenceRunner. */
+class DirectJobBundleFactory implements JobBundleFactory {
+  private final EnvironmentFactory environmentFactory;
+
+  private final GrpcFnServer dataService;
+  private final GrpcFnServer stateService;
+
+  private final ConcurrentMap stageBundleFactories =
+      new ConcurrentHashMap<>();
+  private final ConcurrentMap environments =
+      new ConcurrentHashMap<>();
+
+  DirectJobBundleFactory(
+      EnvironmentFactory environmentFactory,
+      GrpcFnServer dataService,
+      GrpcFnServer stateService) {
+    this.environmentFactory = environmentFactory;
+    this.dataService = dataService;
+    this.stateService = stateService;
+  }
+
+  @Override
+  public  StageBundleFactory forStage(ExecutableStage executableStage) {
+    return (StageBundleFactory)
+        stageBundleFactories.computeIfAbsent(executableStage, this::createBundleFactory);
+  }
+
+  private final AtomicLong idgen = new AtomicLong();
+
+  private  StageBundleFactory createBundleFactory(ExecutableStage stage) {
+    RemoteEnvironment remoteEnv =
+        environments.computeIfAbsent(
+            stage.getEnvironment(),
+            env -> {
+              try {
+                return environmentFactory.createEnvironment(env);
+              } catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+            });
+    SdkHarnessClient sdkHarnessClient =
+

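The `forStage` and `createBundleFactory` code above memoizes one bundle factory per stage and one remote environment per `Environment` by calling `computeIfAbsent` on a `ConcurrentMap`, wrapping the environment factory's checked exception in a `RuntimeException`. A minimal, self-contained sketch of that caching pattern follows; `CheckedFactory` and `PerKeyCache` are hypothetical names, not Beam APIs.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for a factory (like an environment factory) whose create method throws. */
interface CheckedFactory<K, V> {
  V create(K key) throws Exception;
}

/** Memoizes one value per key; each value is created at most once, even under concurrency. */
final class PerKeyCache<K, V> {
  private final ConcurrentMap<K, V> values = new ConcurrentHashMap<>();
  private final CheckedFactory<K, V> factory;

  PerKeyCache(CheckedFactory<K, V> factory) {
    this.factory = factory;
  }

  V get(K key) {
    // ConcurrentHashMap.computeIfAbsent applies the function at most once per absent key.
    // The mapping function cannot throw checked exceptions, so they are wrapped; when it
    // throws, no mapping is recorded and a later call retries the creation.
    return values.computeIfAbsent(
        key,
        k -> {
          try {
            return factory.create(k);
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        });
  }
}
```

Caching by key means every stage that shares an `Environment` reuses the same remote environment instead of starting a new one.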
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-14 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=101812=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101812
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 14/May/18 18:16
Start Date: 14/May/18 18:16
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5348: 
[BEAM-3326] Add a Direct Job Bundle Factory
URL: https://github.com/apache/beam/pull/5348#discussion_r188044479
 
 

 ##
 File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java
 ##
 @@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
+import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.ExecutableProcessBundleDescriptor;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.control.RemoteOutputReceiver;
+import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
+import org.apache.beam.runners.fnexecution.control.SdkHarnessClient.ActiveBundle;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.data.GrpcDataService;
+import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.util.MoreFutures;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/** A {@link JobBundleFactory} for the ReferenceRunner. */
+class DirectJobBundleFactory implements JobBundleFactory {
+  private final EnvironmentFactory environmentFactory;
+
+  private final GrpcFnServer dataService;
+  private final GrpcFnServer stateService;
+
+  private final ConcurrentMap stageBundleFactories =
+      new ConcurrentHashMap<>();
+  private final ConcurrentMap environments =
+      new ConcurrentHashMap<>();
+
+  DirectJobBundleFactory(
+      EnvironmentFactory environmentFactory,
+      GrpcFnServer dataService,
+      GrpcFnServer stateService) {
+    this.environmentFactory = environmentFactory;
+    this.dataService = dataService;
+    this.stateService = stateService;
+  }
+
+  @Override
+  public  StageBundleFactory forStage(ExecutableStage executableStage) {
+    return (StageBundleFactory)
+        stageBundleFactories.computeIfAbsent(executableStage, this::createBundleFactory);
+  }
+
+  private final AtomicLong idgen = new AtomicLong();
 
 Review comment:
   Use 
https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/IdGenerator.java
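The review suggests reusing a shared ID utility instead of a private `AtomicLong idgen` field. The sketch below shows the general shape of such a utility, assuming IDs are prefixed decimal counters; `IdSource` is a hypothetical name and does not reproduce the API of Beam's `IdGenerator`.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/**
 * Hypothetical shared ID source (not Beam's IdGenerator API): one reusable utility instead of an
 * ad hoc AtomicLong field in each class that needs unique IDs.
 */
final class IdSource implements Supplier<String> {
  private final String prefix;
  private final AtomicLong counter = new AtomicLong();

  IdSource(String prefix) {
    this.prefix = prefix;
  }

  @Override
  public String get() {
    // getAndIncrement is atomic, so concurrent callers never receive the same ID.
    return prefix + counter.getAndIncrement();
  }
}
```

Exposing it as a `Supplier<String>` lets callers depend on "something that produces IDs" rather than on a specific counter implementation.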


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue 

[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-11 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=101369=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101369
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 12/May/18 01:18
Start Date: 12/May/18 01:18
Worklog Time Spent: 10m 
  Work Description: tgroh opened a new pull request #5349: [BEAM-3326] 
Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349
 
 
   Add a Primitive Evaluator that evaluates remote stages.
   
   Add an InProcessEnvironmentFactory to construct Java Fn Harnesses.
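An in-process environment factory runs the SDK harness on a thread inside the runner's own JVM rather than in a separate process or container. The sketch below assumes the harness exposes a blocking main loop that can be passed as a `Runnable` and that it stops when interrupted; `InProcessHarness` is hypothetical and is not the `InProcessEnvironmentFactory` from this pull request.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Hypothetical handle to a harness whose blocking main loop runs on a thread inside this JVM,
 * in place of a separate process or container.
 */
final class InProcessHarness implements AutoCloseable {
  private final ExecutorService executor;
  private final Future<?> running;

  private InProcessHarness(ExecutorService executor, Future<?> running) {
    this.executor = executor;
    this.running = running;
  }

  /** Submits harnessMain (a stand-in for a harness's main loop) to a dedicated thread. */
  static InProcessHarness start(Runnable harnessMain) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<?> running = executor.submit(harnessMain);
    return new InProcessHarness(executor, running);
  }

  boolean isRunning() {
    return !running.isDone();
  }

  @Override
  public void close() {
    // Interrupt the harness thread, then release the executor.
    running.cancel(true);
    executor.shutdownNow();
  }
}
```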
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [ ] Make sure there is a [JIRA 
issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the 
change (usually before you start working on it).  Trivial changes like typos do 
not require a JIRA issue.  Your pull request should address just this issue, 
without pulling in other changes.
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue.
- [ ] Write a pull request description that is detailed enough to 
understand:
  - [ ] What the pull request does
  - [ ] Why it does it
  - [ ] How it does it
  - [ ] Why this approach
- [ ] Each commit in the pull request should have a meaningful subject line 
and body.
- [ ] Run `./gradlew build` to make sure basic checks pass. A more thorough 
check will be performed on your pull request automatically.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   




Issue Time Tracking
---

Worklog Id: (was: 101369)
Time Spent: 8.5h  (was: 8h 20m)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 8.5h
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via the portability framework APIs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-11 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=101363=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101363
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 12/May/18 00:45
Start Date: 12/May/18 00:45
Worklog Time Spent: 10m 
  Work Description: tgroh opened a new pull request #5348: [BEAM-3326] Add 
a Direct Job Bundle Factory
URL: https://github.com/apache/beam/pull/5348
 
 
   This is the JobBundleFactory for the DirectRunner, and the 
OutputReceiverFactory for it to use.
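An output receiver factory hands the runner one receiver per output PCollection of a stage. The sketch below buffers received elements in memory, keyed by PCollection ID; `BufferingReceiverFactory` is hypothetical and uses `java.util.function.Consumer` in place of Beam's receiver types, whose exact signatures are not shown here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * Hypothetical receiver factory: hands out one buffering receiver per output PCollection ID so a
 * runner can collect a stage's outputs after the bundle completes.
 */
final class BufferingReceiverFactory {
  private final Map<String, List<Object>> outputs = new ConcurrentHashMap<>();

  /** Returns a receiver that appends every element it accepts to the buffer for pCollectionId. */
  Consumer<Object> create(String pCollectionId) {
    List<Object> buffer =
        outputs.computeIfAbsent(
            pCollectionId, id -> Collections.synchronizedList(new ArrayList<Object>()));
    return buffer::add;
  }

  /** Returns the elements received so far for pCollectionId, or an empty list. */
  List<Object> outputsFor(String pCollectionId) {
    return outputs.getOrDefault(pCollectionId, Collections.emptyList());
  }
}
```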
   
   
   
   
   




Issue Time Tracking
---

Worklog Id: (was: 101363)
Time Spent: 8h 10m  (was: 8h)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-11 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=101364=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101364
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 12/May/18 00:45
Start Date: 12/May/18 00:45
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #5348: [BEAM-3326] Add a 
Direct Job Bundle Factory
URL: https://github.com/apache/beam/pull/5348#issuecomment-388517950
 
 
   R: @axelmagn 




Issue Time Tracking
---

Worklog Id: (was: 101364)
Time Spent: 8h 20m  (was: 8h 10m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=98097=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98097
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 04/May/18 01:53
Start Date: 04/May/18 01:53
Worklog Time Spent: 10m 
  Work Description: tgroh closed pull request #5269: [BEAM-3326] Add an 
Unsupported StateRequestHandler
URL: https://github.com/apache/beam/pull/5269
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
index cfa86db1b04..d0858935d73 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
@@ -18,7 +18,8 @@
 package org.apache.beam.runners.fnexecution.state;
 
 import java.util.concurrent.CompletionStage;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
 
 /**
  * Handler for {@link org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest StateRequests}.
@@ -34,6 +35,12 @@
    * Throwing an error during handling will complete the handler result {@link CompletionStage}
    * exceptionally.
    */
-  CompletionStage handle(BeamFnApi.StateRequest request)
-      throws Exception;
+  CompletionStage handle(StateRequest request) throws Exception;
+
+  static StateRequestHandler unsupported() {
+    return request -> {
+      throw new UnsupportedOperationException(
+          String.format("Cannot use an empty %s", StateRequestHandler.class.getSimpleName()));
+    };
+  }
 }
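`unsupported()` throws synchronously from `handle`, while the interface Javadoc says a thrown error completes the handler result `CompletionStage` exceptionally. One way a caller could honor that contract is sketched below; `SimpleStateHandler` and `StateCalls` are hypothetical, simplified stand-ins that use `String` requests and responses instead of the Fn API protos.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/** Hypothetical, simplified stand-in for StateRequestHandler using String requests/responses. */
@FunctionalInterface
interface SimpleStateHandler {
  CompletionStage<String> handle(String request) throws Exception;

  /** Mirrors the unsupported() factory in the diff: every request fails. */
  static SimpleStateHandler unsupported() {
    return request -> {
      throw new UnsupportedOperationException(
          String.format("Cannot use an empty %s", SimpleStateHandler.class.getSimpleName()));
    };
  }
}

final class StateCalls {
  /** Converts a synchronous throw from handle() into an exceptionally completed stage. */
  static CompletionStage<String> invoke(SimpleStateHandler handler, String request) {
    try {
      return handler.handle(request);
    } catch (Exception e) {
      CompletableFuture<String> failed = new CompletableFuture<>();
      failed.completeExceptionally(e);
      return failed;
    }
  }
}
```

With this shape, a runner that does not support state can pass `unsupported()` wherever a handler is required, and any state request surfaces as a failed stage rather than an uncaught exception.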


 




Issue Time Tracking
---

Worklog Id: (was: 98097)
Time Spent: 8h  (was: 7h 50m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=98062=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98062
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 03/May/18 22:30
Start Date: 03/May/18 22:30
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5269: 
[BEAM-3326] Add an Unsupported StateRequestHandler
URL: https://github.com/apache/beam/pull/5269#discussion_r185953920
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
 ##
 @@ -34,6 +35,12 @@
    * Throwing an error during handling will complete the handler result {@link CompletionStage}
    * exceptionally.
    */
-  CompletionStage handle(BeamFnApi.StateRequest request)
-      throws Exception;
+  CompletionStage handle(StateRequest request) throws Exception;
+
+  static StateRequestHandler unsupported() {
 
 Review comment:
   For some reason I thought it was a class.




Issue Time Tracking
---

Worklog Id: (was: 98062)
Time Spent: 7h 50m  (was: 7h 40m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=98030=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98030
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 03/May/18 20:32
Start Date: 03/May/18 20:32
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5269: 
[BEAM-3326] Add an Unsupported StateRequestHandler
URL: https://github.com/apache/beam/pull/5269#discussion_r185927830
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
 ##
 @@ -34,6 +35,12 @@
    * Throwing an error during handling will complete the handler result {@link CompletionStage}
    * exceptionally.
    */
-  CompletionStage handle(BeamFnApi.StateRequest request)
-      throws Exception;
+  CompletionStage handle(StateRequest request) throws Exception;
+
+  static StateRequestHandler unsupported() {
 
 Review comment:
   It is definitely not redundant: you must choose between `static`,
`default`, or no implementation for interface methods.
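The point under discussion can be checked directly. In Java 8 and later an interface method with a body must be declared `static` or `default` (Java 9 also allows `private`), non-private interface methods are implicitly `public`, and member classes of an interface are implicitly `public` and `static`. The illustrative `Greeter` interface below is hypothetical; the test inspects its modifiers through reflection.

```java
import java.lang.reflect.Method;

/** Illustrative interface; the names are hypothetical. */
interface Greeter {
  // No body and no modifier: implicitly public and abstract.
  String greet(String name);

  // A method with a body must be declared static or default (Java 9 adds private);
  // removing `static` here would not compile, so the modifier is not redundant.
  static Greeter polite() {
    return name -> "Hello, " + name;
  }

  // Member classes of an interface are implicitly public and static.
  class Shouting implements Greeter {
    @Override
    public String greet(String name) {
      return "HELLO, " + name.toUpperCase();
    }
  }
}

final class MethodModifiers {
  /** Returns the modifiers of the first public method with this name, or 0 if there is none. */
  static int of(Class<?> type, String name) {
    for (Method m : type.getMethods()) {
      if (m.getName().equals(name)) {
        return m.getModifiers();
      }
    }
    return 0;
  }
}
```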




Issue Time Tracking
---

Worklog Id: (was: 98030)
Time Spent: 7h 40m  (was: 7.5h)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=98027=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98027
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 03/May/18 20:01
Start Date: 03/May/18 20:01
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5269: 
[BEAM-3326] Add an Unsupported StateRequestHandler
URL: https://github.com/apache/beam/pull/5269#discussion_r185919685
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
 ##
 @@ -34,6 +35,12 @@
    * Throwing an error during handling will complete the handler result {@link CompletionStage}
    * exceptionally.
    */
-  CompletionStage handle(BeamFnApi.StateRequest request)
-      throws Exception;
+  CompletionStage handle(StateRequest request) throws Exception;
+
+  static StateRequestHandler unsupported() {
 
 Review comment:
   `static` is also redundant.




Issue Time Tracking
---

Worklog Id: (was: 98027)
Time Spent: 7.5h  (was: 7h 20m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=98011=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98011
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 03/May/18 19:06
Start Date: 03/May/18 19:06
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5269: 
[BEAM-3326] Add an Unsupported StateRequestHandler
URL: https://github.com/apache/beam/pull/5269#discussion_r185906501
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
 ##
 @@ -34,6 +35,12 @@
    * Throwing an error during handling will complete the handler result {@link CompletionStage}
    * exceptionally.
    */
-  CompletionStage handle(BeamFnApi.StateRequest request)
-      throws Exception;
+  CompletionStage handle(StateRequest request) throws Exception;
+
+  static StateRequestHandler unsupported() {
 
 Review comment:
   Ah, I hadn't noticed this was an interface.
   
   I don't think `static` is redundant though.




Issue Time Tracking
---

Worklog Id: (was: 98011)
Time Spent: 7h 20m  (was: 7h 10m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=98006=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98006
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 03/May/18 18:55
Start Date: 03/May/18 18:55
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #5269: 
[BEAM-3326] Add an Unsupported StateRequestHandler
URL: https://github.com/apache/beam/pull/5269#discussion_r185903479
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
 ##
 @@ -34,6 +35,12 @@
    * Throwing an error during handling will complete the handler result {@link CompletionStage}
    * exceptionally.
    */
-  CompletionStage handle(BeamFnApi.StateRequest request)
-      throws Exception;
+  CompletionStage handle(StateRequest request) throws Exception;
+
+  static StateRequestHandler unsupported() {
 
 Review comment:
   Classes inside interfaces are public, not package-private.
   
   Drop `static` as it is also a redundant modifier.




Issue Time Tracking
---

Worklog Id: (was: 98006)
Time Spent: 7h 10m  (was: 7h)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=98004=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98004
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 03/May/18 18:35
Start Date: 03/May/18 18:35
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5269: 
[BEAM-3326] Add an Unsupported StateRequestHandler
URL: https://github.com/apache/beam/pull/5269#discussion_r185898147
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
 ##
 @@ -34,6 +35,12 @@
    * Throwing an error during handling will complete the handler result {@link CompletionStage}
    * exceptionally.
    */
-  CompletionStage handle(BeamFnApi.StateRequest request)
-      throws Exception;
+  CompletionStage handle(StateRequest request) throws Exception;
+
+  static StateRequestHandler unsupported() {
 
 Review comment:
   Why package-private? This seems useful anywhere we don't/can't support state.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 98004)
Time Spent: 7h  (was: 6h 50m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=97980=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97980
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 03/May/18 17:42
Start Date: 03/May/18 17:42
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #5269: [BEAM-3326] Add an 
Unsupported StateRequestHandler
URL: https://github.com/apache/beam/pull/5269#issuecomment-386377992
 
 
   R: @bsidhom 




Issue Time Tracking
---

Worklog Id: (was: 97980)
Time Spent: 6h 50m  (was: 6h 40m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-05-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=97979=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97979
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 03/May/18 17:42
Start Date: 03/May/18 17:42
Worklog Time Spent: 10m 
  Work Description: tgroh opened a new pull request #5269: [BEAM-3326] Add 
an Unsupported StateRequestHandler
URL: https://github.com/apache/beam/pull/5269
 
 
   This can be provided to any consumer that requires a StateRequestHandler
   when the PipelineRunner or Pipeline does not support state requests in
   that context.
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [ ] Make sure there is a [JIRA 
issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the 
change (usually before you start working on it).  Trivial changes like typos do 
not require a JIRA issue.  Your pull request should address just this issue, 
without pulling in other changes.
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue.
- [ ] Write a pull request description that is detailed enough to 
understand:
  - [ ] What the pull request does
  - [ ] Why it does it
  - [ ] How it does it
  - [ ] Why this approach
- [ ] Each commit in the pull request should have a meaningful subject line 
and body.
- [ ] Run `./gradlew build` to make sure basic checks pass. A more thorough 
check will be performed on your pull request automatically.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 97979)
Time Spent: 6h 40m  (was: 6.5h)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 6h 40m
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=92772=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92772
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 19/Apr/18 18:29
Start Date: 19/Apr/18 18:29
Worklog Time Spent: 10m 
  Work Description: tgroh closed pull request #4761: [BEAM-3326] Remove 
SdkHarnessClientControlService
URL: https://github.com/apache/beam/pull/4761
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientControlService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientControlService.java
deleted file mode 100644
index 3f1d14809c9..000
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientControlService.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.fnexecution.control;
-
-import io.grpc.ServerServiceDefinition;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.Supplier;
-import org.apache.beam.runners.fnexecution.FnService;
-import org.apache.beam.runners.fnexecution.HeaderAccessor;
-import org.apache.beam.runners.fnexecution.data.FnDataService;
-
-/**
- * A service providing {@link SdkHarnessClient} based on an internally managed {@link
- * FnApiControlClientPoolService}.
- */
-public class SdkHarnessClientControlService implements FnService {
-  private final FnApiControlClientPoolService clientPoolService;
-  private final ControlClientPool pendingClients;
-
-  private final Supplier dataService;
-
-  private final Collection activeClients;
-
-  public static SdkHarnessClientControlService create(
-      Supplier dataService, HeaderAccessor headerAccessor) {
-    return new SdkHarnessClientControlService(dataService, headerAccessor);
-  }
-
-  private SdkHarnessClientControlService(
-      Supplier dataService, HeaderAccessor headerAccessor) {
-    this.dataService = dataService;
-    activeClients = new ConcurrentLinkedQueue<>();
-    pendingClients = QueueControlClientPool.createSynchronous();
-    clientPoolService =
-        FnApiControlClientPoolService.offeringClientsToPool(pendingClients.getSink(),
-            headerAccessor);
-  }
-
-  public SdkHarnessClient getClient() {
-    try {
-      // Block until a client is available.
-      FnApiControlClient getClient = pendingClients.getSource().get();
-      return SdkHarnessClient.usingFnApiClient(getClient, dataService.get());
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException("Interrupted while waiting for client", e);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void close() throws Exception {
-    for (SdkHarnessClient client : activeClients) {
-      client.close();
-    }
-  }
-
-  @Override
-  public ServerServiceDefinition bindService() {
-    return clientPoolService.bindService();
-  }
-}

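The removed `getClient()` blocks until the control-client pool yields a client. If interrupted, it restores the thread's interrupt flag before rethrowing. The following is a minimal sketch of that hand-off, built on stated assumptions:

- It assumes `QueueControlClientPool.createSynchronous()` behaves like a `java.util.concurrent.SynchronousQueue`. The diff does not show its implementation, so this is an assumption.
- `SynchronousClientPool` and its `handOff`/`take` methods are hypothetical names.

```java
import java.util.concurrent.SynchronousQueue;

/** Hypothetical pool: each connecting client is handed directly to a waiting consumer. */
public class SynchronousClientPool<T> {
  private final SynchronousQueue<T> handoff = new SynchronousQueue<>();

  /** Called when a client connects; blocks until some caller of take() receives it. */
  public void handOff(T client) throws InterruptedException {
    handoff.put(client);
  }

  /** Blocks until a client is available, restoring the interrupt flag if interrupted. */
  public T take() {
    try {
      return handoff.take();
    } catch (InterruptedException e) {
      // Re-set the flag so callers further up the stack can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while waiting for client", e);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    SynchronousClientPool<String> pool = new SynchronousClientPool<>();
    Thread connector =
        new Thread(
            () -> {
              try {
                pool.handOff("client-1");
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
            });
    connector.start();
    System.out.println("got " + pool.take());
    connector.join();
  }
}
```

A `SynchronousQueue` has no capacity, so a connecting client waits until a consumer takes it. A buffered queue would instead let idle clients accumulate.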

 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 92772)
Time Spent: 6.5h  (was: 6h 20m)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam

[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-04-11 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=90163=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-90163
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 11/Apr/18 20:47
Start Date: 11/Apr/18 20:47
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4761: [BEAM-3326] Remove 
SdkHarnessClientControlService
URL: https://github.com/apache/beam/pull/4761#issuecomment-380590353
 
 
   run java precommit


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 90163)
Time Spent: 6h 20m  (was: 6h 10m)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 6h 20m
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-04-11 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=90162=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-90162
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 11/Apr/18 20:47
Start Date: 11/Apr/18 20:47
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4761: [BEAM-3326] Remove 
SdkHarnessClientControlService
URL: https://github.com/apache/beam/pull/4761#issuecomment-380590353
 
 
   run java precommit


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 90162)
Time Spent: 6h 10m  (was: 6h)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-04-03 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=87212=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87212
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 03/Apr/18 19:14
Start Date: 03/Apr/18 19:14
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on issue #4761: [BEAM-3326] Remove 
SdkHarnessClientControlService
URL: https://github.com/apache/beam/pull/4761#issuecomment-378364443
 
 
   It looks like there are merge conflicts now. Is the plan still to remove 
this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 87212)
Time Spent: 6h  (was: 5h 50m)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 6h
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=86805=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86805
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 02/Apr/18 22:17
Start Date: 02/Apr/18 22:17
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on issue #4761: [BEAM-3326] Remove 
SdkHarnessClientControlService
URL: https://github.com/apache/beam/pull/4761#issuecomment-378063871
 
 
   No. We can go ahead and remove this now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86805)
Time Spent: 5h 50m  (was: 5h 40m)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=86751=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86751
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 02/Apr/18 20:31
Start Date: 02/Apr/18 20:31
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4761: [BEAM-3326] Remove 
SdkHarnessClientControlService
URL: https://github.com/apache/beam/pull/4761#issuecomment-378035011
 
 
   @bsidhom Is your concern still true?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86751)
Time Spent: 5h 40m  (was: 5.5h)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-03-28 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85428=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85428
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 28/Mar/18 22:53
Start Date: 28/Mar/18 22:53
Worklog Time Spent: 10m 
  Work Description: lukecwik closed pull request #4970: [BEAM-3326] Address 
additional comments from PR/4963.
URL: https://github.com/apache/beam/pull/4970
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
index de1aa57fe32..ebcb8b4d33f 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
@@ -180,7 +180,7 @@ public String getBundleId() {
 
     /**
      * Returns a {@link FnDataReceiver receiver} which consumes input elements forwarding them
-     * to the SDK. When
+     * to the SDK.
      */
     public FnDataReceiver getInputReceiver() {
       return inputReceiver;
@@ -211,8 +211,9 @@ public void close() throws Exception {
         if (exception == null) {
           MoreFutures.get(response);
         } else {
-          // TODO: Handle aborting the bundle being processed.
-          throw new IllegalStateException("Processing bundle failed, TODO: abort bundle.");
+          // TODO: [BEAM-3962] Handle aborting the bundle being processed.
+          throw new IllegalStateException("Processing bundle failed, "
+              + "TODO: [BEAM-3962] abort bundle.");
         }
       } catch (Exception e) {
         if (exception == null) {
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java
index 6c7839fc193..d86324eb366 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java
@@ -38,9 +38,9 @@
    * The provided coder is used to decode inbound elements. The decoded elements are passed to
    * the provided receiver.
    *
-   * Any failure during decoding or processing of the element will complete the returned future
-   * exceptionally. On successful termination of the stream, the returned future is completed
-   * successfully.
+   * Any failure during decoding or processing of the element will put the
+   * {@link InboundDataClient} into an error state such that
+   * {@link InboundDataClient#awaitCompletion()} will throw an exception.
    *
    * The provided receiver is not required to be thread safe.
    */

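The revised FnDataService javadoc states two things. First, a decoding or processing failure puts the `InboundDataClient` into an error state, so `awaitCompletion()` throws. Second, the receiver need not be thread safe. The sketch below models that contract with a `CompletableFuture`, under stated assumptions:

- `SimpleInboundDataClient` is a hypothetical class that does not reproduce Beam's classes.
- Its byte-array decoder and its `accept`/`complete` methods are likewise hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical inbound client whose awaitCompletion() fails if any element failed. */
public class SimpleInboundDataClient<T> {
  private final Function<byte[], T> decoder;
  private final Consumer<T> receiver;
  private final CompletableFuture<Void> done = new CompletableFuture<>();

  public SimpleInboundDataClient(Function<byte[], T> decoder, Consumer<T> receiver) {
    this.decoder = decoder;
    this.receiver = receiver;
  }

  /** Decodes one element and forwards it; a failure moves the client into an error state. */
  public void accept(byte[] encoded) {
    if (done.isDone()) {
      return; // Drop data arriving after completion or after a failure.
    }
    try {
      receiver.accept(decoder.apply(encoded));
    } catch (RuntimeException e) {
      done.completeExceptionally(e);
    }
  }

  /** Signals the end of the stream; has no effect if the client already failed. */
  public void complete() {
    done.complete(null);
  }

  /** Blocks until the stream ends; throws if decoding or processing failed. */
  public void awaitCompletion() throws InterruptedException, ExecutionException {
    done.get();
  }

  public static void main(String[] args) throws InterruptedException {
    List<Integer> seen = new ArrayList<>();
    SimpleInboundDataClient<Integer> client =
        new SimpleInboundDataClient<>(
            bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)), seen::add);
    client.accept("1".getBytes(StandardCharsets.UTF_8));
    client.accept("oops".getBytes(StandardCharsets.UTF_8));
    client.complete();
    try {
      client.awaitCompletion();
    } catch (ExecutionException e) {
      System.out.println("seen=" + seen + ", failure=" + e.getCause());
    }
  }
}
```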

 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 85428)
Time Spent: 5.5h  (was: 5h 20m)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-03-28 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85348=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85348
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 28/Mar/18 18:40
Start Date: 28/Mar/18 18:40
Worklog Time Spent: 10m 
  Work Description: lukecwik opened a new pull request #4970: [BEAM-3326] 
Address additional comments from PR/4963.
URL: https://github.com/apache/beam/pull/4970
 
 
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [ ] Make sure there is a [JIRA 
issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the 
change (usually before you start working on it).  Trivial changes like typos do 
not require a JIRA issue.  Your pull request should address just this issue, 
without pulling in other changes.
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue.
- [ ] Write a pull request description that is detailed enough to 
understand:
  - [ ] What the pull request does
  - [ ] Why it does it
  - [ ] How it does it
  - [ ] Why this approach
- [ ] Each commit in the pull request should have a meaningful subject line 
and body.
- [ ] Run `mvn clean verify` to make sure basic checks pass. A more 
thorough check will be performed on your pull request automatically.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 85348)
Time Spent: 5h 10m  (was: 5h)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-03-28 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85349=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85349
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 28/Mar/18 18:40
Start Date: 28/Mar/18 18:40
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #4970: [BEAM-3326] Address 
additional comments from PR/4963.
URL: https://github.com/apache/beam/pull/4970#issuecomment-376992745
 
 
   CC: @bsidhom 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 85349)
Time Spent: 5h 20m  (was: 5h 10m)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-03-28 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85347=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85347
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 28/Mar/18 18:35
Start Date: 28/Mar/18 18:35
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #4963: 
[BEAM-3326] Abstract away closing the inbound receiver, waiting for the bundle 
to finish, waiting for outbound to complete within the ActiveBundle.
URL: https://github.com/apache/beam/pull/4963#discussion_r177849543
 
 

 ##
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
 ##
 @@ -146,22 +154,92 @@ private BundleProcessor(
   }
 
   /** An active bundle for a particular {@link BeamFnApi.ProcessBundleDescriptor}. */
-  @AutoValue
-  public abstract static class ActiveBundle {
-    public abstract String getBundleId();
-
-    public abstract CompletionStage getBundleResponse();
+  public static class ActiveBundle implements AutoCloseable {
+    private final String bundleId;
+    private final CompletionStage response;
+    private final CloseableFnDataReceiver inputReceiver;
+    private final Map outputClients;
 
-    public abstract CloseableFnDataReceiver getInputReceiver();
-    public abstract Map getOutputClients();
-
-    public static ActiveBundle create(
+    private ActiveBundle(
         String bundleId,
         CompletionStage response,
-        CloseableFnDataReceiver dataReceiver,
+        CloseableFnDataReceiver inputReceiver,
         Map outputClients) {
-      return new AutoValue_SdkHarnessClient_ActiveBundle<>(
-          bundleId, response, dataReceiver, outputClients);
+      this.bundleId = bundleId;
+      this.response = response;
+      this.inputReceiver = inputReceiver;
+      this.outputClients = outputClients;
+    }
+
+    /**
+     * Returns an id used to represent this bundle.
+     */
+    public String getBundleId() {
+      return bundleId;
+    }
+
+    /**
+     * Returns a {@link FnDataReceiver receiver} which consumes input elements forwarding them
+     * to the SDK. When
+     */
+    public FnDataReceiver getInputReceiver() {
+      return inputReceiver;
+    }
+
+    /**
+     * Blocks till bundle processing is finished. This is comprised of:
+     *
+     *   closing the {@link #getInputReceiver() input receiver}.
+     *   waiting for the SDK to say that processing the bundle is finished.
+     *   waiting for all inbound data clients to complete
+     *
+     *
+     * This method will throw an exception if bundle processing has failed.
+     * {@link Throwable#getSuppressed()} will return all the reasons as to why processing has
+     * failed.
+     */
+    @Override
+    public void close() throws Exception {
+      Exception exception = null;
+      try {
+        inputReceiver.close();
+      } catch (Exception e) {
+        exception = e;
+      }
+      try {
+        // We don't have to worry about the completion stage.
+        if (exception == null) {
+          MoreFutures.get(response);
+        } else {
+          // TODO: Handle aborting the bundle being processed.
+          throw new IllegalStateException("Processing bundle failed, TODO: abort bundle.");
+        }
+      } catch (Exception e) {
+        if (exception == null) {
+          exception = e;
+        } else {
+          exception.addSuppressed(e);
+        }
+      }
+      for (InboundDataClient outputClient : outputClients.values()) {
+        try {
+          // If we failed processing this bundle, we should cancel all inbound data.
+          if (exception == null) {
+            outputClient.awaitCompletion();
 
 Review comment:
   We await completion on all outbound clients.
   
   No, the bundle result represents to the best knowledge that the SDK did 
everything it needed to. There can still be wire transfer/network/decoding 
failures that the SDK wouldn't be aware of after sending bundle process 
completion.
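The review comment above explains why `ActiveBundle.close()` still waits on every outbound client after the SDK reports success. The sketch below mirrors the ordering in the quoted `close()`:

1. Close the input.
2. Await the SDK response.
3. Await each output.

It keeps the first failure and attaches later ones with `Throwable.addSuppressed`. This is a sketch under stated assumptions:

- The `Step` and `Output` interfaces are hypothetical.
- The `cancel()` branch is an assumption, because the quoted diff is cut off before the failure path for outputs.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class BundleCloseSketch {
  /** A step that may fail, e.g. closing the input receiver or awaiting the SDK response. */
  interface Step {
    void run() throws Exception;
  }

  /** Hypothetical stand-in for one outbound data stream from the SDK. */
  interface Output {
    void awaitCompletion() throws Exception;

    /** Assumed cancellation hook; not shown in the quoted diff. */
    default void cancel() {}
  }

  /** Runs every step in order; throws the first failure with later ones suppressed. */
  static void closeBundle(Step closeInput, Step awaitResponse, List<Output> outputs)
      throws Exception {
    Exception failure = null;
    try {
      closeInput.run();
    } catch (Exception e) {
      failure = e;
    }
    try {
      if (failure == null) {
        awaitResponse.run();
      } else {
        throw new IllegalStateException("Processing bundle failed");
      }
    } catch (Exception e) {
      failure = accumulate(failure, e);
    }
    for (Output output : outputs) {
      try {
        if (failure == null) {
          // A successful SDK response does not rule out transfer or decoding
          // failures on the runner side, so every output is still awaited.
          output.awaitCompletion();
        } else {
          output.cancel();
        }
      } catch (Exception e) {
        failure = accumulate(failure, e);
      }
    }
    if (failure != null) {
      throw failure;
    }
  }

  /** Keeps the first exception and records later ones as suppressed. */
  private static Exception accumulate(Exception first, Exception next) {
    if (first == null) {
      return next;
    }
    first.addSuppressed(next);
    return first;
  }

  public static void main(String[] args) {
    Output ok = () -> {};
    Output broken = () -> { throw new IOException("decode failed"); };
    try {
      closeBundle(() -> {}, () -> {}, Arrays.asList(ok, broken));
    } catch (Exception e) {
      System.out.println("bundle failed: " + e.getMessage());
    }
  }
}
```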


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 85347)
Time Spent: 5h  (was: 4h 50m)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: 

[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-03-28 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85310=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85310
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 28/Mar/18 15:59
Start Date: 28/Mar/18 15:59
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #4963: 
[BEAM-3326] Abstract away closing the inbound receiver, waiting for the bundle 
to finish, waiting for outbound to complete within the ActiveBundle.
URL: https://github.com/apache/beam/pull/4963#discussion_r177799450
 
 

 ##
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
 ##
 @@ -146,22 +154,92 @@ private BundleProcessor(
   }
 
   /** An active bundle for a particular {@link BeamFnApi.ProcessBundleDescriptor}. */
-  @AutoValue
-  public abstract static class ActiveBundle {
-    public abstract String getBundleId();
-
-    public abstract CompletionStage getBundleResponse();
+  public static class ActiveBundle implements AutoCloseable {
+    private final String bundleId;
+    private final CompletionStage response;
+    private final CloseableFnDataReceiver inputReceiver;
+    private final Map outputClients;
 
-    public abstract CloseableFnDataReceiver getInputReceiver();
-    public abstract Map getOutputClients();
-
-    public static ActiveBundle create(
+    private ActiveBundle(
         String bundleId,
         CompletionStage response,
-        CloseableFnDataReceiver dataReceiver,
+        CloseableFnDataReceiver inputReceiver,
         Map outputClients) {
-      return new AutoValue_SdkHarnessClient_ActiveBundle<>(
-          bundleId, response, dataReceiver, outputClients);
+      this.bundleId = bundleId;
+      this.response = response;
+      this.inputReceiver = inputReceiver;
+      this.outputClients = outputClients;
+    }
+
+    /**
+     * Returns an id used to represent this bundle.
+     */
+    public String getBundleId() {
+      return bundleId;
+    }
+
+    /**
+     * Returns a {@link FnDataReceiver receiver} which consumes input elements forwarding them
+     * to the SDK. When
+     */
+    public FnDataReceiver getInputReceiver() {
+      return inputReceiver;
+    }
+
+    /**
+     * Blocks till bundle processing is finished. This is comprised of:
+     *
+     *   closing the {@link #getInputReceiver() input receiver}.
+     *   waiting for the SDK to say that processing the bundle is finished.
+     *   waiting for all inbound data clients to complete
+     *
+     *
+     * This method will throw an exception if bundle processing has failed.
+     * {@link Throwable#getSuppressed()} will return all the reasons as to why processing has
+     * failed.
+     */
+    @Override
+    public void close() throws Exception {
+      Exception exception = null;
+      try {
+        inputReceiver.close();
+      } catch (Exception e) {
+        exception = e;
+      }
+      try {
+        // We don't have to worry about the completion stage.
+        if (exception == null) {
+          MoreFutures.get(response);
+        } else {
+          // TODO: Handle aborting the bundle being processed.
 
 Review comment:
   Could you reference a tracking bug?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 85310)
Time Spent: 4h 40m  (was: 4.5h)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-03-28 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85311=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85311
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 28/Mar/18 15:59
Start Date: 28/Mar/18 15:59
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #4963: 
[BEAM-3326] Abstract away closing the inbound receiver, waiting for the bundle 
to finish, waiting for outbound to complete within the ActiveBundle.
URL: https://github.com/apache/beam/pull/4963#discussion_r177798196
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
 ##
 @@ -146,22 +154,92 @@ private BundleProcessor(
   }
 
   /** An active bundle for a particular {@link BeamFnApi.ProcessBundleDescriptor}. */
-  @AutoValue
-  public abstract static class ActiveBundle {
-public abstract String getBundleId();
-
-public abstract CompletionStage getBundleResponse();
+  public static class ActiveBundle implements AutoCloseable {
+private final String bundleId;
+private final CompletionStage response;
+private final CloseableFnDataReceiver inputReceiver;
+private final Map outputClients;
 
-public abstract CloseableFnDataReceiver getInputReceiver();
-public abstract Map getOutputClients();
-
-public static  ActiveBundle create(
+private ActiveBundle(
 String bundleId,
 CompletionStage response,
-CloseableFnDataReceiver dataReceiver,
+CloseableFnDataReceiver inputReceiver,
 Map outputClients) {
-  return new AutoValue_SdkHarnessClient_ActiveBundle<>(
-  bundleId, response, dataReceiver, outputClients);
+  this.bundleId = bundleId;
+  this.response = response;
+  this.inputReceiver = inputReceiver;
+  this.outputClients = outputClients;
+}
+
+/**
+ * Returns an id used to represent this bundle.
+ */
+public String getBundleId() {
+  return bundleId;
+}
+
+/**
+ * Returns a {@link FnDataReceiver receiver} which consumes input elements forwarding them
+ * to the SDK. When
 
 Review comment:
   Did you mean to complete this comment?




Issue Time Tracking
---

Worklog Id: (was: 85311)
Time Spent: 4h 50m  (was: 4h 40m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-03-28 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85309=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85309
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 28/Mar/18 15:59
Start Date: 28/Mar/18 15:59
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #4963: 
[BEAM-3326] Abstract away closing the inbound receiver, waiting for the bundle 
to finish, waiting for outbound to complete within the ActiveBundle.
URL: https://github.com/apache/beam/pull/4963#discussion_r177800576
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
 ##
 @@ -146,22 +154,92 @@ private BundleProcessor(
   }
 
   /** An active bundle for a particular {@link BeamFnApi.ProcessBundleDescriptor}. */
-  @AutoValue
-  public abstract static class ActiveBundle {
-public abstract String getBundleId();
-
-public abstract CompletionStage getBundleResponse();
+  public static class ActiveBundle implements AutoCloseable {
+private final String bundleId;
+private final CompletionStage response;
+private final CloseableFnDataReceiver inputReceiver;
+private final Map outputClients;
 
-public abstract CloseableFnDataReceiver getInputReceiver();
-public abstract Map getOutputClients();
-
-public static  ActiveBundle create(
+private ActiveBundle(
 String bundleId,
 CompletionStage response,
-CloseableFnDataReceiver dataReceiver,
+CloseableFnDataReceiver inputReceiver,
 Map outputClients) {
-  return new AutoValue_SdkHarnessClient_ActiveBundle<>(
-  bundleId, response, dataReceiver, outputClients);
+  this.bundleId = bundleId;
+  this.response = response;
+  this.inputReceiver = inputReceiver;
+  this.outputClients = outputClients;
+}
+
+/**
+ * Returns an id used to represent this bundle.
+ */
+public String getBundleId() {
+  return bundleId;
+}
+
+/**
+ * Returns a {@link FnDataReceiver receiver} which consumes input elements forwarding them
+ * to the SDK. When
+ */
+public FnDataReceiver getInputReceiver() {
+  return inputReceiver;
+}
+
+/**
+ * Blocks till bundle processing is finished. This is comprised of:
+ * 
+ *   closing the {@link #getInputReceiver() input receiver}.
+ *   waiting for the SDK to say that processing the bundle is finished.
+ *   waiting for all inbound data clients to complete
+ * 
+ *
+ * This method will throw an exception if bundle processing has failed.
+ * {@link Throwable#getSuppressed()} will return all the reasons as to why processing has
+ * failed.
+ */
+@Override
+public void close() throws Exception {
+  Exception exception = null;
+  try {
+inputReceiver.close();
+  } catch (Exception e) {
+exception = e;
+  }
+  try {
+// We don't have to worry about the completion stage.
+if (exception == null) {
+  MoreFutures.get(response);
+} else {
+  // TODO: Handle aborting the bundle being processed.
+  throw new IllegalStateException("Processing bundle failed, TODO: abort bundle.");
+}
+  } catch (Exception e) {
+if (exception == null) {
+  exception = e;
+} else {
+  exception.addSuppressed(e);
+}
+  }
+  for (InboundDataClient outputClient : outputClients.values()) {
+try {
+  // If we failed processing this bundle, we should cancel all inbound data.
+  if (exception == null) {
+outputClient.awaitCompletion();
 
 Review comment:
   Could the failure of a single inbound client cause other clients to fail? If 
so, would this be captured by the overall bundle result? If not, you will need 
to wait for completion of the first-finished result rather than an arbitrary 
client here.
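The `close()` Javadoc quoted above describes an error-handling contract: attempt every cleanup step, keep the first failure as the primary exception, and expose the rest through `Throwable#getSuppressed()`. The following is a minimal, self-contained sketch of that accumulate-and-suppress pattern, not the Beam implementation. `BundleCleanupSketch` and `closeAll` are hypothetical names, and plain `AutoCloseable` steps stand in for closing the input receiver, awaiting the bundle response, and awaiting each inbound data client. It is also simplified: the real method replaces the "await response" step with an `IllegalStateException` once closing the input receiver has failed, which this sketch does not model.

```java
import java.util.List;

public class BundleCleanupSketch {

  /**
   * Runs every cleanup step, even after an earlier step fails. The first failure becomes the
   * primary exception and each later failure is attached to it with addSuppressed.
   */
  static void closeAll(List<AutoCloseable> steps) throws Exception {
    Exception primary = null;
    for (AutoCloseable step : steps) {
      try {
        step.close();
      } catch (Exception e) {
        if (primary == null) {
          primary = e;
        } else {
          primary.addSuppressed(e);
        }
      }
    }
    if (primary != null) {
      throw primary;
    }
  }

  public static void main(String[] args) {
    StringBuilder log = new StringBuilder();
    // Hypothetical steps standing in for: closing the input receiver, awaiting the
    // bundle response, and awaiting an inbound data client.
    List<AutoCloseable> steps =
        List.of(
            () -> {
              throw new IllegalStateException("closing input receiver failed");
            },
            () -> log.append("awaited bundle response;"),
            () -> {
              throw new RuntimeException("inbound data client failed");
            });
    try {
      closeAll(steps);
    } catch (Exception e) {
      System.out.println("primary: " + e.getMessage());
      for (Throwable suppressed : e.getSuppressed()) {
        System.out.println("suppressed: " + suppressed.getMessage());
      }
    }
    System.out.println(log);
  }
}
```

Running `main` reports the input-receiver failure as the primary exception with the inbound-client failure suppressed under it, and the middle step still runs. Note that this sketch walks the steps in a fixed order, so it does not answer the reviewer's question about waiting on whichever inbound client finishes first.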




Issue Time Tracking
---

Worklog Id: (was: 85309)
Time Spent: 4.5h  (was: 4h 20m)


[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-03-28 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85299=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85299
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 28/Mar/18 15:44
Start Date: 28/Mar/18 15:44
Worklog Time Spent: 10m 
  Work Description: lukecwik closed pull request #4963: [BEAM-3326] 
Abstract away closing the inbound receiver, waiting for the bundle to finish, 
waiting for outbound to complete within the ActiveBundle.
URL: https://github.com/apache/beam/pull/4963
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
index f71616a9aae..de1aa57fe32 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
@@ -38,6 +38,7 @@
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.fn.data.InboundDataClient;
 import org.apache.beam.sdk.fn.data.LogicalEndpoint;
+import org.apache.beam.sdk.util.MoreFutures;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,6 +97,13 @@ private BundleProcessor(
  *
  * The input channels for the returned {@link ActiveBundle} are derived from the instructions
  * in the {@link BeamFnApi.ProcessBundleDescriptor}.
+ *
+ * NOTE: It is important to {@link #close()} each bundle after all elements are emitted.
+ * {@code
+ * try (ActiveBundle bundle = SdkHarnessClient.newBundle(...)) {
+ *   // send all elements
+ * }
+ * }
  */
 public ActiveBundle newBundle(
 Map outputReceivers) {
@@ -133,7 +141,7 @@ private BundleProcessor(
   fnApiDataService.send(
   LogicalEndpoint.of(bundleId, remoteInput.getTarget()), remoteInput.getCoder());
 
-  return ActiveBundle.create(bundleId, specificResponse, dataReceiver, outputClients);
+  return new ActiveBundle(bundleId, specificResponse, dataReceiver, outputClients);
 }
 
 private  InboundDataClient attachReceiver(
@@ -146,22 +154,92 @@ private BundleProcessor(
   }
 
   /** An active bundle for a particular {@link BeamFnApi.ProcessBundleDescriptor}. */
-  @AutoValue
-  public abstract static class ActiveBundle {
-public abstract String getBundleId();
-
-public abstract CompletionStage getBundleResponse();
+  public static class ActiveBundle implements AutoCloseable {
+private final String bundleId;
+private final CompletionStage response;
+private final CloseableFnDataReceiver inputReceiver;
+private final Map outputClients;
 
-public abstract CloseableFnDataReceiver getInputReceiver();
-public abstract Map getOutputClients();
-
-public static  ActiveBundle create(
+private ActiveBundle(
 String bundleId,
 CompletionStage response,
-CloseableFnDataReceiver dataReceiver,
+CloseableFnDataReceiver inputReceiver,
 Map outputClients) {
-  return new AutoValue_SdkHarnessClient_ActiveBundle<>(
-  bundleId, response, dataReceiver, outputClients);
+  this.bundleId = bundleId;
+  this.response = response;
+  this.inputReceiver = inputReceiver;
+  this.outputClients = outputClients;
+}
+
+/**
+ * Returns an id used to represent this bundle.
+ */
+public String getBundleId() {
+  return bundleId;
+}
+
+/**
+ * Returns a {@link FnDataReceiver receiver} which consumes input elements forwarding them
+ * to the SDK. When
+ */
+public FnDataReceiver getInputReceiver() {
+  return inputReceiver;
+}
+
+/**
+ * Blocks till bundle processing is finished. This is comprised of:
+ * 
+ *   closing the {@link #getInputReceiver() input receiver}.
+ *   waiting for the SDK to say that processing the bundle is finished.
+ *   waiting for all inbound data clients to complete
+ * 
+ *
+ * This method will throw an exception if bundle processing has failed.
+ * {@link Throwable#getSuppressed()} will return all the reasons as to why processing has
+ * failed.
+ */
+   

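The `newBundle` Javadoc in the diff above asks callers to wrap each `ActiveBundle` in try-with-resources. That advice relies on a standard Java guarantee: `close()` runs even when the body throws, and if `close()` also throws, its exception is attached to the body's exception as a suppressed exception instead of replacing it. A minimal sketch of that behaviour follows; `FakeBundle` and `runBundle` are hypothetical stand-ins, not Beam classes.

```java
import java.util.ArrayList;
import java.util.List;

public class TryWithResourcesSketch {

  /** Hypothetical stand-in for ActiveBundle: records sent elements and can fail on close. */
  static class FakeBundle implements AutoCloseable {
    final List<String> sent = new ArrayList<>();
    final boolean failOnClose;
    boolean closed = false;

    FakeBundle(boolean failOnClose) {
      this.failOnClose = failOnClose;
    }

    void send(String element) {
      sent.add(element);
    }

    @Override
    public void close() throws Exception {
      closed = true;
      if (failOnClose) {
        throw new Exception("bundle processing failed");
      }
    }
  }

  /** Sends all elements inside try-with-resources and returns whatever exception escaped. */
  static Exception runBundle(FakeBundle bundle, List<String> elements, boolean failWhileSending) {
    try (FakeBundle b = bundle) {
      for (String element : elements) {
        b.send(element);
      }
      if (failWhileSending) {
        throw new IllegalStateException("sending failed");
      }
    } catch (Exception e) {
      return e;
    }
    return null;
  }

  public static void main(String[] args) {
    FakeBundle bundle = new FakeBundle(true);
    Exception e = runBundle(bundle, List.of("spam", "ham", "eggs"), true);
    // close() still ran, and its failure is suppressed under the body's exception.
    System.out.println(bundle.closed);
    System.out.println(e.getMessage());
    System.out.println(e.getSuppressed()[0].getMessage());
  }
}
```

With both failures present, the caller sees the body's `IllegalStateException` first, and the bundle's close failure remains reachable through `getSuppressed()`. This complements the aggregation that `ActiveBundle#close()` performs internally.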
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-03-28 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85298=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85298
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 28/Mar/18 15:43
Start Date: 28/Mar/18 15:43
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #4963: 
[BEAM-3326] Abstract away closing the inbound receiver, waiting for the bundle 
to finish, waiting for outbound to complete within the ActiveBundle.
URL: https://github.com/apache/beam/pull/4963#discussion_r177795953
 
 

 ##
 File path: 
runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
 ##
 @@ -221,6 +220,135 @@ public void testNewBundleAndProcessElements() throws Exception {
 WindowedValue.valueInGlobalWindow("eggs")));
   }
 
+  @Test
+  public void handleCleanupWhenInputSenderFails() throws Exception {
+String descriptorId1 = "descriptor1";
+Exception testException = new Exception();
+
+InboundDataClient mockOutputReceiver = mock(InboundDataClient.class);
+CloseableFnDataReceiver mockInputSender = mock(CloseableFnDataReceiver.class);
+
+ProcessBundleDescriptor descriptor =
+ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build();
+CompletableFuture processBundleResponseFuture =
+new CompletableFuture<>();
+when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
+.thenReturn(new CompletableFuture<>())
+.thenReturn(processBundleResponseFuture);
+
+FullWindowedValueCoder coder =
+FullWindowedValueCoder.of(StringUtf8Coder.of(), Coder.INSTANCE);
+BundleProcessor processor =
+sdkHarnessClient.getProcessor(
+descriptor, RemoteInputDestination.of(coder, Target.getDefaultInstance()));
+when(dataService.receive(any(), any(), any())).thenReturn(mockOutputReceiver);
 
 Review comment:
   I'll fix the comment in a separate PR and merge this.




Issue Time Tracking
---

Worklog Id: (was: 85298)
Time Spent: 4h 10m  (was: 4h)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-03-28 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85297=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85297
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 28/Mar/18 15:42
Start Date: 28/Mar/18 15:42
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on a change in pull request #4963: 
[BEAM-3326] Abstract away closing the inbound receiver, waiting for the bundle 
to finish, waiting for outbound to complete within the ActiveBundle.
URL: https://github.com/apache/beam/pull/4963#discussion_r177795953
 
 

 ##
 File path: 
runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
 ##
 @@ -221,6 +220,135 @@ public void testNewBundleAndProcessElements() throws Exception {
 WindowedValue.valueInGlobalWindow("eggs")));
   }
 
+  @Test
+  public void handleCleanupWhenInputSenderFails() throws Exception {
+String descriptorId1 = "descriptor1";
+Exception testException = new Exception();
+
+InboundDataClient mockOutputReceiver = mock(InboundDataClient.class);
+CloseableFnDataReceiver mockInputSender = mock(CloseableFnDataReceiver.class);
+
+ProcessBundleDescriptor descriptor =
+ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build();
+CompletableFuture processBundleResponseFuture =
+new CompletableFuture<>();
+when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
+.thenReturn(new CompletableFuture<>())
+.thenReturn(processBundleResponseFuture);
+
+FullWindowedValueCoder coder =
+FullWindowedValueCoder.of(StringUtf8Coder.of(), Coder.INSTANCE);
+BundleProcessor processor =
+sdkHarnessClient.getProcessor(
+descriptor, RemoteInputDestination.of(coder, Target.getDefaultInstance()));
+when(dataService.receive(any(), any(), any())).thenReturn(mockOutputReceiver);
 
 Review comment:
   I'll fix the comment and merge.




Issue Time Tracking
---

Worklog Id: (was: 85297)
Time Spent: 4h  (was: 3h 50m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-03-28 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85182=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85182
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 28/Mar/18 09:59
Start Date: 28/Mar/18 09:59
Worklog Time Spent: 10m 
  Work Description: aljoscha commented on a change in pull request #4963: 
[BEAM-3326] Abstract away closing the inbound receiver, waiting for the bundle 
to finish, waiting for outbound to complete within the ActiveBundle.
URL: https://github.com/apache/beam/pull/4963#discussion_r177666906
 
 

 ##
 File path: 
runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
 ##
 @@ -221,6 +220,135 @@ public void testNewBundleAndProcessElements() throws Exception {
 WindowedValue.valueInGlobalWindow("eggs")));
   }
 
+  @Test
+  public void handleCleanupWhenInputSenderFails() throws Exception {
+String descriptorId1 = "descriptor1";
+Exception testException = new Exception();
+
+InboundDataClient mockOutputReceiver = mock(InboundDataClient.class);
+CloseableFnDataReceiver mockInputSender = mock(CloseableFnDataReceiver.class);
+
+ProcessBundleDescriptor descriptor =
+ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build();
+CompletableFuture processBundleResponseFuture =
+new CompletableFuture<>();
+when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
+.thenReturn(new CompletableFuture<>())
+.thenReturn(processBundleResponseFuture);
+
+FullWindowedValueCoder coder =
+FullWindowedValueCoder.of(StringUtf8Coder.of(), Coder.INSTANCE);
+BundleProcessor processor =
+sdkHarnessClient.getProcessor(
+descriptor, RemoteInputDestination.of(coder, Target.getDefaultInstance()));
+when(dataService.receive(any(), any(), any())).thenReturn(mockOutputReceiver);
 
 Review comment:
   off-topic: I noticed that `FnDataService.receive()` still refers to the 
"returned future" in the Javadoc even though it doesn't return a future anymore.




Issue Time Tracking
---

Worklog Id: (was: 85182)
Time Spent: 3h 50m  (was: 3h 40m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-03-27 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85098=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85098
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 28/Mar/18 01:41
Start Date: 28/Mar/18 01:41
Worklog Time Spent: 10m 
  Work Description: lukecwik opened a new pull request #4963: [BEAM-3326] 
Abstract away closing the inbound receiver, waiting for the bundle to finish, 
waiting for outbound to complete within the ActiveBundle.
URL: https://github.com/apache/beam/pull/4963
 
 
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [ ] Make sure there is a [JIRA 
issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the 
change (usually before you start working on it).  Trivial changes like typos do 
not require a JIRA issue.  Your pull request should address just this issue, 
without pulling in other changes.
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue.
- [ ] Write a pull request description that is detailed enough to 
understand:
  - [ ] What the pull request does
  - [ ] Why it does it
  - [ ] How it does it
  - [ ] Why this approach
- [ ] Each commit in the pull request should have a meaningful subject line 
and body.
- [ ] Run `mvn clean verify` to make sure basic checks pass. A more 
thorough check will be performed on your pull request automatically.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   




Issue Time Tracking
---

Worklog Id: (was: 85098)
Time Spent: 3.5h  (was: 3h 20m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-03-27 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85099=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85099
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 28/Mar/18 01:41
Start Date: 28/Mar/18 01:41
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #4963: [BEAM-3326] Abstract 
away closing the inbound receiver, waiting for the bundle to finish, waiting 
for outbound to complete within the ActiveBundle.
URL: https://github.com/apache/beam/pull/4963#issuecomment-376731599
 
 
   CC: @bsidhom 




Issue Time Tracking
---

Worklog Id: (was: 85099)
Time Spent: 3h 40m  (was: 3.5h)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-03-15 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=80905=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80905
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 15/Mar/18 18:00
Start Date: 15/Mar/18 18:00
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4761: [BEAM-3326] Remove 
SdkHarnessClientControlService
URL: https://github.com/apache/beam/pull/4761#issuecomment-373469231
 
 
   Assuming the changes in https://github.com/apache/beam/pull/4866 are 
accepted, this likely becomes a no-op including when run against flink, because 
it removes the need for the `EnvironmentManager` to directly access data or 
state services.




Issue Time Tracking
---

Worklog Id: (was: 80905)
Time Spent: 3h 20m  (was: 3h 10m)



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-03-14 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=80533=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80533
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 14/Mar/18 20:30
Start Date: 14/Mar/18 20:30
Worklog Time Spent: 10m 
  Work Description: lukecwik closed pull request #4825: [BEAM-3326] Add an 
InProcess SdkHarness Rule
URL: https://github.com/apache/beam/pull/4825
 
 
   


diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
index 9d443427dc1..5a82a32a0b4 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
@@ -18,7 +18,10 @@
 package org.apache.beam.runners.fnexecution.control;
 
 import io.grpc.stub.StreamObserver;
+import java.util.Collection;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
 import org.apache.beam.runners.fnexecution.FnService;
@@ -31,6 +34,8 @@
   private static final Logger LOGGER = LoggerFactory.getLogger(FnApiControlClientPoolService.class);
 
   private final BlockingQueue clientPool;
+  private final Collection vendedClients = new CopyOnWriteArrayList<>();
+  private AtomicBoolean closed = new AtomicBoolean();
 
   private FnApiControlClientPoolService(BlockingQueue clientPool) {
 this.clientPool = clientPool;
@@ -61,6 +66,12 @@ public static FnApiControlClientPoolService offeringClientsToPool(
 LOGGER.info("Beam Fn Control client connected.");
 FnApiControlClient newClient = FnApiControlClient.forRequestObserver(requestObserver);
 try {
+  // Add the client to the pool of vended clients before making it available - we should close
+  // the client when we close even if no one has picked it up yet. This can occur after the
+  // service is closed, in which case the client will be discarded when the service is
+  // discarded, which should be performed by a call to #shutdownNow. The remote caller must be
+  // able to handle an unexpectedly terminated connection.
+  vendedClients.add(newClient);
   clientPool.put(newClient);
 } catch (InterruptedException e) {
   Thread.currentThread().interrupt();
@@ -70,7 +81,11 @@ public static FnApiControlClientPoolService offeringClientsToPool(
   }
 
   @Override
-  public void close() throws Exception {
-// The clients in the pool are owned by the consumer, which is responsible for closing them
+  public void close() {
+if (!closed.getAndSet(true)) {
+  for (FnApiControlClient vended : vendedClients) {
+vended.close();
+  }
+}
   }
 }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java
index 00b2a9afb92..a13d3d7d38c 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java
@@ -53,8 +53,7 @@
 implements FnService, FnDataService {
   private static final Logger LOG = LoggerFactory.getLogger(GrpcDataService.class);
 
-  public static GrpcDataService create(
-  ExecutorService executor) {
+  public static GrpcDataService create(ExecutorService executor) {
 return new GrpcDataService(executor);
   }
 
@@ -62,6 +61,9 @@ public static GrpcDataService create(
   /**
* A collection of multiplexers which are not used to send data. A handle to these multiplexers is
* maintained in order to perform an orderly shutdown.
+   *
+   * TODO: (BEAM-3811) Replace with some cancellable collection, to ensure that new clients of a
+   * closed {@link GrpcDataService} are closed with that {@link GrpcDataService}.
*/
   private final Queue additionalMultiplexers;
 
@@ -105,7 +107,9 @@ public void close() throws Exception {
 // Shutdown remaining clients
   }
 }
-connectedClient.get().close();
+if (!connectedClient.isCancelled()) {
+  

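The `FnApiControlClientPoolService` change above records every client it hands out in a `CopyOnWriteArrayList` and guards `close()` with `AtomicBoolean#getAndSet(true)`, so only the first call to `close()` closes the vended clients. The following is a minimal sketch of that idempotent-close pattern. `TrackingPool`, `Client`, and `vend` are hypothetical names, not Beam types.

```java
import java.util.Collection;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class TrackingPool implements AutoCloseable {

  /** Hypothetical client that counts how many times it was closed. */
  static class Client implements AutoCloseable {
    final AtomicInteger closeCount = new AtomicInteger();

    @Override
    public void close() {
      closeCount.incrementAndGet();
    }
  }

  // Safe to iterate in close() while other threads are still adding clients.
  private final Collection<Client> vendedClients = new CopyOnWriteArrayList<>();
  private final AtomicBoolean closed = new AtomicBoolean();

  /** Records the client before handing it out, so close() can reach it later. */
  Client vend() {
    Client client = new Client();
    vendedClients.add(client);
    return client;
  }

  /** Closes every vended client exactly once, no matter how often close() is called. */
  @Override
  public void close() {
    // getAndSet returns the previous value, so only the first caller observes false.
    if (!closed.getAndSet(true)) {
      for (Client client : vendedClients) {
        client.close();
      }
    }
  }

  public static void main(String[] args) {
    TrackingPool pool = new TrackingPool();
    Client a = pool.vend();
    Client b = pool.vend();
    pool.close();
    pool.close(); // second call is a no-op
    System.out.println(a.closeCount.get() + " " + b.closeCount.get()); // prints "1 1"
  }
}
```

As the comment in the diff and the BEAM-3811 TODO both acknowledge, a client added after `close()` has already run is not closed by this loop. Handling that case needs something like a cancellable collection.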
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-03-13 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=80170=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80170
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 14/Mar/18 02:12
Start Date: 14/Mar/18 02:12
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4825: [BEAM-3326] Add an 
InProcess SdkHarness Rule
URL: https://github.com/apache/beam/pull/4825#issuecomment-372881433
 
 
   run java gradle precommit




Issue Time Tracking
---

Worklog Id: (was: 80170)
Time Spent: 3h  (was: 2h 50m)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-03-13 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=80169=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80169
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 14/Mar/18 02:12
Start Date: 14/Mar/18 02:12
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4825: [BEAM-3326] Add an InProcess SdkHarness Rule
URL: https://github.com/apache/beam/pull/4825#issuecomment-372881433
 
 
   run java gradle precommit


Issue Time Tracking
---

Worklog Id: (was: 80169)
Time Spent: 2h 50m  (was: 2h 40m)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs.



[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner

2018-03-12 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=79766=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79766
 ]

ASF GitHub Bot logged work on BEAM-3326:


Author: ASF GitHub Bot
Created on: 13/Mar/18 04:47
Start Date: 13/Mar/18 04:47
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4825: [BEAM-3326] Add an InProcess SdkHarness Rule
URL: https://github.com/apache/beam/pull/4825#issuecomment-372545825
 
 
   retest this please


Issue Time Tracking
---

Worklog Id: (was: 79766)
Time Spent: 2h 40m  (was: 2.5h)

> Execute a Stage via the portability framework in the ReferenceRunner
> 
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs.


