[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=107345=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-107345 ]

ASF GitHub Bot logged work on BEAM-3326:
Author: ASF GitHub Bot
Created on: 30/May/18 19:48
Start Date: 30/May/18 19:48
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r191900128

## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java ##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.beam.fn.harness.FnHarness;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC
+ * channel.
+ *
+ * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a
+ * provided dependency.
+ */
+class InProcessEnvironmentFactory implements EnvironmentFactory {
+
+  private final GrpcFnServer loggingServer;
+  private final GrpcFnServer controlServer;
+
+  private final ControlClientPool.Source clientSource;
+
+  InProcessEnvironmentFactory(
+      GrpcFnServer loggingServer,
+      GrpcFnServer controlServer,
+      ControlClientPool.Source clientSource) {
+    this.loggingServer = loggingServer;
+    this.controlServer = controlServer;
+    checkArgument(
+        loggingServer.getApiServiceDescriptor() != null,
+        "Logging Server cannot have a null %s",
+        ApiServiceDescriptor.class.getSimpleName());
+    checkArgument(
+        controlServer.getApiServiceDescriptor() != null,
+        "Control Server cannot have a null %s",
+        ApiServiceDescriptor.class.getSimpleName());
+    this.clientSource = clientSource;
+  }
+
+  @Override
+  public RemoteEnvironment createEnvironment(Environment container) throws Exception {
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future fnHarness =
+        executor.submit(
+            () ->
+                FnHarness.main(
+                    PipelineOptionsFactory.create(),
+                    loggingServer.getApiServiceDescriptor(),
+                    controlServer.getApiServiceDescriptor(),
+                    InProcessManagedChannelFactory.create(),
+                    StreamObserverFactory.direct()));
+    executor.submit(
+        () -> {
+          try {
+            fnHarness.get();
+          } catch (Throwable t) {
+            executor.shutdownNow();
+          }
+        });
+
+    // TODO: find some way to populate the actual ID in FnHarness.main()

Review comment:
+1, we should enhance `ApiServiceDescriptor` to hold connection information. I am in favor of adding `headers` in `ApiServiceDescriptor` instead of having a `client_id`.

This is an automated message from the Apache Git Service. To
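In the quoted `createEnvironment`, the watcher task is submitted to the same single-thread executor as the harness, so it can only start after `FnHarness.main(...)` has returned, and the executor is shut down only when the harness fails. A minimal JDK-only sketch of the same "run the harness, clean up when it ends" pattern follows, assuming a `CompletableFuture`-based approach is acceptable. `HarnessLauncher` and `launch` are hypothetical names, and the `Runnable` stands in for the real harness call; this is not the code that was merged.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper: runs a long-lived "harness" task on its own executor and shuts that
 * executor down once the task ends, whether it returned normally or threw.
 */
class HarnessLauncher {
  static CompletableFuture<Void> launch(Runnable harnessMain, ExecutorService executor) {
    return CompletableFuture.runAsync(harnessMain, executor)
        // Runs after the task completes (normally or exceptionally). shutdown() lets the
        // finishing task complete and then releases the executor's thread.
        .whenComplete((ignored, failure) -> executor.shutdown());
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    // This Runnable stands in for the FnHarness.main(...) call in the quoted code.
    CompletableFuture<Void> harness =
        launch(
            () -> {
              throw new IllegalStateException("harness crashed");
            },
            executor);
    try {
      harness.join();
    } catch (RuntimeException e) {
      // join() wraps the task's exception in a CompletionException.
      System.out.println("harness failed: " + e.getCause().getMessage());
    }
    System.out.println("executor terminated: " + executor.awaitTermination(1, TimeUnit.SECONDS));
  }
}
```

Using `whenComplete` ties cleanup to the task's completion in both the success and failure paths, so no second task has to block on `Future.get()`.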
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=105653=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105653 ]

ASF GitHub Bot logged work on BEAM-3326:
Author: ASF GitHub Bot
Created on: 24/May/18 16:56
Start Date: 24/May/18 16:56
Worklog Time Spent: 10m
Work Description: tgroh closed pull request #5349: [BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle
index 5fc4e159520..af99b5b5bc2 100644
--- a/runners/direct-java/build.gradle
+++ b/runners/direct-java/build.gradle
@@ -64,6 +64,7 @@ dependencies {
   shadow library.java.slf4j_api
   shadow library.java.hamcrest_core
   shadow library.java.junit
+  testRuntime project(path: ":beam-sdks-java-harness")
   shadowTest project(path: ":beam-sdks-java-core", configuration: "shadowTest")
   shadowTest project(path: ":beam-runners-core-java", configuration: "shadowTest")
   shadowTest library.java.guava_testlib
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 1d3d790a747..d06b074246c 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -226,6 +226,12 @@
       <artifactId>beam-sdks-java-fn-execution</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-harness</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-java-fn-execution</artifactId>
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
new file mode 100644
index 000..f34c4d54d49
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * The {@link TransformEvaluatorFactory} which produces {@link TransformEvaluator evaluators} for
+ * stages which execute on an SDK harness via the Fn Execution APIs.
+ */
+class RemoteStageEvaluatorFactory implements TransformEvaluatorFactory {
+  private final BundleFactory bundleFactory;
+
+  private final JobBundleFactory jobFactory;
+
+  RemoteStageEvaluatorFactory(BundleFactory bundleFactory, JobBundleFactory jobFactory) {
+    this.bundleFactory = bundleFactory;
+    this.jobFactory = jobFactory;
+  }
+
+  @Nullable
+  @Override
+  public TransformEvaluator forApplication(
+      PTransformNode application, CommittedBundle inputBundle) throws Exception {
+    return new RemoteStageEvaluator<>(application);
+  }
+
+  @Override
+  public void cleanup() throws Exception {
+    jobFactory.close();
+  }
+
+  private class RemoteStageEvaluator implements TransformEvaluator {
+    private final PTransformNode transform;
+    private final RemoteBundle bundle;
+    private final Collection outputs;
+
+    private RemoteStageEvaluator(PTransformNode transform) throws Exception {
+      this.transform = transform;
+      ExecutableStage stage =
+          ExecutableStage.fromPayload(
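The factory above hands out one `RemoteStageEvaluator` per application, each holding a `bundle` and an `outputs` collection, while `cleanup()` closes the job-level `JobBundleFactory`. The quoted diff is cut off before the evaluator's processing methods, so the following is only a simplified, JDK-only model of that lifecycle, assuming that elements are forwarded into the bundle and that closing the bundle flushes its outputs. `StageEvaluatorSketch`, `Evaluator`, `FakeRemoteBundle`, and `forBundle` are hypothetical names; none of them are Beam types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/** Hypothetical, simplified model of a per-bundle evaluator backed by a "remote" bundle. */
class StageEvaluatorSketch {
  interface Evaluator<InputT, OutputT> {
    void processElement(InputT element) throws Exception;

    List<OutputT> finishBundle() throws Exception;
  }

  /** Stand-in for a remote bundle: buffers inputs and emits transformed outputs on close(). */
  static final class FakeRemoteBundle<InputT, OutputT> implements AutoCloseable {
    private final Function<InputT, OutputT> stageFn;
    private final List<OutputT> outputSink;
    private final List<InputT> buffered = new ArrayList<>();

    FakeRemoteBundle(Function<InputT, OutputT> stageFn, List<OutputT> outputSink) {
      this.stageFn = stageFn;
      this.outputSink = outputSink;
    }

    void send(InputT element) {
      buffered.add(element);
    }

    @Override
    public void close() {
      for (InputT element : buffered) {
        outputSink.add(stageFn.apply(element));
      }
      buffered.clear();
    }
  }

  /** Plays the role of forApplication(...): a fresh evaluator and bundle per input bundle. */
  static <InputT, OutputT> Evaluator<InputT, OutputT> forBundle(Function<InputT, OutputT> stageFn) {
    List<OutputT> outputs = new ArrayList<>();
    FakeRemoteBundle<InputT, OutputT> bundle = new FakeRemoteBundle<>(stageFn, outputs);
    return new Evaluator<InputT, OutputT>() {
      @Override
      public void processElement(InputT element) {
        bundle.send(element);
      }

      @Override
      public List<OutputT> finishBundle() {
        bundle.close(); // in this model, outputs are only complete once the bundle is closed
        return Collections.unmodifiableList(outputs);
      }
    };
  }

  public static void main(String[] args) throws Exception {
    Evaluator<String, Integer> evaluator = forBundle(String::length);
    evaluator.processElement("a");
    evaluator.processElement("bcd");
    System.out.println(evaluator.finishBundle()); // prints [1, 3]
  }
}
```

Creating the bundle in the evaluator (as the quoted constructor does) scopes one remote bundle to one input bundle, while job-wide resources stay with the factory and are released in `cleanup()`.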
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=105616=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105616 ]

ASF GitHub Bot logged work on BEAM-3326:
Author: ASF GitHub Bot
Created on: 24/May/18 15:15
Start Date: 24/May/18 15:15
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190624823

## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java ##
+    // TODO: find some way to populate the actual ID in FnHarness.main()

Review comment:
I would be in favor of dropping how we currently provide the client id in exchange for this approach, unless you're referring to something else when you say `the same value in two different ways`.
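The reviewers in this thread suggest that the endpoint description itself, rather than a separate `client_id`, should carry connection details such as the worker's identity. A minimal JDK-only sketch of that idea follows, assuming identity travels as a header attached to the descriptor. `EndpointDescriptor`, `withHeader`, `workerId`, and the `worker_id` header name are all hypothetical; the real `ApiServiceDescriptor` is a protobuf message, and this thread does not show whether a `headers` field was ever added to it.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical endpoint descriptor that carries connection headers alongside the URL, so a worker
 * id travels with the descriptor instead of through a separate client_id field.
 */
final class EndpointDescriptor {
  static final String WORKER_ID_HEADER = "worker_id"; // hypothetical header name

  private final String url;
  private final Map<String, String> headers;

  EndpointDescriptor(String url, Map<String, String> headers) {
    this.url = url;
    this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
  }

  String url() {
    return url;
  }

  Map<String, String> headers() {
    return headers;
  }

  /** Returns a copy with one more header; this descriptor is left unchanged. */
  EndpointDescriptor withHeader(String key, String value) {
    Map<String, String> copy = new HashMap<>(headers);
    copy.put(key, value);
    return new EndpointDescriptor(url, copy);
  }

  /** Server side: identify the connecting worker from the header it was told to send. */
  Optional<String> workerId() {
    return Optional.ofNullable(headers.get(WORKER_ID_HEADER));
  }

  public static void main(String[] args) {
    EndpointDescriptor control = new EndpointDescriptor("localhost:8099", Collections.emptyMap());
    EndpointDescriptor forWorker = control.withHeader(WORKER_ID_HEADER, "worker-1");
    System.out.println(control.workerId().isPresent()); // prints false
    System.out.println(forWorker.workerId().get()); // prints worker-1
  }
}
```

Keeping the id in one place (the descriptor) is one way to avoid supplying "the same value in two different ways", which is the concern raised elsewhere in this review.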
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=105413=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105413 ]

ASF GitHub Bot logged work on BEAM-3326:
Author: ASF GitHub Bot
Created on: 23/May/18 23:47
Start Date: 23/May/18 23:47
Worklog Time Spent: 10m
Work Description: tgroh commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190431619

## File path: runners/java-fn-execution/build.gradle ##
@@ -24,6 +24,7 @@ dependencies {
   compile library.java.guava
   compile library.java.findbugs_annotations
   compile project(path: ":beam-runners-core-construction-java", configuration: "shadow")
+  compileOnly project(path: ":beam-sdks-java-harness")

Review comment:
Done.

I can't find any documentation at all about the `provided` dependency scope in Gradle; every time I google it, it sends me to `compileOnly`. Is this a result of our `build_rules.gradle`?

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 105413)
Time Spent: 15h 20m (was: 15h 10m)

> Execute a Stage via the portability framework in the ReferenceRunner
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Thomas Groh
> Priority: Major
> Labels: portability
> Time Spent: 15h 20m
> Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=105406=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105406 ]

ASF GitHub Bot logged work on BEAM-3326:
Author: ASF GitHub Bot
Created on: 23/May/18 23:20
Start Date: 23/May/18 23:20
Worklog Time Spent: 10m
Work Description: herohde commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190427412

## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java ##
+    // TODO: find some way to populate the actual ID in FnHarness.main()

Review comment:
I see what you're saying. It makes sense, but I'm not in favor of providing the same value in two different ways.
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=105403=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105403 ]

ASF GitHub Bot logged work on BEAM-3326:
Author: ASF GitHub Bot
Created on: 23/May/18 23:11
Start Date: 23/May/18 23:11
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190425499

## File path: runners/direct-java/build.gradle ##
@@ -64,6 +64,7 @@ dependencies {
   shadow library.java.slf4j_api

Review comment:
I still think we want to use `provided` for `beam-sdks-java-harness` so that it is correctly added to the generated maven pom.

Issue Time Tracking
---
Worklog Id: (was: 105403)
Time Spent: 14h 50m (was: 14h 40m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=105404=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105404 ]

ASF GitHub Bot logged work on BEAM-3326:
Author: ASF GitHub Bot
Created on: 23/May/18 23:11
Start Date: 23/May/18 23:11
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190425600

## File path: runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java ##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.InProcessServerFactory;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.data.GrpcDataService;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link RemoteStageEvaluatorFactory}. */
+@RunWith(JUnit4.class)
+public class RemoteStageEvaluatorFactoryTest implements Serializable {
+
+  private transient RemoteStageEvaluatorFactory factory;
+  private transient ExecutorService executor;
+  private transient GrpcFnServer dataServer;

Review comment:
Got it

Issue Time Tracking
---
Worklog Id: (was: 105404)
Time Spent: 14h 50m (was: 14h 40m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=105405=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105405 ]

ASF GitHub Bot logged work on BEAM-3326:
Author: ASF GitHub Bot
Created on: 23/May/18 23:11
Start Date: 23/May/18 23:11
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190425759

## File path: runners/java-fn-execution/build.gradle ##
@@ -24,6 +24,7 @@ dependencies {
   compile library.java.guava
   compile library.java.findbugs_annotations
   compile project(path: ":beam-runners-core-construction-java", configuration: "shadow")
+  compileOnly project(path: ":beam-sdks-java-harness")

Review comment:
Please use `provided`.

Don't we need a corresponding change to the pom.xml?

Issue Time Tracking
---
Worklog Id: (was: 105405)
Time Spent: 15h (was: 14h 50m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=105402=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105402 ]

ASF GitHub Bot logged work on BEAM-3326:
Author: ASF GitHub Bot
Created on: 23/May/18 23:06
Start Date: 23/May/18 23:06
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349#discussion_r190425270

## File path: runners/direct-java/build.gradle ##
@@ -58,12 +58,14 @@ dependencies {
   compile project(path: ":beam-runners-local-java-core", configuration: "shadow")
   compile project(path: ":beam-runners-java-fn-execution", configuration: "shadow")
   compile project(path: ":beam-sdks-java-fn-execution", configuration: "shadow")
+  compileOnly project(path: ":beam-sdks-java-harness")

Review comment:
We have been using the `provided` scope in other places, and it gets translated correctly when producing the Maven jar. You're correct that we should migrate to `compileOnly` as the provided scope, but until that happens, please stick with `provided` for now.

I filed https://issues.apache.org/jira/browse/BEAM-4395 for the migration to `compileOnly`.

Issue Time Tracking
---
Worklog Id: (was: 105402)
Time Spent: 14h 40m (was: 14.5h)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=105400=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105400 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 23/May/18 23:01 Start Date: 23/May/18 23:01 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator URL: https://github.com/apache/beam/pull/5349#discussion_r190424525 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.direct.portable; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.beam.fn.harness.FnHarness; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.control.ControlClientPool; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory; +import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.sdk.fn.stream.StreamObserverFactory; +import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** + * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC + * channel. + * + * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a + * provided dependency. 
+ */ +class InProcessEnvironmentFactory implements EnvironmentFactory { + + private final GrpcFnServer loggingServer; + private final GrpcFnServer controlServer; + + private final ControlClientPool.Source clientSource; + + InProcessEnvironmentFactory( + GrpcFnServer loggingServer, + GrpcFnServer controlServer, + ControlClientPool.Source clientSource) { +this.loggingServer = loggingServer; +this.controlServer = controlServer; +checkArgument( +loggingServer.getApiServiceDescriptor() != null, +"Logging Server cannot have a null %s", +ApiServiceDescriptor.class.getSimpleName()); +checkArgument( +controlServer.getApiServiceDescriptor() != null, +"Control Server cannot have a null %s", +ApiServiceDescriptor.class.getSimpleName()); +this.clientSource = clientSource; + } + + @Override + public RemoteEnvironment createEnvironment(Environment container) throws Exception { +ExecutorService executor = Executors.newSingleThreadExecutor(); +Future fnHarness = +executor.submit( +() -> +FnHarness.main( +PipelineOptionsFactory.create(), +loggingServer.getApiServiceDescriptor(), +controlServer.getApiServiceDescriptor(), +InProcessManagedChannelFactory.create(), +StreamObserverFactory.direct())); +executor.submit( +() -> { + try { +fnHarness.get(); + } catch (Throwable t) { +executor.shutdownNow(); + } +}); + +// TODO: find some way to populate the actual ID in FnHarness.main() Review comment: The point I'm trying to make is that the ApiServiceDescriptor is meant to describe who is connecting to an endpoint and how. I can foresee that in the future you would specify things like credentials inside this ApiServiceDescriptor. Since talking to the service requires a client ID, I felt as
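One way to read the suggestion above: everything needed to connect to an endpoint, including who is connecting (a client ID now, perhaps credentials later), travels in one descriptor value instead of being threaded through `FnHarness.main()` as a separate argument. A minimal sketch using a hypothetical immutable `EndpointDescriptor` class; it is not Beam's generated `ApiServiceDescriptor` proto, and the `client_id` key is an illustrative assumption.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical, immutable endpoint descriptor that carries per-connection metadata. */
public final class EndpointDescriptor {
  private final String url;
  private final Map<String, String> metadata;

  private EndpointDescriptor(String url, Map<String, String> metadata) {
    this.url = Objects.requireNonNull(url, "url");
    this.metadata = Collections.unmodifiableMap(new LinkedHashMap<>(metadata));
  }

  public static EndpointDescriptor of(String url) {
    return new EndpointDescriptor(url, Collections.emptyMap());
  }

  /** Returns a copy with one extra entry (e.g. a client ID); this instance is unchanged. */
  public EndpointDescriptor withMetadata(String key, String value) {
    Map<String, String> copy = new LinkedHashMap<>(metadata);
    copy.put(Objects.requireNonNull(key, "key"), Objects.requireNonNull(value, "value"));
    return new EndpointDescriptor(url, copy);
  }

  public String url() {
    return url;
  }

  public Map<String, String> metadata() {
    return metadata;
  }

  public static void main(String[] args) {
    // "client_id" is a hypothetical key chosen for illustration.
    EndpointDescriptor control =
        EndpointDescriptor.of("localhost:8099").withMetadata("client_id", "harness-1");
    // A connecting client learns both where to connect and as whom from a single value.
    System.out.println(control.url() + " " + control.metadata());
  }
}
```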
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104927=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104927 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 23/May/18 01:47 Start Date: 23/May/18 01:47 Worklog Time Spent: 10m Work Description: herohde commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator URL: https://github.com/apache/beam/pull/5349#discussion_r190102635 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.direct.portable; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.beam.fn.harness.FnHarness; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.control.ControlClientPool; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory; +import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.sdk.fn.stream.StreamObserverFactory; +import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** + * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC + * channel. + * + * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a + * provided dependency. 
+ */ +class InProcessEnvironmentFactory implements EnvironmentFactory { + + private final GrpcFnServer loggingServer; + private final GrpcFnServer controlServer; + + private final ControlClientPool.Source clientSource; + + InProcessEnvironmentFactory( + GrpcFnServer loggingServer, + GrpcFnServer controlServer, + ControlClientPool.Source clientSource) { +this.loggingServer = loggingServer; +this.controlServer = controlServer; +checkArgument( +loggingServer.getApiServiceDescriptor() != null, +"Logging Server cannot have a null %s", +ApiServiceDescriptor.class.getSimpleName()); +checkArgument( +controlServer.getApiServiceDescriptor() != null, +"Control Server cannot have a null %s", +ApiServiceDescriptor.class.getSimpleName()); +this.clientSource = clientSource; + } + + @Override + public RemoteEnvironment createEnvironment(Environment container) throws Exception { +ExecutorService executor = Executors.newSingleThreadExecutor(); +Future fnHarness = +executor.submit( +() -> +FnHarness.main( +PipelineOptionsFactory.create(), +loggingServer.getApiServiceDescriptor(), +controlServer.getApiServiceDescriptor(), +InProcessManagedChannelFactory.create(), +StreamObserverFactory.direct())); +executor.submit( +() -> { + try { +fnHarness.get(); + } catch (Throwable t) { +executor.shutdownNow(); + } +}); + +// TODO: find some way to populate the actual ID in FnHarness.main() Review comment: I'm not sure I understand the context for this question. This ID is passed as part of the container contract and used for all portable API calls from that container. I think it's desirable that it's a single ID for simplicity and debuggability. Python's use of an environment variable to pass it around
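The alternative described here is a single ID per container, handed over through the container contract (the comment notes that Python passes it in an environment variable) and attached to every portable API call that container makes. A minimal sketch, assuming a hypothetical variable name `WORKER_ID`, a hypothetical metadata key `worker_id`, and a hypothetical `ContainerIdentity` helper. The environment is passed in as a map (for example `System.getenv()`) so the logic can be exercised outside a container.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: one container-wide ID, resolved once and stamped onto every call. */
public final class ContainerIdentity {
  // Hypothetical names; the real container contract may use different ones.
  static final String ID_VARIABLE = "WORKER_ID";
  static final String ID_METADATA_KEY = "worker_id";

  private final String id;

  private ContainerIdentity(String id) {
    this.id = id;
  }

  /** Resolves the ID from an environment map; fails fast if it is absent or empty. */
  public static ContainerIdentity fromEnvironment(Map<String, String> env) {
    String id = env.get(ID_VARIABLE);
    if (id == null || id.isEmpty()) {
      throw new IllegalStateException("Missing required environment variable " + ID_VARIABLE);
    }
    return new ContainerIdentity(id);
  }

  /** Returns a copy of the request metadata with the same ID added, whatever the service. */
  public Map<String, String> stamp(Map<String, String> requestMetadata) {
    Map<String, String> out = new HashMap<>(requestMetadata);
    out.put(ID_METADATA_KEY, id);
    return out;
  }

  public String id() {
    return id;
  }

  public static void main(String[] args) {
    Map<String, String> env = new HashMap<>();
    env.put(ID_VARIABLE, "worker-7");
    ContainerIdentity identity = fromEnvironment(env);
    // Control, data, and logging calls would all carry the same ID.
    System.out.println(identity.stamp(new HashMap<>()));
  }
}
```

Keeping one ID for the whole container, as argued above, means every log line and RPC from that container can be correlated by a single value.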
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104907=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104907 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 23/May/18 00:41 Start Date: 23/May/18 00:41 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator URL: https://github.com/apache/beam/pull/5349#discussion_r190094767 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.direct.portable; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.beam.fn.harness.FnHarness; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.control.ControlClientPool; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory; +import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.sdk.fn.stream.StreamObserverFactory; +import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** + * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC Review comment: Updated.
Issue Time Tracking --- Worklog Id: (was: 104907) Time Spent: 14h (was: 13h 50m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104906=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104906 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 23/May/18 00:41 Start Date: 23/May/18 00:41 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator URL: https://github.com/apache/beam/pull/5349#discussion_r190094759 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.direct.portable; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.beam.fn.harness.FnHarness; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.control.ControlClientPool; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory; +import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.sdk.fn.stream.StreamObserverFactory; +import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** + * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC + * channel. + * + * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a + * provided dependency. 
+ */ +class InProcessEnvironmentFactory implements EnvironmentFactory { + + private final GrpcFnServer loggingServer; + private final GrpcFnServer controlServer; + + private final ControlClientPool.Source clientSource; + + InProcessEnvironmentFactory( + GrpcFnServer loggingServer, + GrpcFnServer controlServer, + ControlClientPool.Source clientSource) { +this.loggingServer = loggingServer; +this.controlServer = controlServer; +checkArgument( +loggingServer.getApiServiceDescriptor() != null, +"Logging Server cannot have a null %s", +ApiServiceDescriptor.class.getSimpleName()); +checkArgument( +controlServer.getApiServiceDescriptor() != null, +"Control Server cannot have a null %s", +ApiServiceDescriptor.class.getSimpleName()); +this.clientSource = clientSource; + } + + @Override + public RemoteEnvironment createEnvironment(Environment container) throws Exception { +ExecutorService executor = Executors.newSingleThreadExecutor(); +Future fnHarness = +executor.submit( +() -> +FnHarness.main( +PipelineOptionsFactory.create(), Review comment: Done. Issue Time Tracking --- Worklog Id: (was: 104906) Time Spent: 13h 50m (was: 13h 40m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104908=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104908 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 23/May/18 00:41 Start Date: 23/May/18 00:41 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator URL: https://github.com/apache/beam/pull/5349#discussion_r190094789 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.direct.portable; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.beam.fn.harness.FnHarness; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.control.ControlClientPool; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory; +import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.sdk.fn.stream.StreamObserverFactory; +import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** + * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC + * channel. + * + * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a Review comment: Nope, done.
Issue Time Tracking --- Worklog Id: (was: 104908) Time Spent: 14h 10m (was: 14h)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104905=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104905 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 23/May/18 00:40 Start Date: 23/May/18 00:40 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator URL: https://github.com/apache/beam/pull/5349#discussion_r190094746 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.direct.portable; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.beam.fn.harness.FnHarness; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.control.ControlClientPool; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory; +import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.sdk.fn.stream.StreamObserverFactory; +import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** + * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC + * channel. + * + * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a + * provided dependency. 
+ */ +class InProcessEnvironmentFactory implements EnvironmentFactory { + + private final GrpcFnServer loggingServer; + private final GrpcFnServer controlServer; + + private final ControlClientPool.Source clientSource; + + InProcessEnvironmentFactory( + GrpcFnServer loggingServer, + GrpcFnServer controlServer, + ControlClientPool.Source clientSource) { +this.loggingServer = loggingServer; +this.controlServer = controlServer; +checkArgument( +loggingServer.getApiServiceDescriptor() != null, +"Logging Server cannot have a null %s", +ApiServiceDescriptor.class.getSimpleName()); +checkArgument( +controlServer.getApiServiceDescriptor() != null, +"Control Server cannot have a null %s", +ApiServiceDescriptor.class.getSimpleName()); +this.clientSource = clientSource; + } + + @Override + public RemoteEnvironment createEnvironment(Environment container) throws Exception { +ExecutorService executor = Executors.newSingleThreadExecutor(); +Future fnHarness = +executor.submit( +() -> +FnHarness.main( Review comment: I've added BEAM-4384. I don't think it's particularly high priority, since I don't expect most users to use this directly. Issue Time Tracking --- Worklog Id: (was: 104905) Time Spent: 13h 40m (was: 13.5h)
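The quoted `createEnvironment` runs `FnHarness.main(...)` as a task on a single-thread executor and submits a second task that waits on the harness `Future` and calls `shutdownNow()` if it failed. A minimal, dependency-free sketch of that pattern, with a hypothetical `fakeHarness` standing in for `FnHarness.main(...)`. Unlike the quoted code, which catches `Throwable`, the sketch catches the two checked exceptions `Future.get()` declares. Because the executor has only one thread, the watcher is queued behind the harness and runs only once the harness task has returned.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Sketch: run a harness on its own executor and tear the executor down if the harness fails. */
public final class HarnessWatcherSketch {

  /** Handles returned to the caller: the executor and the watcher task's future. */
  static final class Started {
    final ExecutorService executor;
    final Future<?> watcher;

    Started(ExecutorService executor, Future<?> watcher) {
      this.executor = executor;
      this.watcher = watcher;
    }
  }

  /** Hypothetical stand-in for FnHarness.main(...): either returns normally or throws. */
  static Void fakeHarness(boolean fail) {
    if (fail) {
      throw new IllegalStateException("harness crashed");
    }
    return null;
  }

  static Started start(boolean fail) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<Void> harness = executor.submit(() -> fakeHarness(fail));
    // Single thread: this task runs only after the harness task has finished.
    Future<?> watcher =
        executor.submit(
            () -> {
              try {
                harness.get();
              } catch (ExecutionException e) {
                // The harness threw: stop the executor so nothing else runs on it.
                executor.shutdownNow();
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                executor.shutdownNow();
              }
            });
    return new Started(executor, watcher);
  }

  public static void main(String[] args) throws Exception {
    Started failed = start(true);
    failed.watcher.get();
    System.out.println("after failure, shut down: " + failed.executor.isShutdown());

    Started ok = start(false);
    ok.watcher.get();
    System.out.println("after success, shut down: " + ok.executor.isShutdown());
    ok.executor.shutdown(); // release the worker thread so the JVM can exit
  }
}
```

Note that after a clean harness exit the executor is left running, so some other owner (for example the environment's close path) still has to shut it down.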
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104901=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104901 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 23/May/18 00:31 Start Date: 23/May/18 00:31 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator URL: https://github.com/apache/beam/pull/5349#discussion_r190093623 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.direct.portable; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.beam.fn.harness.FnHarness; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.control.ControlClientPool; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory; +import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.sdk.fn.stream.StreamObserverFactory; +import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** + * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC + * channel. + * + * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a + * provided dependency. + */ +class InProcessEnvironmentFactory implements EnvironmentFactory { + + private final GrpcFnServer loggingServer; + private final GrpcFnServer controlServer; + + private final ControlClientPool.Source clientSource; + + InProcessEnvironmentFactory( Review comment: It's fine, but also gets us very little. Done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 104901) Time Spent: 13.5h (was: 13h 20m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104900=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104900 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 23/May/18 00:31 Start Date: 23/May/18 00:31 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator URL: https://github.com/apache/beam/pull/5349#discussion_r190093615 ## File path: runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java ## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.direct.portable; + +import static com.google.common.collect.Iterables.getOnlyElement; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.Iterables; +import java.io.Serializable; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.PipelineTranslation; +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser; +import org.apache.beam.runners.core.construction.graph.PipelineNode; +import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; +import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; +import org.apache.beam.runners.core.construction.graph.QueryablePipeline; +import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.InProcessServerFactory; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.control.JobBundleFactory; +import org.apache.beam.runners.fnexecution.data.GrpcDataService; +import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter; +import org.apache.beam.runners.fnexecution.state.GrpcStateService; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import 
org.apache.beam.sdk.transforms.Impulse; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.joda.time.Instant; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link RemoteStageEvaluatorFactory}. */ +@RunWith(JUnit4.class) +public class RemoteStageEvaluatorFactoryTest implements Serializable { + + private transient RemoteStageEvaluatorFactory factory; + private transient ExecutorService executor; + private transient GrpcFnServer dataServer; Review comment: We can't: we don't create the harness ourselves; we rely on the `EnvironmentFactory` to do so. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 104900) Time Spent: 13h 20m (was: 13h 10m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104889=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104889 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 23/May/18 00:26 Start Date: 23/May/18 00:26 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator URL: https://github.com/apache/beam/pull/5349#discussion_r190092945 ## File path: runners/direct-java/build.gradle ## @@ -58,12 +58,14 @@ dependencies { compile project(path: ":beam-runners-local-java-core", configuration: "shadow") compile project(path: ":beam-runners-java-fn-execution", configuration: "shadow") compile project(path: ":beam-sdks-java-fn-execution", configuration: "shadow") + compileOnly project(path: ":beam-sdks-java-harness") shadow project(path: ":beam-sdks-java-core", configuration: "shadow") shadow library.java.joda_time shadow library.java.findbugs_jsr305 shadow library.java.slf4j_api shadow library.java.hamcrest_core shadow library.java.junit + testCompile project(path: ":beam-sdks-java-harness") Review comment: Now it's required because the `compileOnly` dependency is in a different project. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 104889) Time Spent: 13h 10m (was: 13h)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104886=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104886 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 23/May/18 00:11 Start Date: 23/May/18 00:11 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator URL: https://github.com/apache/beam/pull/5349#discussion_r190090893 ## File path: runners/direct-java/build.gradle ## @@ -58,12 +58,14 @@ dependencies { compile project(path: ":beam-runners-local-java-core", configuration: "shadow") compile project(path: ":beam-runners-java-fn-execution", configuration: "shadow") compile project(path: ":beam-sdks-java-fn-execution", configuration: "shadow") + compileOnly project(path: ":beam-sdks-java-harness") Review comment: That's what this is, as far as I can tell: https://blog.gradle.org/introducing-compile-only-dependencies This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 104886) Time Spent: 13h (was: 12h 50m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104871=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104871 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 22/May/18 23:52 Start Date: 22/May/18 23:52 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator URL: https://github.com/apache/beam/pull/5349#discussion_r190083099 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.direct.portable; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.beam.fn.harness.FnHarness; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.control.ControlClientPool; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory; +import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.sdk.fn.stream.StreamObserverFactory; +import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** + * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC + * channel. + * + * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a + * provided dependency. 
+ */ +class InProcessEnvironmentFactory implements EnvironmentFactory { + + private final GrpcFnServer loggingServer; + private final GrpcFnServer controlServer; + + private final ControlClientPool.Source clientSource; + + InProcessEnvironmentFactory( + GrpcFnServer loggingServer, + GrpcFnServer controlServer, + ControlClientPool.Source clientSource) { +this.loggingServer = loggingServer; +this.controlServer = controlServer; +checkArgument( +loggingServer.getApiServiceDescriptor() != null, +"Logging Server cannot have a null %s", +ApiServiceDescriptor.class.getSimpleName()); +checkArgument( +controlServer.getApiServiceDescriptor() != null, +"Control Server cannot have a null %s", +ApiServiceDescriptor.class.getSimpleName()); +this.clientSource = clientSource; + } + + @Override + public RemoteEnvironment createEnvironment(Environment container) throws Exception { +ExecutorService executor = Executors.newSingleThreadExecutor(); +Future fnHarness = +executor.submit( +() -> +FnHarness.main( Review comment: The best scenario would be if we created a new classpath containing the Java harness code plus what we detect on the user's classpath, and ran that. This would have the benefit of simulating the SDK container and would remove the need for users to remember to provide beam-sdks-java-harness as a dependency. The idea would be that beam-sdks-java-harness would be a jar containing all of its dependencies shaded away, similar to what we do for Dataflow. We would embed beam-sdks-java-harness.jar as a jar inside the direct runner jar, so we wouldn't need to fetch it from Maven and would also be able to solve the problem where tests would run from the provided jar. If this seems too much, file a corresponding JIRA and update this code so that if FnHarness.main throws a `ClassNotFoundException`, we tell users that the shared process
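As a minimal sketch of the fallback the reviewer describes, the runner could probe for the harness class before launching it and turn a `ClassNotFoundException` into an actionable error. The class `HarnessClasspathCheck`, the method `requireOnClasspath`, and the error text are hypothetical; only the class name `org.apache.beam.fn.harness.FnHarness` and the artifact name `beam-sdks-java-harness` come from the diff and comment above. Code that references `FnHarness` directly would typically fail with `NoClassDefFoundError` rather than `ClassNotFoundException`, which is one reason to probe reflectively.

```java
/**
 * Hypothetical sketch: fail fast with an actionable message when the Java SDK harness is
 * missing from the runtime classpath (for example, because it was declared compileOnly).
 */
public final class HarnessClasspathCheck {
  // Class name taken from the import in InProcessEnvironmentFactory.
  public static final String FN_HARNESS_CLASS = "org.apache.beam.fn.harness.FnHarness";

  private HarnessClasspathCheck() {}

  /** Returns the loaded class, or throws IllegalStateException naming the missing artifact. */
  public static Class<?> requireOnClasspath(String className, String artifact) {
    try {
      // initialize = false: only check that the class is present; don't run static initializers.
      return Class.forName(className, false, HarnessClasspathCheck.class.getClassLoader());
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(
          String.format(
              "%s was not found on the classpath. The in-process environment runs the SDK"
                  + " harness in the runner's JVM, so %s must be a runtime dependency.",
              className, artifact),
          e);
    }
  }

  public static void main(String[] args) {
    try {
      requireOnClasspath(FN_HARNESS_CLASS, "beam-sdks-java-harness");
      System.out.println("SDK harness found");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Wrapping the original exception as the cause keeps the stack trace available while putting the fix (add the harness artifact) in the first line a user sees.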
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104870=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104870 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 22/May/18 23:52 Start Date: 22/May/18 23:52 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator URL: https://github.com/apache/beam/pull/5349#discussion_r190086882 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.direct.portable; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.beam.fn.harness.FnHarness; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.control.ControlClientPool; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory; +import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.sdk.fn.stream.StreamObserverFactory; +import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** + * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC + * channel. + * + * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a + * provided dependency. + */ +class InProcessEnvironmentFactory implements EnvironmentFactory { + + private final GrpcFnServer loggingServer; + private final GrpcFnServer controlServer; + + private final ControlClientPool.Source clientSource; + + InProcessEnvironmentFactory( Review comment: Would it make sense to migrate the InProcessSdkHarness TestRule to use the InProcessEnvironmentFactory? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 104870) Time Spent: 12h (was: 11h 50m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104875=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104875 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 22/May/18 23:52 Start Date: 22/May/18 23:52 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator URL: https://github.com/apache/beam/pull/5349#discussion_r190082848 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.direct.portable; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.beam.fn.harness.FnHarness; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.control.ControlClientPool; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory; +import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.sdk.fn.stream.StreamObserverFactory; +import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** + * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC + * channel. + * + * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a + * provided dependency. 
+ */ +class InProcessEnvironmentFactory implements EnvironmentFactory { + + private final GrpcFnServer loggingServer; + private final GrpcFnServer controlServer; + + private final ControlClientPool.Source clientSource; + + InProcessEnvironmentFactory( + GrpcFnServer loggingServer, + GrpcFnServer controlServer, + ControlClientPool.Source clientSource) { +this.loggingServer = loggingServer; +this.controlServer = controlServer; +checkArgument( +loggingServer.getApiServiceDescriptor() != null, +"Logging Server cannot have a null %s", +ApiServiceDescriptor.class.getSimpleName()); +checkArgument( +controlServer.getApiServiceDescriptor() != null, +"Control Server cannot have a null %s", +ApiServiceDescriptor.class.getSimpleName()); +this.clientSource = clientSource; + } + + @Override + public RemoteEnvironment createEnvironment(Environment container) throws Exception { +ExecutorService executor = Executors.newSingleThreadExecutor(); +Future fnHarness = +executor.submit( +() -> +FnHarness.main( +PipelineOptionsFactory.create(), Review comment: Shouldn't we propagate `PipelineOptions` through? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 104875) Time Spent: 12h 50m (was: 12h 40m)
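The question about propagating `PipelineOptions` amounts to injecting the caller's options into the factory instead of calling `PipelineOptionsFactory.create()` inside `createEnvironment`. The sketch below is hypothetical: it uses a stand-in `Options` class and a `Consumer` in place of `FnHarness.main` so that it runs without Beam on the classpath, and none of its names are Beam APIs.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

/** Hypothetical sketch: the factory forwards the caller's options instead of creating defaults. */
public final class OptionsPropagatingFactory {

  /** Stand-in for PipelineOptions; an assumption, not Beam's type. */
  public static final class Options {
    private final Map<String, String> values;

    public Options(Map<String, String> values) {
      this.values = Collections.unmodifiableMap(new HashMap<>(values));
    }

    public String get(String key) {
      return values.get(key);
    }
  }

  private final Options options;
  // Stands in for launching the harness (e.g. FnHarness.main); no real signature is assumed.
  private final Consumer<Options> harnessMain;

  public OptionsPropagatingFactory(Options options, Consumer<Options> harnessMain) {
    this.options = options;
    this.harnessMain = harnessMain;
  }

  /** Starts the harness on its own thread, mirroring the single-thread executor in the diff. */
  public Future<?> createEnvironment() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      // Pass along the options this factory was built with, not a freshly created default.
      return executor.submit(() -> harnessMain.accept(options));
    } finally {
      // Rejects further tasks; the already-submitted harness task still runs to completion.
      executor.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    Options opts = new Options(Collections.singletonMap("jobName", "remote-stage-test"));
    new OptionsPropagatingFactory(
            opts, o -> System.out.println("harness got jobName=" + o.get("jobName")))
        .createEnvironment()
        .get();
  }
}
```

Passing the options through the constructor, as here, would leave the `createEnvironment(Environment)` signature from the diff unchanged.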
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104874=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104874 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 22/May/18 23:52 Start Date: 22/May/18 23:52 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator URL: https://github.com/apache/beam/pull/5349#discussion_r190087507 ## File path: runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java ## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.direct.portable; + +import static com.google.common.collect.Iterables.getOnlyElement; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.Iterables; +import java.io.Serializable; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.PipelineTranslation; +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser; +import org.apache.beam.runners.core.construction.graph.PipelineNode; +import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; +import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; +import org.apache.beam.runners.core.construction.graph.QueryablePipeline; +import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.InProcessServerFactory; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.control.JobBundleFactory; +import org.apache.beam.runners.fnexecution.data.GrpcDataService; +import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter; +import org.apache.beam.runners.fnexecution.state.GrpcStateService; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import 
org.apache.beam.sdk.transforms.Impulse; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.joda.time.Instant; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link RemoteStageEvaluatorFactory}. */ +@RunWith(JUnit4.class) +public class RemoteStageEvaluatorFactoryTest implements Serializable { + + private transient RemoteStageEvaluatorFactory factory; + private transient ExecutorService executor; + private transient GrpcFnServer dataServer; Review comment: Use the InProcessSdkHarness TestRule. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 104874) Time Spent: 12h 40m (was: 12.5h)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104873=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104873 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 22/May/18 23:52 Start Date: 22/May/18 23:52 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator URL: https://github.com/apache/beam/pull/5349#discussion_r190086276 ## File path: runners/direct-java/build.gradle ## @@ -58,12 +58,14 @@ dependencies { compile project(path: ":beam-runners-local-java-core", configuration: "shadow") compile project(path: ":beam-runners-java-fn-execution", configuration: "shadow") compile project(path: ":beam-sdks-java-fn-execution", configuration: "shadow") + compileOnly project(path: ":beam-sdks-java-harness") shadow project(path: ":beam-sdks-java-core", configuration: "shadow") shadow library.java.joda_time shadow library.java.findbugs_jsr305 shadow library.java.slf4j_api shadow library.java.hamcrest_core shadow library.java.junit + testCompile project(path: ":beam-sdks-java-harness") Review comment: Using `provided` scope above should allow you to remove this line here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 104873) Time Spent: 12.5h (was: 12h 20m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104868=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104868 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 22/May/18 23:52 Start Date: 22/May/18 23:52 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator URL: https://github.com/apache/beam/pull/5349#discussion_r190082050 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.direct.portable; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.beam.fn.harness.FnHarness; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.control.ControlClientPool; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory; +import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.sdk.fn.stream.StreamObserverFactory; +import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** + * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC + * channel. + * + * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a Review comment: Is it excessively difficult to do this now? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 104868) Time Spent: 11h 40m (was: 11.5h)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104867=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104867 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 22/May/18 23:52 Start Date: 22/May/18 23:52 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator URL: https://github.com/apache/beam/pull/5349#discussion_r190082730 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct.portable; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.beam.fn.harness.FnHarness; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.control.ControlClientPool; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory; +import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.sdk.fn.stream.StreamObserverFactory; +import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** + * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC Review comment: The important point isn't that the in-process gRPC channel is used; it's that the runner, the SDK, and the user's code and dependencies all share the same process. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 104867) Time Spent: 11.5h (was: 11h 20m) > Execute a Stage via the portability framework in the ReferenceRunner > > > Key: BEAM-3326 > URL: https://issues.apache.org/jira/browse/BEAM-3326 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Thomas Groh >Priority: Major > Labels: portability > Time Spent: 11.5h > Remaining Estimate: 0h > > This is the supertask for remote execution in the Universal Local Runner > (BEAM-2899). > This executes a stage remotely via portability framework APIs -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104869=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104869 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 22/May/18 23:52 Start Date: 22/May/18 23:52 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator URL: https://github.com/apache/beam/pull/5349#discussion_r190085434 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/InProcessEnvironmentFactory.java ## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.direct.portable; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.beam.fn.harness.FnHarness; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.control.ControlClientPool; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory; +import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.sdk.fn.stream.StreamObserverFactory; +import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** + * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} via the in-process gRPC + * channel. + * + * TODO: Move this class to the runners/java-fn-execution module, with the Java SDK Harness as a + * provided dependency. 
+ */ +class InProcessEnvironmentFactory implements EnvironmentFactory { + + private final GrpcFnServer<GrpcLoggingService> loggingServer; + private final GrpcFnServer<FnApiControlClientPoolService> controlServer; + + private final ControlClientPool.Source clientSource; + + InProcessEnvironmentFactory( + GrpcFnServer<GrpcLoggingService> loggingServer, + GrpcFnServer<FnApiControlClientPoolService> controlServer, + ControlClientPool.Source clientSource) { +this.loggingServer = loggingServer; +this.controlServer = controlServer; +checkArgument( +loggingServer.getApiServiceDescriptor() != null, +"Logging Server cannot have a null %s", +ApiServiceDescriptor.class.getSimpleName()); +checkArgument( +controlServer.getApiServiceDescriptor() != null, +"Control Server cannot have a null %s", +ApiServiceDescriptor.class.getSimpleName()); +this.clientSource = clientSource; + } + + @Override + public RemoteEnvironment createEnvironment(Environment container) throws Exception { +ExecutorService executor = Executors.newSingleThreadExecutor(); +Future<?> fnHarness = +executor.submit( +() -> +FnHarness.main( +PipelineOptionsFactory.create(), +loggingServer.getApiServiceDescriptor(), +controlServer.getApiServiceDescriptor(), +InProcessManagedChannelFactory.create(), +StreamObserverFactory.direct())); +executor.submit( +() -> { + try { +fnHarness.get(); + } catch (Throwable t) { +executor.shutdownNow(); + } +}); + +// TODO: find some way to populate the actual ID in FnHarness.main() Review comment: Instead of relying on the WORKER_ID environment variable in Python, we could make the client id part of the ApiServiceDescriptor. This would let whoever vends the client choose the client id, and would remove the need to look up a property. @herohde @angoenka What do you think?
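One way to picture the suggestion above (the vendor of a harness picks its client id and ships it with the endpoint, instead of the harness reading WORKER_ID) is the following minimal Java sketch. It assumes hypothetical `ServiceDescriptor` and `HarnessVendor` classes standing in for Beam's `ApiServiceDescriptor` proto and for whatever component launches the harness; it does not use, or claim to describe, the real Beam API.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Sketch only: ServiceDescriptor and HarnessVendor are hypothetical stand-ins, not Beam's
 * ApiServiceDescriptor or any Beam launcher class.
 */
public class ClientIdSketch {

  /** Hypothetical descriptor carrying both the endpoint URL and the vended client id. */
  static final class ServiceDescriptor {
    final String url;
    final String clientId;

    ServiceDescriptor(String url, String clientId) {
      this.url = url;
      this.clientId = clientId;
    }
  }

  /** Hypothetical vendor: the only place that decides ids, here from an increasing counter. */
  static final class HarnessVendor {
    private final AtomicLong nextId = new AtomicLong();
    private final String controlUrl;

    HarnessVendor(String controlUrl) {
      this.controlUrl = controlUrl;
    }

    ServiceDescriptor vend() {
      return new ServiceDescriptor(controlUrl, "worker-" + nextId.incrementAndGet());
    }
  }

  /** Current style, for contrast: the harness looks its id up in the process environment. */
  static Optional<String> idFromEnvironment() {
    return Optional.ofNullable(System.getenv("WORKER_ID"));
  }

  /** Suggested style: the harness reads the id it was handed alongside the endpoint. */
  static String idFromDescriptor(ServiceDescriptor descriptor) {
    return descriptor.clientId;
  }

  public static void main(String[] args) {
    HarnessVendor vendor = new HarnessVendor("localhost:50000"); // hypothetical address
    System.out.println(idFromDescriptor(vendor.vend())); // worker-1
    System.out.println(idFromDescriptor(vendor.vend())); // worker-2
  }
}
```

The appeal of this shape is that the id is chosen in exactly one place and travels with the endpoint, so an in-process harness (as in `InProcessEnvironmentFactory`) and an out-of-process one would obtain it the same way.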
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104872=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104872 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 22/May/18 23:52 Start Date: 22/May/18 23:52 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5349: [BEAM-3326] Remote stage evaluator URL: https://github.com/apache/beam/pull/5349#discussion_r190086221 ## File path: runners/direct-java/build.gradle ## @@ -58,12 +58,14 @@ dependencies { compile project(path: ":beam-runners-local-java-core", configuration: "shadow") compile project(path: ":beam-runners-java-fn-execution", configuration: "shadow") compile project(path: ":beam-sdks-java-fn-execution", configuration: "shadow") + compileOnly project(path: ":beam-sdks-java-harness") Review comment: use the `provided` scope. Issue Time Tracking --- Worklog Id: (was: 104872) Time Spent: 12h 20m (was: 12h 10m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104679=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104679 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 22/May/18 16:49 Start Date: 22/May/18 16:49 Worklog Time Spent: 10m Work Description: tgroh closed pull request #5348: [BEAM-3326] Add a Direct Job Bundle Factory URL: https://github.com/apache/beam/pull/5348 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle index 220f4fcffb9..e74cd5313fe 100644 --- a/runners/direct-java/build.gradle +++ b/runners/direct-java/build.gradle @@ -57,6 +57,7 @@ dependencies { compile project(path: ":beam-runners-core-java", configuration: "shadow") compile project(path: ":beam-runners-local-java-core", configuration: "shadow") compile project(path: ":beam-runners-java-fn-execution", configuration: "shadow") + compile project(path: ":beam-sdks-java-fn-execution", configuration: "shadow") shadow project(path: ":beam-sdks-java-core", configuration: "shadow") shadow library.java.joda_time shadow library.java.findbugs_jsr305 diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml index 0613a0a863f..1d3d790a747 100644 --- a/runners/direct-java/pom.xml +++ b/runners/direct-java/pom.xml @@ -221,6 +221,11 @@ beam-runners-core-java + + org.apache.beam + beam-sdks-java-fn-execution + + org.apache.beam beam-runners-java-fn-execution diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/BundleFactoryOutputRecieverFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/BundleFactoryOutputRecieverFactory.java new file mode 100644 index 000..3285619fd2d 
--- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/BundleFactoryOutputRecieverFactory.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.direct.portable; + +import java.util.function.Consumer; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.Components; +import org.apache.beam.runners.core.construction.graph.PipelineNode; +import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; +import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.util.WindowedValue; + +/** + * An {@link OutputReceiverFactory} which adds received elements to {@link UncommittedBundle} + * instances. The produced {@link UncommittedBundle bundles} are added to a provided {@link + * StepTransformResult.Builder StepTransformResult Builder}. 
+ */ +class BundleFactoryOutputRecieverFactory implements OutputReceiverFactory { + private final BundleFactory bundleFactory; + private final RunnerApi.Components components; + + private final Consumer<UncommittedBundle<?>> bundleConsumer; + + private BundleFactoryOutputRecieverFactory( + BundleFactory bundleFactory, + Components components, + Consumer<UncommittedBundle<?>> bundleConsumer) { +this.bundleFactory = bundleFactory; +this.components = components; +this.bundleConsumer = bundleConsumer; + } + + public static OutputReceiverFactory create( + BundleFactory bundleFactory, + Components components, + Consumer<UncommittedBundle<?>> resultBuilder) { +return new BundleFactoryOutputRecieverFactory(bundleFactory, components, resultBuilder); + } + + @Override + public FnDataReceiver create(String pCollectionId) { +PCollectionNode pcollection = +PipelineNode.pCollection(pCollectionId, components.getPcollectionsOrThrow(pCollectionId)); +return
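The Javadoc for `BundleFactoryOutputRecieverFactory` describes its contract: each PCollection id gets its own bundle that collects the received elements, and every produced bundle is handed to a caller-supplied consumer. The Java sketch below illustrates that pattern under simplifying assumptions. `SimpleBundle` and `ReceiverFactorySketch` are hypothetical types, not Beam's `UncommittedBundle`, `BundleFactory`, or `FnDataReceiver`, and publishing the bundle as soon as the receiver is created is an assumption consistent with the `addsBundlesToResult` test in this PR, not a reading of the truncated code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/** Sketch only: a receiver factory that opens one bundle per PCollection and publishes it. */
public class ReceiverFactorySketch {

  /** Hypothetical stand-in for an uncommitted bundle of elements belonging to one PCollection. */
  static final class SimpleBundle<T> {
    final String pcollectionId;
    private final List<T> elements = new ArrayList<>();

    SimpleBundle(String pcollectionId) {
      this.pcollectionId = pcollectionId;
    }

    void add(T element) {
      elements.add(element);
    }

    List<T> elements() {
      return Collections.unmodifiableList(elements);
    }
  }

  private final Consumer<SimpleBundle<?>> bundleConsumer;

  ReceiverFactorySketch(Consumer<SimpleBundle<?>> bundleConsumer) {
    this.bundleConsumer = bundleConsumer;
  }

  /** Opens a bundle for the PCollection, publishes it, and returns a receiver that fills it. */
  <T> Consumer<T> create(String pcollectionId) {
    SimpleBundle<T> bundle = new SimpleBundle<>(pcollectionId);
    // Published before any element arrives, so an output with no elements is still reported.
    bundleConsumer.accept(bundle);
    return bundle::add;
  }

  public static void main(String[] args) {
    List<SimpleBundle<?>> outputBundles = new ArrayList<>();
    ReceiverFactorySketch factory = new ReceiverFactorySketch(outputBundles::add);

    Consumer<String> foo = factory.create("foo");
    Consumer<Integer> bar = factory.create("bar");
    foo.accept("1");
    foo.accept("2");
    bar.accept(3);

    for (SimpleBundle<?> bundle : outputBundles) {
      System.out.println(bundle.pcollectionId + " -> " + bundle.elements());
    }
  }
}
```

Running `main` prints `foo -> [1, 2]` followed by `bar -> [3]`. Passing a plain `Consumer` (as the real factory does with `outputBundles::add` in its test) keeps the factory unaware of where bundles end up.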
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104678=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104678 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 22/May/18 16:49 Start Date: 22/May/18 16:49 Worklog Time Spent: 10m Work Description: tgroh commented on issue #5349: [BEAM-3326] Remote stage evaluator URL: https://github.com/apache/beam/pull/5349#issuecomment-391061972 run java precommit Issue Time Tracking --- Worklog Id: (was: 104678) Time Spent: 11h 10m (was: 11h)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104105=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104105 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 21/May/18 16:18 Start Date: 21/May/18 16:18 Worklog Time Spent: 10m Work Description: tgroh commented on issue #5348: [BEAM-3326] Add a Direct Job Bundle Factory URL: https://github.com/apache/beam/pull/5348#issuecomment-390704306 run java precommit Issue Time Tracking --- Worklog Id: (was: 104105) Time Spent: 11h (was: 10h 50m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=103588=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103588 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 18/May/18 23:07 Start Date: 18/May/18 23:07 Worklog Time Spent: 10m Work Description: tgroh commented on issue #5348: [BEAM-3326] Add a Direct Job Bundle Factory URL: https://github.com/apache/beam/pull/5348#issuecomment-390354745 Done to all Issue Time Tracking --- Worklog Id: (was: 103588) Time Spent: 10h 50m (was: 10h 40m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=103585=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103585 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 18/May/18 22:56 Start Date: 18/May/18 22:56 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5348: [BEAM-3326] Add a Direct Job Bundle Factory URL: https://github.com/apache/beam/pull/5348#discussion_r189410028 ## File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/IdGenerator.java ## @@ -0,0 +1,30 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information Review comment: Messed up license header. Issue Time Tracking --- Worklog Id: (was: 103585) Time Spent: 10h 40m (was: 10.5h)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=103580=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103580 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 18/May/18 22:56 Start Date: 18/May/18 22:56 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5348: [BEAM-3326] Add a Direct Job Bundle Factory URL: https://github.com/apache/beam/pull/5348#discussion_r189410092 ## File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/package-info.java ## @@ -0,0 +1,23 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance Review comment: license header Issue Time Tracking --- Worklog Id: (was: 103580) Time Spent: 9h 50m (was: 9h 40m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=103582=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103582 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 18/May/18 22:56 Start Date: 18/May/18 22:56 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5348: [BEAM-3326] Add a Direct Job Bundle Factory URL: https://github.com/apache/beam/pull/5348#discussion_r189410106 ## File path: sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/IdGeneratorsTest.java ## @@ -0,0 +1,65 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * Review comment: license header. Issue Time Tracking --- Worklog Id: (was: 103582) Time Spent: 10h 10m (was: 10h)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=103584=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103584 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 18/May/18 22:56 Start Date: 18/May/18 22:56 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5348: [BEAM-3326] Add a Direct Job Bundle Factory URL: https://github.com/apache/beam/pull/5348#discussion_r189409699 ## File path: runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactoryTest.java ## @@ -0,0 +1,181 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one Review comment: License header incorrectly formatted. Issue Time Tracking --- Worklog Id: (was: 103584) Time Spent: 10.5h (was: 10h 20m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=103583=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103583 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 18/May/18 22:56 Start Date: 18/May/18 22:56 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5348: [BEAM-3326] Add a Direct Job Bundle Factory URL: https://github.com/apache/beam/pull/5348#discussion_r189409672 ## File path: runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/BundleFactoryOutputRecieverFactoryTest.java ## @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.direct.portable; + +import static com.google.common.collect.Iterables.getOnlyElement; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.Iterables; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.MessageWithComponents; +import org.apache.beam.runners.core.construction.CoderTranslation; +import org.apache.beam.runners.core.construction.RehydratedComponents; +import org.apache.beam.runners.core.construction.SdkComponents; +import org.apache.beam.runners.core.construction.graph.PipelineNode; +import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; +import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory; +import org.apache.beam.runners.fnexecution.wire.WireCoders; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link BundleFactoryOutputRecieverFactory}. 
*/ +@RunWith(JUnit4.class) +public class BundleFactoryOutputRecieverFactoryTest { + private final BundleFactory bundleFactory = ImmutableListBundleFactory.create(); + private PCollectionNode fooPC; + private PCollectionNode barPC; + private RunnerApi.Components components; + + private OutputReceiverFactory factory; + private Collection<UncommittedBundle<?>> outputBundles; + + @Before + public void setup() throws IOException { +Pipeline p = Pipeline.create(); +PCollection<String> foo = +p.apply("createFoo", Create.of("1", "2", "3")) +.apply("windowFoo", Window.into(FixedWindows.of(Duration.standardMinutes(5L)))); +PCollection<Integer> bar = p.apply("bar", Create.of(1, 2, 3)); + +SdkComponents sdkComponents = SdkComponents.create(); +String fooId = sdkComponents.registerPCollection(foo); +String barId = sdkComponents.registerPCollection(bar); +components = sdkComponents.toComponents(); + +fooPC = PipelineNode.pCollection(fooId, components.getPcollectionsOrThrow(fooId)); +barPC = PipelineNode.pCollection(barId, components.getPcollectionsOrThrow(barId)); + +outputBundles = new ArrayList<>(); +factory = +BundleFactoryOutputRecieverFactory.create(bundleFactory, components, outputBundles::add); + } + + @Test + public void addsBundlesToResult() { +factory.create(fooPC.getId()); +factory.create(barPC.getId()); + +assertThat(Iterables.size(outputBundles), equalTo(2)); + +Collection pcollections = new ArrayList<>(); +for (UncommittedBundle<?> bundle : outputBundles) { +
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=103581=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103581 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 18/May/18 22:56 Start Date: 18/May/18 22:56 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5348: [BEAM-3326] Add a Direct Job Bundle Factory URL: https://github.com/apache/beam/pull/5348#discussion_r189408444 ## File path: runners/direct-java/pom.xml ## @@ -204,14 +204,19 @@ beam-runners-core-java + + org.apache.beam + beam-sdks-java-fn-execution + + org.apache.beam beam-runners-java-fn-execution com.google.guava - guava + guavatn Review comment: `tn` will break the Maven build Issue Time Tracking --- Worklog Id: (was: 103581) Time Spent: 10h (was: 9h 50m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=102547=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-102547 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 16/May/18 16:58 Start Date: 16/May/18 16:58 Worklog Time Spent: 10m Work Description: tgroh commented on issue #5348: [BEAM-3326] Add a Direct Job Bundle Factory URL: https://github.com/apache/beam/pull/5348#issuecomment-389591672 @lukecwik PTAL Issue Time Tracking --- Worklog Id: (was: 102547) Time Spent: 9h 40m (was: 9.5h)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=101946=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101946 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 15/May/18 00:34 Start Date: 15/May/18 00:34 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5348: [BEAM-3326] Add a Direct Job Bundle Factory URL: https://github.com/apache/beam/pull/5348#discussion_r188137062 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.direct.portable; + +import com.google.common.collect.Iterables; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.control.JobBundleFactory; +import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory; +import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors; +import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.ExecutableProcessBundleDescriptor; +import org.apache.beam.runners.fnexecution.control.RemoteBundle; +import org.apache.beam.runners.fnexecution.control.RemoteOutputReceiver; +import org.apache.beam.runners.fnexecution.control.SdkHarnessClient; +import org.apache.beam.runners.fnexecution.control.SdkHarnessClient.ActiveBundle; +import org.apache.beam.runners.fnexecution.control.StageBundleFactory; +import org.apache.beam.runners.fnexecution.data.GrpcDataService; +import org.apache.beam.runners.fnexecution.data.RemoteInputDestination; +import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory; +import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment; +import org.apache.beam.runners.fnexecution.state.GrpcStateService; +import org.apache.beam.runners.fnexecution.state.StateRequestHandler; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.util.MoreFutures; +import org.apache.beam.sdk.util.WindowedValue; + +/** A {@link JobBundleFactory} for the ReferenceRunner. 
*/ +class DirectJobBundleFactory implements JobBundleFactory { Review comment: Tested exceptions in the `close` methods. Issue Time Tracking --- Worklog Id: (was: 101946) Time Spent: 9h 20m (was: 9h 10m) > Execute a Stage via the portability framework in the ReferenceRunner > > > Key: BEAM-3326 > URL: https://issues.apache.org/jira/browse/BEAM-3326 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Thomas Groh >Priority: Major > Labels: portability > Time Spent: 9h 20m > Remaining Estimate: 0h > > This is the supertask for remote execution in the Universal Local Runner > (BEAM-2899). > This executes a stage remotely via portability framework APIs -- This message was sent by Atlassian JIRA (v7.6.3#76005)
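The review note above reports that exceptions thrown from the `close` methods are now tested. A factory that owns several resources usually needs `close` to behave predictably when more than one of them fails. The sketch below shows one common approach, assuming the factory closes a list of `AutoCloseable` resources. `ResourceCloser` and `closeAll` are hypothetical names, not Beam APIs. Every resource is closed even after a failure, the first exception is rethrown, and later ones are attached with `addSuppressed`.

```java
import java.util.List;

/**
 * Hypothetical helper (not part of Beam): closes every resource in order, rethrowing the first
 * failure and attaching any later failures as suppressed exceptions.
 */
final class ResourceCloser {
  private ResourceCloser() {}

  static void closeAll(List<? extends AutoCloseable> resources) throws Exception {
    Exception first = null;
    for (AutoCloseable resource : resources) {
      try {
        resource.close();
      } catch (Exception e) {
        if (first == null) {
          first = e; // remember the first failure so it can be rethrown
        } else {
          first.addSuppressed(e); // keep later failures visible to the caller
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }
}
```

A test in this style can close a failing resource, a second failing resource, and a healthy one. It can then check three things: all three `close` calls ran, the first exception surfaced, and the second exception appears in `getSuppressed()`.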
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=101947=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101947 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 15/May/18 00:34 Start Date: 15/May/18 00:34 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5348: [BEAM-3326] Add a Direct Job Bundle Factory URL: https://github.com/apache/beam/pull/5348#discussion_r188137160 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ +class DirectJobBundleFactory implements JobBundleFactory { + private final EnvironmentFactory environmentFactory; + + private final GrpcFnServer dataService; + private final GrpcFnServer stateService; + + private final ConcurrentMapstageBundleFactories = + new ConcurrentHashMap<>(); + private final ConcurrentMap environments = + new ConcurrentHashMap<>(); + + DirectJobBundleFactory( + EnvironmentFactory environmentFactory, + GrpcFnServer dataService, + GrpcFnServer stateService) { +this.environmentFactory = environmentFactory; +this.dataService = dataService; +this.stateService = stateService; + } + + @Override + public StageBundleFactory forStage(ExecutableStage executableStage) { +return (StageBundleFactory) +stageBundleFactories.computeIfAbsent(executableStage, this::createBundleFactory); + } + + private final AtomicLong idgen = new AtomicLong(); + + private StageBundleFactory createBundleFactory(ExecutableStage stage) { +RemoteEnvironment remoteEnv = +environments.computeIfAbsent( +stage.getEnvironment(), +env -> { + try { +return environmentFactory.createEnvironment(env); + } catch (Exception e) { +throw new RuntimeException(e); + } +}); +SdkHarnessClient sdkHarnessClient = +
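The quoted `createBundleFactory` keeps one `RemoteEnvironment` per `Environment`. It does this by calling `computeIfAbsent` and wrapping the checked exception from `createEnvironment`, because a `java.util.function.Function` cannot throw checked exceptions. The self-contained sketch below shows the same pattern with hypothetical stand-ins: `Creator` takes the place of `EnvironmentFactory`, and `PerKeyCache` takes the place of the `environments` map. It is an illustration, not the Beam class.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for an environment factory: creation may throw a checked exception. */
@FunctionalInterface
interface Creator<K, V> {
  V create(K key) throws Exception;
}

/** Hypothetical cache holding one lazily created value per key. */
final class PerKeyCache<K, V> {
  private final ConcurrentMap<K, V> values = new ConcurrentHashMap<>();
  private final Creator<K, V> creator;

  PerKeyCache(Creator<K, V> creator) {
    this.creator = creator;
  }

  V get(K key) {
    return values.computeIfAbsent(
        key,
        k -> {
          try {
            return creator.create(k);
          } catch (Exception e) {
            // The mapping function cannot throw checked exceptions, so wrap them.
            throw new RuntimeException(e);
          }
        });
  }

  int size() {
    return values.size();
  }
}
```

`ConcurrentHashMap.computeIfAbsent` applies the mapping function at most once per key. It records no mapping when the function throws, so a failed creation is retried on the next call instead of being cached.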
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=101925=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101925 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 14/May/18 23:22 Start Date: 14/May/18 23:22 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5348: [BEAM-3326] Add a Direct Job Bundle Factory URL: https://github.com/apache/beam/pull/5348#discussion_r188127187 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ +class DirectJobBundleFactory implements JobBundleFactory { + private final EnvironmentFactory environmentFactory; + + private final GrpcFnServer dataService; + private final GrpcFnServer stateService; + + private final ConcurrentMapstageBundleFactories = + new ConcurrentHashMap<>(); + private final ConcurrentMap environments = + new ConcurrentHashMap<>(); + + DirectJobBundleFactory( + EnvironmentFactory environmentFactory, + GrpcFnServer dataService, + GrpcFnServer stateService) { +this.environmentFactory = environmentFactory; +this.dataService = dataService; +this.stateService = stateService; + } + + @Override + public StageBundleFactory forStage(ExecutableStage executableStage) { +return (StageBundleFactory) +stageBundleFactories.computeIfAbsent(executableStage, this::createBundleFactory); + } + + private final AtomicLong idgen = new AtomicLong(); Review comment: Done. Done everywhere, specifically. Issue Time Tracking --- Worklog Id: (was: 101925) Time Spent:
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=101813=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101813 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 14/May/18 18:17 Start Date: 14/May/18 18:17 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5348: [BEAM-3326] Add a Direct Job Bundle Factory URL: https://github.com/apache/beam/pull/5348#discussion_r188044479 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ +class DirectJobBundleFactory implements JobBundleFactory { + private final EnvironmentFactory environmentFactory; + + private final GrpcFnServer dataService; + private final GrpcFnServer stateService; + + private final ConcurrentMapstageBundleFactories = + new ConcurrentHashMap<>(); + private final ConcurrentMap environments = + new ConcurrentHashMap<>(); + + DirectJobBundleFactory( + EnvironmentFactory environmentFactory, + GrpcFnServer dataService, + GrpcFnServer stateService) { +this.environmentFactory = environmentFactory; +this.dataService = dataService; +this.stateService = stateService; + } + + @Override + public StageBundleFactory forStage(ExecutableStage executableStage) { +return (StageBundleFactory) +stageBundleFactories.computeIfAbsent(executableStage, this::createBundleFactory); + } + + private final AtomicLong idgen = new AtomicLong(); Review comment: Use https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/IdGenerator.java Move it to a shared location if needed.
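The review asks for the Java SDK harness's `IdGenerator` (linked above) in place of a private `AtomicLong`. That class is not reproduced here. The sketch below is a hypothetical shared ID source, assuming the only requirement is thread-safe, unique string IDs. It shows the kind of small utility that the suggested move to a shared location might produce. `IncrementingIdGenerator` is an illustrative name, not a Beam class.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/** Hypothetical shared ID source: thread-safe, unique within a single instance. */
final class IncrementingIdGenerator implements Supplier<String> {
  private final AtomicLong next = new AtomicLong();

  @Override
  public String get() {
    // getAndIncrement is atomic, so concurrent callers never receive the same value.
    return Long.toString(next.getAndIncrement());
  }
}
```

Injecting one generator into every component that mints IDs keeps those IDs unique across all users of that instance. Giving each class its own counter does not provide that guarantee.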
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=101810=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101810 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 14/May/18 18:16 Start Date: 14/May/18 18:16 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5348: [BEAM-3326] Add a Direct Job Bundle Factory URL: https://github.com/apache/beam/pull/5348#discussion_r188048988 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ +class DirectJobBundleFactory implements JobBundleFactory { Review comment: Are you planning to have NeedsRunner/ValidatesRunner cover this and the StageBundleFactory testing? If so, it might be worthwhile to add error handling tests since NeedsRunner/ValidatesRunner typically don't handle edge cases. Issue Time Tracking --- Worklog Id: (was: 101810) Time Spent: 8h 40m (was: 8.5h) > Execute a Stage via the portability framework in the ReferenceRunner > > > Key: BEAM-3326 > URL: https://issues.apache.org/jira/browse/BEAM-3326 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Thomas Groh >Priority: Major > Labels: portability > Time Spent: 8h 40m > Remaining Estimate: 0h > > This is the supertask for remote execution in the Universal Local Runner > (BEAM-2899). > This executes a stage remotely via portability framework APIs -- This message was
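The review comment above argues that NeedsRunner/ValidatesRunner suites rarely exercise edge cases, so dedicated error-handling tests are worthwhile. Below is a minimal, dependency-free sketch of the assertion such tests rely on. `Expect.thrown` is a hypothetical helper; with JUnit 4.13 or later, `Assert.assertThrows` fills the same role.

```java
/** Hypothetical test helper: runs an action and returns the exception it is expected to throw. */
final class Expect {
  private Expect() {}

  @FunctionalInterface
  interface ThrowingAction {
    void run() throws Throwable;
  }

  static <T extends Throwable> T thrown(Class<T> expected, ThrowingAction action) {
    try {
      action.run();
    } catch (Throwable t) {
      if (expected.isInstance(t)) {
        return expected.cast(t); // hand the exception back so the test can inspect it
      }
      throw new AssertionError("Expected " + expected.getName() + " but got " + t, t);
    }
    throw new AssertionError("Expected " + expected.getName() + " but nothing was thrown");
  }
}
```

Returning the caught exception lets a test go on to check its message, cause, or suppressed exceptions. A bare "did it throw" check would not allow that.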
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=101811=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101811 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 14/May/18 18:16 Start Date: 14/May/18 18:16 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5348: [BEAM-3326] Add a Direct Job Bundle Factory URL: https://github.com/apache/beam/pull/5348#discussion_r188046059 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ +class DirectJobBundleFactory implements JobBundleFactory { + private final EnvironmentFactory environmentFactory; + + private final GrpcFnServer dataService; + private final GrpcFnServer stateService; + + private final ConcurrentMapstageBundleFactories = + new ConcurrentHashMap<>(); + private final ConcurrentMap environments = + new ConcurrentHashMap<>(); + + DirectJobBundleFactory( + EnvironmentFactory environmentFactory, + GrpcFnServer dataService, + GrpcFnServer stateService) { +this.environmentFactory = environmentFactory; +this.dataService = dataService; +this.stateService = stateService; + } + + @Override + public StageBundleFactory forStage(ExecutableStage executableStage) { +return (StageBundleFactory) +stageBundleFactories.computeIfAbsent(executableStage, this::createBundleFactory); + } + + private final AtomicLong idgen = new AtomicLong(); + + private StageBundleFactory createBundleFactory(ExecutableStage stage) { +RemoteEnvironment remoteEnv = +environments.computeIfAbsent( +stage.getEnvironment(), +env -> { + try { +return environmentFactory.createEnvironment(env); + } catch (Exception e) { +throw new RuntimeException(e); + } +}); +SdkHarnessClient sdkHarnessClient = +
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=101812=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101812 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 14/May/18 18:16 Start Date: 14/May/18 18:16 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5348: [BEAM-3326] Add a Direct Job Bundle Factory URL: https://github.com/apache/beam/pull/5348#discussion_r188044479 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ +class DirectJobBundleFactory implements JobBundleFactory { + private final EnvironmentFactory environmentFactory; + + private final GrpcFnServer dataService; + private final GrpcFnServer stateService; + + private final ConcurrentMapstageBundleFactories = + new ConcurrentHashMap<>(); + private final ConcurrentMap environments = + new ConcurrentHashMap<>(); + + DirectJobBundleFactory( + EnvironmentFactory environmentFactory, + GrpcFnServer dataService, + GrpcFnServer stateService) { +this.environmentFactory = environmentFactory; +this.dataService = dataService; +this.stateService = stateService; + } + + @Override + public StageBundleFactory forStage(ExecutableStage executableStage) { +return (StageBundleFactory) +stageBundleFactories.computeIfAbsent(executableStage, this::createBundleFactory); + } + + private final AtomicLong idgen = new AtomicLong(); Review comment: Use https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/IdGenerator.java
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=101369=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101369 ]

ASF GitHub Bot logged work on BEAM-3326:

    Author: ASF GitHub Bot
    Created on: 12/May/18 01:18
    Start Date: 12/May/18 01:18
    Worklog Time Spent: 10m
    Work Description: tgroh opened a new pull request #5349: [BEAM-3326] Remote stage evaluator
    URL: https://github.com/apache/beam/pull/5349

Add a Primitive Evaluator that evaluates remote stages.
Add an InProcessEnvironmentFactory to construct Java Fn Harnesses.

Follow this checklist to help us incorporate your contribution quickly and easily:

 - [ ] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
 - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue.
 - [ ] Write a pull request description that is detailed enough to understand:
   - [ ] What the pull request does
   - [ ] Why it does it
   - [ ] How it does it
   - [ ] Why this approach
 - [ ] Each commit in the pull request should have a meaningful subject line and body.
 - [ ] Run `./gradlew build` to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
 - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

Issue Time Tracking

    Worklog Id: (was: 101369)
    Time Spent: 8.5h (was: 8h 20m)

> Execute a Stage via the portability framework in the ReferenceRunner
>
> Key: BEAM-3326
> URL: https://issues.apache.org/jira/browse/BEAM-3326
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Thomas Groh
> Priority: Major
> Labels: portability
> Time Spent: 8.5h
> Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=101363=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101363 ]

ASF GitHub Bot logged work on BEAM-3326:

    Author: ASF GitHub Bot
    Created on: 12/May/18 00:45
    Start Date: 12/May/18 00:45
    Worklog Time Spent: 10m
    Work Description: tgroh opened a new pull request #5348: [BEAM-3326] Add a Direct Job Bundle Factory
    URL: https://github.com/apache/beam/pull/5348

This is the JobBundleFactory for the DirectRunner, and the OutputReceiverFactory for it to use.

Issue Time Tracking

    Worklog Id: (was: 101363)
    Time Spent: 8h 10m (was: 8h)
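PR #5348 pairs the job bundle factory with an OutputReceiverFactory. The exact interface is not shown in this thread, so the following is only a rough sketch of the idea, assuming a factory that hands out one receiver per PCollection id and buffers whatever the remote stage emits; `Receiver` and `BufferingReceiverFactory` are hypothetical names, not Beam APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical consumer of elements produced by a remote stage. */
interface Receiver<T> {
  void accept(T element);
}

/** Hypothetical factory that hands out one buffering receiver per PCollection id. */
final class BufferingReceiverFactory {
  private final Map<String, List<Object>> buffers = new ConcurrentHashMap<>();

  <T> Receiver<T> create(String pCollectionId) {
    // One synchronized buffer per PCollection, created on first use.
    List<Object> buffer =
        buffers.computeIfAbsent(
            pCollectionId, id -> Collections.synchronizedList(new ArrayList<>()));
    // The receiver simply appends each element to that buffer.
    return buffer::add;
  }

  /** Everything received so far for the given PCollection (empty if none). */
  List<Object> outputsOf(String pCollectionId) {
    return buffers.getOrDefault(pCollectionId, Collections.emptyList());
  }
}
```

Keying receivers by PCollection id lets a runner route each output of a fused stage to its own downstream consumer; buffering is just the simplest consumer to illustrate.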
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=101364=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101364 ]

ASF GitHub Bot logged work on BEAM-3326:

    Author: ASF GitHub Bot
    Created on: 12/May/18 00:45
    Start Date: 12/May/18 00:45
    Worklog Time Spent: 10m
    Work Description: tgroh commented on issue #5348: [BEAM-3326] Add a Direct Job Bundle Factory
    URL: https://github.com/apache/beam/pull/5348#issuecomment-388517950

R: @axelmagn

Issue Time Tracking

    Worklog Id: (was: 101364)
    Time Spent: 8h 20m (was: 8h 10m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=98097=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98097 ]

ASF GitHub Bot logged work on BEAM-3326:

    Author: ASF GitHub Bot
    Created on: 04/May/18 01:53
    Start Date: 04/May/18 01:53
    Worklog Time Spent: 10m
    Work Description: tgroh closed pull request #5269: [BEAM-3326] Add an Unsupported StateRequestHandler
    URL: https://github.com/apache/beam/pull/5269

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
index cfa86db1b04..d0858935d73 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
@@ -18,7 +18,8 @@
 package org.apache.beam.runners.fnexecution.state;
 
 import java.util.concurrent.CompletionStage;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
 
 /**
  * Handler for {@link org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest StateRequests}.
@@ -34,6 +35,12 @@
    * Throwing an error during handling will complete the handler result {@link CompletionStage}
    * exceptionally.
    */
-  CompletionStage<BeamFnApi.StateResponse.Builder> handle(BeamFnApi.StateRequest request)
-      throws Exception;
+  CompletionStage<StateResponse.Builder> handle(StateRequest request) throws Exception;
+
+  static StateRequestHandler unsupported() {
+    return request -> {
+      throw new UnsupportedOperationException(
+          String.format("Cannot use an empty %s", StateRequestHandler.class.getSimpleName()));
+    };
+  }
 }

Issue Time Tracking

    Worklog Id: (was: 98097)
    Time Spent: 8h (was: 7h 50m)
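The merged `unsupported()` method is a static factory on a functional interface that returns a lambda which throws on every call. A self-contained sketch of the same pattern, assuming `String` requests and responses in place of the `BeamFnApi` protos; `Handler` is a hypothetical stand-in for `StateRequestHandler`, and `echo()` is an illustrative extra:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/** Hypothetical stand-in for StateRequestHandler, with String requests and responses. */
@FunctionalInterface
interface Handler {
  CompletionStage<String> handle(String request) throws Exception;

  /** For contexts that cannot serve state: every request fails immediately. */
  static Handler unsupported() {
    return request -> {
      throw new UnsupportedOperationException(
          String.format("Cannot use an empty %s", Handler.class.getSimpleName()));
    };
  }

  /** An illustrative working handler that completes with an echo of the request. */
  static Handler echo() {
    return request -> CompletableFuture.completedFuture("echo:" + request);
  }
}
```

A runner that cannot serve state would pass `Handler.unsupported()` wherever a handler is required, so any unexpected state access fails loudly instead of silently returning nothing.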
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=98062=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98062 ]

ASF GitHub Bot logged work on BEAM-3326:

    Author: ASF GitHub Bot
    Created on: 03/May/18 22:30
    Start Date: 03/May/18 22:30
    Worklog Time Spent: 10m
    Work Description: lukecwik commented on a change in pull request #5269: [BEAM-3326] Add an Unsupported StateRequestHandler
    URL: https://github.com/apache/beam/pull/5269#discussion_r185953920

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
@@ -34,6 +35,12 @@
    * Throwing an error during handling will complete the handler result {@link CompletionStage}
    * exceptionally.
    */
-  CompletionStage<BeamFnApi.StateResponse.Builder> handle(BeamFnApi.StateRequest request)
-      throws Exception;
+  CompletionStage<StateResponse.Builder> handle(StateRequest request) throws Exception;
+
+  static StateRequestHandler unsupported() {

Review comment:
For some reason I thought it was a class.

Issue Time Tracking

    Worklog Id: (was: 98062)
    Time Spent: 7h 50m (was: 7h 40m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=98030=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98030 ]

ASF GitHub Bot logged work on BEAM-3326:

    Author: ASF GitHub Bot
    Created on: 03/May/18 20:32
    Start Date: 03/May/18 20:32
    Worklog Time Spent: 10m
    Work Description: tgroh commented on a change in pull request #5269: [BEAM-3326] Add an Unsupported StateRequestHandler
    URL: https://github.com/apache/beam/pull/5269#discussion_r185927830

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
+  static StateRequestHandler unsupported() {

Review comment:
It is definitely not redundant - you must choose between `static`, `default`, or no-implementation for interface methods

Issue Time Tracking

    Worklog Id: (was: 98030)
    Time Spent: 7h 40m (was: 7.5h)
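The rule tgroh cites is the Java 8 one for interface methods: a non-private interface method is abstract (no body), `default`, or `static`, and a body without one of those modifiers does not compile. The sketch below, using a hypothetical `Shape` interface, shows all three kinds and uses reflection to confirm that the `static` method is implicitly `public` even though only `static` is written.

```java
import java.lang.reflect.Modifier;

/** Hypothetical interface showing the three kinds of non-private interface methods. */
interface Shape {
  // Abstract: no body; implicitly public and abstract, so implementations must supply it.
  double area();

  // Default: has a body that implementations inherit and may override.
  default String describe() {
    return "shape with area " + area();
  }

  // Static: has a body, is called as Shape.square(...), and is not inherited by implementations.
  static Shape square(double side) {
    return () -> side * side;
  }
}

final class InterfaceModifiers {
  private InterfaceModifiers() {}

  /** Reads the modifiers the compiler recorded for a public method declared on Shape. */
  static int modifiersOf(String name, Class<?>... parameterTypes) {
    try {
      return Shape.class.getMethod(name, parameterTypes).getModifiers();
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException("No such method on Shape: " + name, e);
    }
  }

  static boolean isPublicStatic(String name, Class<?>... parameterTypes) {
    int mods = modifiersOf(name, parameterTypes);
    return Modifier.isPublic(mods) && Modifier.isStatic(mods);
  }

  static boolean isPublicAbstract(String name, Class<?>... parameterTypes) {
    int mods = modifiersOf(name, parameterTypes);
    return Modifier.isPublic(mods) && Modifier.isAbstract(mods);
  }
}
```

Removing `static` from `square` would leave an interface method with a body and no `default` modifier, which javac rejects; that is why the modifier on `unsupported()` is required rather than redundant.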
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=98027=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98027 ]

ASF GitHub Bot logged work on BEAM-3326:

    Author: ASF GitHub Bot
    Created on: 03/May/18 20:01
    Start Date: 03/May/18 20:01
    Worklog Time Spent: 10m
    Work Description: lukecwik commented on a change in pull request #5269: [BEAM-3326] Add an Unsupported StateRequestHandler
    URL: https://github.com/apache/beam/pull/5269#discussion_r185919685

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
+  static StateRequestHandler unsupported() {

Review comment:
`static` is also redundant.

Issue Time Tracking

    Worklog Id: (was: 98027)
    Time Spent: 7.5h (was: 7h 20m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=98011=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98011 ]

ASF GitHub Bot logged work on BEAM-3326:

    Author: ASF GitHub Bot
    Created on: 03/May/18 19:06
    Start Date: 03/May/18 19:06
    Worklog Time Spent: 10m
    Work Description: bsidhom commented on a change in pull request #5269: [BEAM-3326] Add an Unsupported StateRequestHandler
    URL: https://github.com/apache/beam/pull/5269#discussion_r185906501

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
+  static StateRequestHandler unsupported() {

Review comment:
Ah, I hadn't noticed this was an interface. I don't think `static` is redundant though.

Issue Time Tracking

    Worklog Id: (was: 98011)
    Time Spent: 7h 20m (was: 7h 10m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=98006=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98006 ]

ASF GitHub Bot logged work on BEAM-3326:

    Author: ASF GitHub Bot
    Created on: 03/May/18 18:55
    Start Date: 03/May/18 18:55
    Worklog Time Spent: 10m
    Work Description: lukecwik commented on a change in pull request #5269: [BEAM-3326] Add an Unsupported StateRequestHandler
    URL: https://github.com/apache/beam/pull/5269#discussion_r185903479

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
+  static StateRequestHandler unsupported() {

Review comment:
classes inside interfaces are public and not package private. Drop `static` as it is also a redundant modifier.

Issue Time Tracking

    Worklog Id: (was: 98006)
    Time Spent: 7h 10m (was: 7h)
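The first sentence of this comment matches the Java Language Specification: a member class declared inside an interface is implicitly `public` and `static`. The following check, with hypothetical names `Outer` and `Nested`, reads the modifiers the compiler records for a nested class declared with none. (This rule concerns member types; it does not make `static` optional on an interface method with a body.)

```java
import java.lang.reflect.Modifier;

/** Hypothetical interface whose nested class is declared without any modifiers. */
interface Outer {
  class Nested {
    String name() {
      return "nested";
    }
  }
}

final class NestedModifierCheck {
  private NestedModifierCheck() {}

  /** Modifiers recorded for Outer.Nested, although none were written in the source. */
  static int nestedModifiers() {
    return Outer.Nested.class.getModifiers();
  }

  static boolean isPublicAndStatic() {
    int mods = nestedModifiers();
    return Modifier.isPublic(mods) && Modifier.isStatic(mods);
  }
}
```

Because `Nested` is implicitly static, it can be instantiated as `new Outer.Nested()` without any enclosing instance.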
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=98004=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98004 ]

ASF GitHub Bot logged work on BEAM-3326:

    Author: ASF GitHub Bot
    Created on: 03/May/18 18:35
    Start Date: 03/May/18 18:35
    Worklog Time Spent: 10m
    Work Description: bsidhom commented on a change in pull request #5269: [BEAM-3326] Add an Unsupported StateRequestHandler
    URL: https://github.com/apache/beam/pull/5269#discussion_r185898147

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandler.java
+  static StateRequestHandler unsupported() {

Review comment:
Why package-private? This seems useful anywhere we don't/can't support state.

Issue Time Tracking

    Worklog Id: (was: 98004)
    Time Spent: 7h (was: 6h 50m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=97980=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97980 ]

ASF GitHub Bot logged work on BEAM-3326:

    Author: ASF GitHub Bot
    Created on: 03/May/18 17:42
    Start Date: 03/May/18 17:42
    Worklog Time Spent: 10m
    Work Description: tgroh commented on issue #5269: [BEAM-3326] Add an Unsupported StateRequestHandler
    URL: https://github.com/apache/beam/pull/5269#issuecomment-386377992

R: @bsidhom

Issue Time Tracking

    Worklog Id: (was: 97980)
    Time Spent: 6h 50m (was: 6h 40m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=97979=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97979 ]

ASF GitHub Bot logged work on BEAM-3326:

    Author: ASF GitHub Bot
    Created on: 03/May/18 17:42
    Start Date: 03/May/18 17:42
    Worklog Time Spent: 10m
    Work Description: tgroh opened a new pull request #5269: [BEAM-3326] Add an Unsupported StateRequestHandler
    URL: https://github.com/apache/beam/pull/5269

This can be provided to any consumer of state that consumes a StateRequestHandler if the PipelineRunner or Pipeline does not support State Requests in that context.

Issue Time Tracking

    Worklog Id: (was: 97979)
    Time Spent: 6h 40m (was: 6.5h)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=92772=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92772 ]

ASF GitHub Bot logged work on BEAM-3326:

    Author: ASF GitHub Bot
    Created on: 19/Apr/18 18:29
    Start Date: 19/Apr/18 18:29
    Worklog Time Spent: 10m
    Work Description: tgroh closed pull request #4761: [BEAM-3326] Remove SdkHarnessClientControlService
    URL: https://github.com/apache/beam/pull/4761

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientControlService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientControlService.java
deleted file mode 100644
index 3f1d14809c9..000
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientControlService.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.fnexecution.control;
-
-import io.grpc.ServerServiceDefinition;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.Supplier;
-import org.apache.beam.runners.fnexecution.FnService;
-import org.apache.beam.runners.fnexecution.HeaderAccessor;
-import org.apache.beam.runners.fnexecution.data.FnDataService;
-
-/**
- * A service providing {@link SdkHarnessClient} based on an internally managed {@link
- * FnApiControlClientPoolService}.
- */
-public class SdkHarnessClientControlService implements FnService {
-  private final FnApiControlClientPoolService clientPoolService;
-  private final ControlClientPool pendingClients;
-
-  private final Supplier dataService;
-
-  private final Collection activeClients;
-
-  public static SdkHarnessClientControlService create(
-      Supplier dataService, HeaderAccessor headerAccessor) {
-    return new SdkHarnessClientControlService(dataService, headerAccessor);
-  }
-
-  private SdkHarnessClientControlService(
-      Supplier dataService, HeaderAccessor headerAccessor) {
-    this.dataService = dataService;
-    activeClients = new ConcurrentLinkedQueue<>();
-    pendingClients = QueueControlClientPool.createSynchronous();
-    clientPoolService =
-        FnApiControlClientPoolService.offeringClientsToPool(pendingClients.getSink(),
-            headerAccessor);
-  }
-
-  public SdkHarnessClient getClient() {
-    try {
-      // Block until a client is available.
-      FnApiControlClient getClient = pendingClients.getSource().get();
-      return SdkHarnessClient.usingFnApiClient(getClient, dataService.get());
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException("Interrupted while waiting for client", e);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void close() throws Exception {
-    for (SdkHarnessClient client : activeClients) {
-      client.close();
-    }
-  }
-
-  @Override
-  public ServerServiceDefinition bindService() {
-    return clientPoolService.bindService();
-  }
-}

Issue Time Tracking

    Worklog Id: (was: 92772)
    Time Spent: 6.5h (was: 6h 20m)
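The deleted `getClient()` blocks until an SDK harness connects and, if interrupted, restores the thread's interrupt flag before rethrowing. A minimal sketch of that blocking hand-off, assuming a `LinkedBlockingQueue` in place of `ControlClientPool` and `QueueControlClientPool.createSynchronous()`; `ClientPool` is a hypothetical name:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical pool: connecting clients are offered in, callers take them out. */
final class ClientPool<T> {
  private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

  /** Called when a client connects; never blocks because the queue is unbounded. */
  void offer(T client) {
    queue.add(client);
  }

  /** Blocks until a client is available. */
  T take() {
    try {
      return queue.take();
    } catch (InterruptedException e) {
      // take() cleared the interrupt flag when it threw; restore it so code
      // further up the stack can still see that this thread was interrupted.
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while waiting for client", e);
    }
  }
}
```

The unbounded queue is an assumption that keeps the example single-threaded; the original factory name suggests a synchronous hand-off, whose exact behavior is not shown here.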
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=90163=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-90163 ]

ASF GitHub Bot logged work on BEAM-3326:

    Author: ASF GitHub Bot
    Created on: 11/Apr/18 20:47
    Start Date: 11/Apr/18 20:47
    Worklog Time Spent: 10m
    Work Description: tgroh commented on issue #4761: [BEAM-3326] Remove SdkHarnessClientControlService
    URL: https://github.com/apache/beam/pull/4761#issuecomment-380590353

run java precommit

Issue Time Tracking

    Worklog Id: (was: 90163)
    Time Spent: 6h 20m (was: 6h 10m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=90162=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-90162 ]
ASF GitHub Bot logged work on BEAM-3326:
Author: ASF GitHub Bot
Created on: 11/Apr/18 20:47
Start Date: 11/Apr/18 20:47
Worklog Time Spent: 10m
Work Description: tgroh commented on issue #4761: [BEAM-3326] Remove SdkHarnessClientControlService
URL: https://github.com/apache/beam/pull/4761#issuecomment-380590353

run java precommit

Issue Time Tracking
Worklog Id: (was: 90162)
Time Spent: 6h 10m (was: 6h)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=87212=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87212 ]
ASF GitHub Bot logged work on BEAM-3326:
Author: ASF GitHub Bot
Created on: 03/Apr/18 19:14
Start Date: 03/Apr/18 19:14
Worklog Time Spent: 10m
Work Description: bsidhom commented on issue #4761: [BEAM-3326] Remove SdkHarnessClientControlService
URL: https://github.com/apache/beam/pull/4761#issuecomment-378364443

It looks like there are merge conflicts now. Is the plan still to remove this?

Issue Time Tracking
Worklog Id: (was: 87212)
Time Spent: 6h (was: 5h 50m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=86805=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86805 ]
ASF GitHub Bot logged work on BEAM-3326:
Author: ASF GitHub Bot
Created on: 02/Apr/18 22:17
Start Date: 02/Apr/18 22:17
Worklog Time Spent: 10m
Work Description: bsidhom commented on issue #4761: [BEAM-3326] Remove SdkHarnessClientControlService
URL: https://github.com/apache/beam/pull/4761#issuecomment-378063871

No. We can go ahead and remove this now.

Issue Time Tracking
Worklog Id: (was: 86805)
Time Spent: 5h 50m (was: 5h 40m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=86751=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86751 ]
ASF GitHub Bot logged work on BEAM-3326:
Author: ASF GitHub Bot
Created on: 02/Apr/18 20:31
Start Date: 02/Apr/18 20:31
Worklog Time Spent: 10m
Work Description: tgroh commented on issue #4761: [BEAM-3326] Remove SdkHarnessClientControlService
URL: https://github.com/apache/beam/pull/4761#issuecomment-378035011

@bsidhom Is your concern still true?

Issue Time Tracking
Worklog Id: (was: 86751)
Time Spent: 5h 40m (was: 5.5h)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85428=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85428 ]
ASF GitHub Bot logged work on BEAM-3326:
Author: ASF GitHub Bot
Created on: 28/Mar/18 22:53
Start Date: 28/Mar/18 22:53
Worklog Time Spent: 10m
Work Description: lukecwik closed pull request #4970: [BEAM-3326] Address additional comments from PR/4963.
URL: https://github.com/apache/beam/pull/4970

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
index de1aa57fe32..ebcb8b4d33f 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
@@ -180,7 +180,7 @@ public String getBundleId() {
/** * Returns a {@link FnDataReceiver receiver} which consumes input elements forwarding them - * to the SDK. When + * to the SDK. */ public FnDataReceiver getInputReceiver() { return inputReceiver;
@@ -211,8 +211,9 @@ public void close() throws Exception {
if (exception == null) { MoreFutures.get(response); } else { - // TODO: Handle aborting the bundle being processed. - throw new IllegalStateException("Processing bundle failed, TODO: abort bundle."); + // TODO: [BEAM-3962] Handle aborting the bundle being processed.
+ throw new IllegalStateException("Processing bundle failed, " + "TODO: [BEAM-3962] abort bundle."); } } catch (Exception e) { if (exception == null) {
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java
index 6c7839fc193..d86324eb366 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java
@@ -38,9 +38,9 @@
* The provided coder is used to decode inbound elements. The decoded elements are passed to * the provided receiver. * - * Any failure during decoding or processing of the element will complete the returned future - * exceptionally. On successful termination of the stream, the returned future is completed - * successfully. + * Any failure during decoding or processing of the element will put the + * {@link InboundDataClient} into an error state such that + * {@link InboundDataClient#awaitCompletion()} will throw an exception. * * The provided receiver is not required to be thread safe. */

Issue Time Tracking
Worklog Id: (was: 85428)
Time Spent: 5.5h (was: 5h 20m)
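The revised `FnDataService` Javadoc above changes the failure contract. A decoding or processing failure now puts the `InboundDataClient` into an error state, and `awaitCompletion()` then throws. The following is a minimal, JDK-only sketch of that contract. `InboundClientSketch`, `onData` and `onStreamCompleted` are hypothetical names, and a `CompletableFuture` is assumed as the error latch; this is not Beam's actual implementation. The sketch also assumes payloads are delivered from a single thread, consistent with the note that the receiver need not be thread safe.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical inbound client: decodes payloads, forwards them, and latches the first failure. */
final class InboundClientSketch<T> {
  private final Function<byte[], T> decoder;
  private final Consumer<T> receiver;
  // Completes normally when the stream ends, or exceptionally on the first failure.
  private final CompletableFuture<Void> done = new CompletableFuture<>();

  InboundClientSketch(Function<byte[], T> decoder, Consumer<T> receiver) {
    this.decoder = decoder;
    this.receiver = receiver;
  }

  /** Decodes and forwards one payload; any failure moves the client into the error state. */
  void onData(byte[] payload) {
    if (done.isDone()) {
      return; // Drop data that arrives after completion or after a failure.
    }
    try {
      receiver.accept(decoder.apply(payload));
    } catch (RuntimeException e) {
      done.completeExceptionally(e);
    }
  }

  /** Marks a successful end of stream; has no effect if a failure was already recorded. */
  void onStreamCompleted() {
    done.complete(null);
  }

  /** Blocks until the stream ends; throws if decoding or processing failed. */
  void awaitCompletion() throws InterruptedException, ExecutionException {
    done.get();
  }
}
```

Because the first failure completes the latch, a later `onStreamCompleted()` cannot mask it, and the caller of `awaitCompletion()` always sees the original cause.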
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85348=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85348 ]
ASF GitHub Bot logged work on BEAM-3326:
Author: ASF GitHub Bot
Created on: 28/Mar/18 18:40
Start Date: 28/Mar/18 18:40
Worklog Time Spent: 10m
Work Description: lukecwik opened a new pull request #4970: [BEAM-3326] Address additional comments from PR/4963.
URL: https://github.com/apache/beam/pull/4970

Issue Time Tracking
Worklog Id: (was: 85348)
Time Spent: 5h 10m (was: 5h)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85349=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85349 ]
ASF GitHub Bot logged work on BEAM-3326:
Author: ASF GitHub Bot
Created on: 28/Mar/18 18:40
Start Date: 28/Mar/18 18:40
Worklog Time Spent: 10m
Work Description: lukecwik commented on issue #4970: [BEAM-3326] Address additional comments from PR/4963.
URL: https://github.com/apache/beam/pull/4970#issuecomment-376992745

CC: @bsidhom

Issue Time Tracking
Worklog Id: (was: 85349)
Time Spent: 5h 20m (was: 5h 10m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85347=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85347 ]
ASF GitHub Bot logged work on BEAM-3326:
Author: ASF GitHub Bot
Created on: 28/Mar/18 18:35
Start Date: 28/Mar/18 18:35
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #4963: [BEAM-3326] Abstract away closing the inbound receiver, waiting for the bundle to finish, waiting for outbound to complete within the ActiveBundle.
URL: https://github.com/apache/beam/pull/4963#discussion_r177849543

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
## @@ -146,22 +154,92 @@ private BundleProcessor(
} /** An active bundle for a particular {@link BeamFnApi.ProcessBundleDescriptor}. */ - @AutoValue - public abstract static class ActiveBundle { -public abstract String getBundleId(); - -public abstract CompletionStage getBundleResponse(); + public static class ActiveBundle implements AutoCloseable { +private final String bundleId; +private final CompletionStage response; +private final CloseableFnDataReceiver inputReceiver; +private final Map outputClients; -public abstract CloseableFnDataReceiver getInputReceiver(); -public abstract Map getOutputClients(); - -public static ActiveBundle create( +private ActiveBundle( String bundleId, CompletionStage response, -CloseableFnDataReceiver dataReceiver, +CloseableFnDataReceiver inputReceiver, Map outputClients) { - return new AutoValue_SdkHarnessClient_ActiveBundle<>( - bundleId, response, dataReceiver, outputClients); + this.bundleId = bundleId; + this.response = response; + this.inputReceiver = inputReceiver; + this.outputClients = outputClients; +} + +/** + * Returns an id used to represent this bundle. + */ +public String getBundleId() { + return bundleId; +} + +/** + * Returns a {@link FnDataReceiver receiver} which consumes input elements forwarding them + * to the SDK.
When + */ +public FnDataReceiver getInputReceiver() { + return inputReceiver; +} + +/** + * Blocks till bundle processing is finished. This is comprised of: + * + * closing the {@link #getInputReceiver() input receiver}. + * waiting for the SDK to say that processing the bundle is finished. + * waiting for all inbound data clients to complete + * + * + * This method will throw an exception if bundle processing has failed. + * {@link Throwable#getSuppressed()} will return all the reasons as to why processing has + * failed. + */ +@Override +public void close() throws Exception { + Exception exception = null; + try { +inputReceiver.close(); + } catch (Exception e) { +exception = e; + } + try { +// We don't have to worry about the completion stage. +if (exception == null) { + MoreFutures.get(response); +} else { + // TODO: Handle aborting the bundle being processed. + throw new IllegalStateException("Processing bundle failed, TODO: abort bundle."); +} + } catch (Exception e) { +if (exception == null) { + exception = e; +} else { + exception.addSuppressed(e); +} + } + for (InboundDataClient outputClient : outputClients.values()) { +try { + // If we failed processing this bundle, we should cancel all inbound data. + if (exception == null) { +outputClient.awaitCompletion();

Review comment: We await completion on all outbound clients. No, the bundle result represents to the best knowledge that the SDK did everything it needed to. There can still be wire transfer/network/decoding failures that the SDK wouldn't be aware of after sending bundle process completion.

Issue Time Tracking
Worklog Id: (was: 85347)
Time Spent: 5h (was: 4h 50m)
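The reply above explains why the SDK's bundle response is not sufficient on its own. Data for the bundle can still fail on the wire or during decoding after the SDK has reported completion. Below is a minimal, JDK-only sketch of the resulting check. `BundleOutcomeSketch.fullySucceeded` is a hypothetical helper, and plain `CompletableFuture`s are assumed to stand in for the bundle response and the inbound data clients.

```java
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical check: a bundle only succeeded if the SDK response AND every inbound stream did. */
final class BundleOutcomeSketch {
  static boolean fullySucceeded(
      CompletableFuture<?> bundleResponse, List<? extends CompletableFuture<?>> inboundClients) {
    if (!completedNormally(bundleResponse)) {
      return false;
    }
    // A successful response says nothing about data still in flight, so check every client too.
    for (CompletableFuture<?> client : inboundClients) {
      if (!completedNormally(client)) {
        return false;
      }
    }
    return true;
  }

  /** Blocks until the future finishes and reports whether it completed without an exception. */
  private static boolean completedNormally(CompletableFuture<?> future) {
    try {
      future.join();
      return true;
    } catch (CompletionException | CancellationException e) {
      return false;
    }
  }
}
```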
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85310=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85310 ]
ASF GitHub Bot logged work on BEAM-3326:
Author: ASF GitHub Bot
Created on: 28/Mar/18 15:59
Start Date: 28/Mar/18 15:59
Worklog Time Spent: 10m
Work Description: bsidhom commented on a change in pull request #4963: [BEAM-3326] Abstract away closing the inbound receiver, waiting for the bundle to finish, waiting for outbound to complete within the ActiveBundle.
URL: https://github.com/apache/beam/pull/4963#discussion_r177799450

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
## @@ -146,22 +154,92 @@ private BundleProcessor(
} /** An active bundle for a particular {@link BeamFnApi.ProcessBundleDescriptor}. */ - @AutoValue - public abstract static class ActiveBundle { -public abstract String getBundleId(); - -public abstract CompletionStage getBundleResponse(); + public static class ActiveBundle implements AutoCloseable { +private final String bundleId; +private final CompletionStage response; +private final CloseableFnDataReceiver inputReceiver; +private final Map outputClients; -public abstract CloseableFnDataReceiver getInputReceiver(); -public abstract Map getOutputClients(); - -public static ActiveBundle create( +private ActiveBundle( String bundleId, CompletionStage response, -CloseableFnDataReceiver dataReceiver, +CloseableFnDataReceiver inputReceiver, Map outputClients) { - return new AutoValue_SdkHarnessClient_ActiveBundle<>( - bundleId, response, dataReceiver, outputClients); + this.bundleId = bundleId; + this.response = response; + this.inputReceiver = inputReceiver; + this.outputClients = outputClients; +} + +/** + * Returns an id used to represent this bundle. + */ +public String getBundleId() { + return bundleId; +} + +/** + * Returns a {@link FnDataReceiver receiver} which consumes input elements forwarding them + * to the SDK.
When + */ +public FnDataReceiver getInputReceiver() { + return inputReceiver; +} + +/** + * Blocks till bundle processing is finished. This is comprised of: + * + * closing the {@link #getInputReceiver() input receiver}. + * waiting for the SDK to say that processing the bundle is finished. + * waiting for all inbound data clients to complete + * + * + * This method will throw an exception if bundle processing has failed. + * {@link Throwable#getSuppressed()} will return all the reasons as to why processing has + * failed. + */ +@Override +public void close() throws Exception { + Exception exception = null; + try { +inputReceiver.close(); + } catch (Exception e) { +exception = e; + } + try { +// We don't have to worry about the completion stage. +if (exception == null) { + MoreFutures.get(response); +} else { + // TODO: Handle aborting the bundle being processed.

Review comment: Could you reference a tracking bug?

Issue Time Tracking
Worklog Id: (was: 85310)
Time Spent: 4h 40m (was: 4.5h)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85311=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85311 ]
ASF GitHub Bot logged work on BEAM-3326:
Author: ASF GitHub Bot
Created on: 28/Mar/18 15:59
Start Date: 28/Mar/18 15:59
Worklog Time Spent: 10m
Work Description: bsidhom commented on a change in pull request #4963: [BEAM-3326] Abstract away closing the inbound receiver, waiting for the bundle to finish, waiting for outbound to complete within the ActiveBundle.
URL: https://github.com/apache/beam/pull/4963#discussion_r177798196

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
## @@ -146,22 +154,92 @@ private BundleProcessor(
} /** An active bundle for a particular {@link BeamFnApi.ProcessBundleDescriptor}. */ - @AutoValue - public abstract static class ActiveBundle { -public abstract String getBundleId(); - -public abstract CompletionStage getBundleResponse(); + public static class ActiveBundle implements AutoCloseable { +private final String bundleId; +private final CompletionStage response; +private final CloseableFnDataReceiver inputReceiver; +private final Map outputClients; -public abstract CloseableFnDataReceiver getInputReceiver(); -public abstract Map getOutputClients(); - -public static ActiveBundle create( +private ActiveBundle( String bundleId, CompletionStage response, -CloseableFnDataReceiver dataReceiver, +CloseableFnDataReceiver inputReceiver, Map outputClients) { - return new AutoValue_SdkHarnessClient_ActiveBundle<>( - bundleId, response, dataReceiver, outputClients); + this.bundleId = bundleId; + this.response = response; + this.inputReceiver = inputReceiver; + this.outputClients = outputClients; +} + +/** + * Returns an id used to represent this bundle. + */ +public String getBundleId() { + return bundleId; +} + +/** + * Returns a {@link FnDataReceiver receiver} which consumes input elements forwarding them + * to the SDK.
When

Review comment: Did you mean to complete this comment?

Issue Time Tracking
Worklog Id: (was: 85311)
Time Spent: 4h 50m (was: 4h 40m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85309=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85309 ]
ASF GitHub Bot logged work on BEAM-3326:
Author: ASF GitHub Bot
Created on: 28/Mar/18 15:59
Start Date: 28/Mar/18 15:59
Worklog Time Spent: 10m
Work Description: bsidhom commented on a change in pull request #4963: [BEAM-3326] Abstract away closing the inbound receiver, waiting for the bundle to finish, waiting for outbound to complete within the ActiveBundle.
URL: https://github.com/apache/beam/pull/4963#discussion_r177800576

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
## @@ -146,22 +154,92 @@ private BundleProcessor(
} /** An active bundle for a particular {@link BeamFnApi.ProcessBundleDescriptor}. */ - @AutoValue - public abstract static class ActiveBundle { -public abstract String getBundleId(); - -public abstract CompletionStage getBundleResponse(); + public static class ActiveBundle implements AutoCloseable { +private final String bundleId; +private final CompletionStage response; +private final CloseableFnDataReceiver inputReceiver; +private final Map outputClients; -public abstract CloseableFnDataReceiver getInputReceiver(); -public abstract Map getOutputClients(); - -public static ActiveBundle create( +private ActiveBundle( String bundleId, CompletionStage response, -CloseableFnDataReceiver dataReceiver, +CloseableFnDataReceiver inputReceiver, Map outputClients) { - return new AutoValue_SdkHarnessClient_ActiveBundle<>( - bundleId, response, dataReceiver, outputClients); + this.bundleId = bundleId; + this.response = response; + this.inputReceiver = inputReceiver; + this.outputClients = outputClients; +} + +/** + * Returns an id used to represent this bundle. + */ +public String getBundleId() { + return bundleId; +} + +/** + * Returns a {@link FnDataReceiver receiver} which consumes input elements forwarding them + * to the SDK.
When + */ +public FnDataReceiver getInputReceiver() { + return inputReceiver; +} + +/** + * Blocks till bundle processing is finished. This is comprised of: + * + * closing the {@link #getInputReceiver() input receiver}. + * waiting for the SDK to say that processing the bundle is finished. + * waiting for all inbound data clients to complete + * + * + * This method will throw an exception if bundle processing has failed. + * {@link Throwable#getSuppressed()} will return all the reasons as to why processing has + * failed. + */ +@Override +public void close() throws Exception { + Exception exception = null; + try { +inputReceiver.close(); + } catch (Exception e) { +exception = e; + } + try { +// We don't have to worry about the completion stage. +if (exception == null) { + MoreFutures.get(response); +} else { + // TODO: Handle aborting the bundle being processed. + throw new IllegalStateException("Processing bundle failed, TODO: abort bundle."); +} + } catch (Exception e) { +if (exception == null) { + exception = e; +} else { + exception.addSuppressed(e); +} + } + for (InboundDataClient outputClient : outputClients.values()) { +try { + // If we failed processing this bundle, we should cancel all inbound data. + if (exception == null) { +outputClient.awaitCompletion();

Review comment: Could the failure of a single inbound client cause other clients to fail? If so, would this be captured by the overall bundle result? If not, you will need to wait for completion of the first-finished result rather than an arbitrary client here.

Issue Time Tracking
Worklog Id: (was: 85309)
Time Spent: 4.5h (was: 4h 20m)
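The review question above is about not blocking on one inbound client while a different client has already failed. The quoted `close()` awaits each client in turn. One way to express "wait for all clients, but return as soon as any of them fails" with the JDK is sketched below. `FailFastSketch.awaitAll` is a hypothetical helper, `CompletableFuture`s are assumed to represent the inbound clients, and this is not what the PR under review implements.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical helper: wait for every client, but return early if any of them fails. */
final class FailFastSketch {
  static void awaitAll(List<? extends CompletableFuture<?>> clients)
      throws InterruptedException, ExecutionException {
    // Completes once every client has completed (normally or not).
    CompletableFuture<Void> allDone =
        CompletableFuture.allOf(clients.toArray(new CompletableFuture<?>[0]));
    // Completes exceptionally as soon as the first client fails.
    CompletableFuture<Void> firstFailure = new CompletableFuture<>();
    for (CompletableFuture<?> client : clients) {
      client.whenComplete(
          (ignored, error) -> {
            if (error != null) {
              firstFailure.completeExceptionally(error);
            }
          });
    }
    // Whichever finishes first decides: all clients done, or the earliest failure.
    CompletableFuture.anyOf(allDone, firstFailure).get();
  }
}
```

The resulting `ExecutionException` carries the first client's failure as its cause. A caller could then cancel the remaining clients instead of waiting for them.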
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85299=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85299 ]
ASF GitHub Bot logged work on BEAM-3326:
Author: ASF GitHub Bot
Created on: 28/Mar/18 15:44
Start Date: 28/Mar/18 15:44
Worklog Time Spent: 10m
Work Description: lukecwik closed pull request #4963: [BEAM-3326] Abstract away closing the inbound receiver, waiting for the bundle to finish, waiting for outbound to complete within the ActiveBundle.
URL: https://github.com/apache/beam/pull/4963

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
index f71616a9aae..de1aa57fe32 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
@@ -38,6 +38,7 @@
import org.apache.beam.sdk.fn.data.FnDataReceiver; import org.apache.beam.sdk.fn.data.InboundDataClient; import org.apache.beam.sdk.fn.data.LogicalEndpoint; +import org.apache.beam.sdk.util.MoreFutures; import org.apache.beam.sdk.util.WindowedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
@@ -96,6 +97,13 @@ private BundleProcessor(
* * The input channels for the returned {@link ActiveBundle} are derived from the instructions * in the {@link BeamFnApi.ProcessBundleDescriptor}. + * + * NOTE: It is important to {@link #close()} each bundle after all elements are emitted.
+ * {@code + * try (ActiveBundle bundle = SdkHarnessClient.newBundle(...)) { + * // send all elements + * } + * } */ public ActiveBundle newBundle( Map outputReceivers) {
@@ -133,7 +141,7 @@ private BundleProcessor(
fnApiDataService.send( LogicalEndpoint.of(bundleId, remoteInput.getTarget()), remoteInput.getCoder()); - return ActiveBundle.create(bundleId, specificResponse, dataReceiver, outputClients); + return new ActiveBundle(bundleId, specificResponse, dataReceiver, outputClients); } private InboundDataClient attachReceiver(
@@ -146,22 +154,92 @@ private BundleProcessor(
} /** An active bundle for a particular {@link BeamFnApi.ProcessBundleDescriptor}. */ - @AutoValue - public abstract static class ActiveBundle { -public abstract String getBundleId(); - -public abstract CompletionStage getBundleResponse(); + public static class ActiveBundle implements AutoCloseable { +private final String bundleId; +private final CompletionStage response; +private final CloseableFnDataReceiver inputReceiver; +private final Map outputClients; -public abstract CloseableFnDataReceiver getInputReceiver(); -public abstract Map getOutputClients(); - -public static ActiveBundle create( +private ActiveBundle( String bundleId, CompletionStage response, -CloseableFnDataReceiver dataReceiver, +CloseableFnDataReceiver inputReceiver, Map outputClients) { - return new AutoValue_SdkHarnessClient_ActiveBundle<>( - bundleId, response, dataReceiver, outputClients); + this.bundleId = bundleId; + this.response = response; + this.inputReceiver = inputReceiver; + this.outputClients = outputClients; +} + +/** + * Returns an id used to represent this bundle. + */ +public String getBundleId() { + return bundleId; +} + +/** + * Returns a {@link FnDataReceiver receiver} which consumes input elements forwarding them + * to the SDK. When + */ +public FnDataReceiver getInputReceiver() { + return inputReceiver; +} + +/** + * Blocks till bundle processing is finished.
This is comprised of: + * + * closing the {@link #getInputReceiver() input receiver}. + * waiting for the SDK to say that processing the bundle is finished. + * waiting for all inbound data clients to complete + * + * + * This method will throw an exception if bundle processing has failed. + * {@link Throwable#getSuppressed()} will return all the reasons as to why processing has + * failed. + */ +
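The `ActiveBundle#close()` Javadoc in this diff lists three steps: close the input receiver, wait for the SDK to report that the bundle is finished, and wait for all inbound data clients. It also says every failure is reported through `Throwable#getSuppressed()`. The method body is cut off in this message, so the sketch below is one plausible reading of that contract, not Beam's implementation. `Bundle` and `InboundClient` are hypothetical stand-ins for `ActiveBundle` and `InboundDataClient`. Each step runs even if an earlier one failed, which is the behavior the `handleCleanupWhenInputSenderFails` test name suggests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;

public class BundleCloseSketch {

  /** Hypothetical stand-in for an inbound data client: blocks until its data has been received. */
  interface InboundClient {
    void awaitCompletion() throws Exception;
  }

  /** Hypothetical bundle handle whose close() performs the three documented steps. */
  static final class Bundle implements AutoCloseable {
    private final AutoCloseable inputReceiver;
    private final CompletionStage<String> response;
    private final List<InboundClient> outputClients;

    Bundle(
        AutoCloseable inputReceiver,
        CompletionStage<String> response,
        List<InboundClient> outputClients) {
      this.inputReceiver = inputReceiver;
      this.response = response;
      this.outputClients = outputClients;
    }

    @Override
    public void close() throws Exception {
      List<Exception> failures = new ArrayList<>();
      // 1. Close the input receiver so the SDK sees the end of the input.
      try {
        inputReceiver.close();
      } catch (Exception e) {
        failures.add(e);
      }
      // 2. Wait for the SDK to report that the bundle is finished.
      try {
        response.toCompletableFuture().get();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        failures.add(e);
      } catch (ExecutionException | RuntimeException e) {
        failures.add(e);
      }
      // 3. Wait for every inbound data client, even if an earlier step failed.
      for (InboundClient client : outputClients) {
        try {
          client.awaitCompletion();
        } catch (Exception e) {
          failures.add(e);
        }
      }
      if (!failures.isEmpty()) {
        // Report every cause through getSuppressed(), as the Javadoc describes.
        Exception failure = new Exception("Bundle processing failed");
        failures.forEach(failure::addSuppressed);
        throw failure;
      }
    }
  }

  public static void main(String[] args) {
    List<String> steps = new ArrayList<>();
    // An input receiver whose close() fails, similar to a failing input sender.
    AutoCloseable failingInput =
        () -> {
          steps.add("input closed");
          throw new IllegalStateException("send failed");
        };
    List<InboundClient> outputs = List.of(() -> steps.add("output drained"));
    try (Bundle bundle =
        new Bundle(failingInput, CompletableFuture.completedFuture("done"), outputs)) {
      steps.add("elements sent");
    } catch (Exception e) {
      // prints: [elements sent, input closed, output drained] suppressed=1
      System.out.println(steps + " suppressed=" + e.getSuppressed().length);
    }
  }
}
```

The `main` method uses try-with-resources, as the `newBundle` Javadoc recommends, so `close()` runs after all elements are sent; the output client is still drained even though closing the input receiver failed.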
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85298=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85298 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 28/Mar/18 15:43 Start Date: 28/Mar/18 15:43 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #4963: [BEAM-3326] Abstract away closing the inbound receiver, waiting for the bundle to finish, waiting for outbound to complete within the ActiveBundle. URL: https://github.com/apache/beam/pull/4963#discussion_r177795953 ## File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java ## @@ -221,6 +220,135 @@ public void testNewBundleAndProcessElements() throws Exception { WindowedValue.valueInGlobalWindow("eggs"))); } + @Test + public void handleCleanupWhenInputSenderFails() throws Exception { +String descriptorId1 = "descriptor1"; +Exception testException = new Exception(); + +InboundDataClient mockOutputReceiver = mock(InboundDataClient.class); +CloseableFnDataReceiver mockInputSender = mock(CloseableFnDataReceiver.class); + +ProcessBundleDescriptor descriptor = +ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build(); +CompletableFuture processBundleResponseFuture = +new CompletableFuture<>(); +when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class))) +.thenReturn(new CompletableFuture<>()) +.thenReturn(processBundleResponseFuture); + +FullWindowedValueCoder coder = +FullWindowedValueCoder.of(StringUtf8Coder.of(), Coder.INSTANCE); +BundleProcessor processor = +sdkHarnessClient.getProcessor( +descriptor, RemoteInputDestination.of(coder, Target.getDefaultInstance())); +when(dataService.receive(any(), any(), any())).thenReturn(mockOutputReceiver); Review comment: I'll fix the comment in a separate PR and merge this. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 85298) Time Spent: 4h 10m (was: 4h) > Execute a Stage via the portability framework in the ReferenceRunner > > > Key: BEAM-3326 > URL: https://issues.apache.org/jira/browse/BEAM-3326 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Thomas Groh >Priority: Major > Labels: portability > Time Spent: 4h 10m > Remaining Estimate: 0h > > This is the supertask for remote execution in the Universal Local Runner > (BEAM-2899). > This executes a stage remotely via portability framework APIs -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85297=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85297 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 28/Mar/18 15:42 Start Date: 28/Mar/18 15:42 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #4963: [BEAM-3326] Abstract away closing the inbound receiver, waiting for the bundle to finish, waiting for outbound to complete within the ActiveBundle. URL: https://github.com/apache/beam/pull/4963#discussion_r177795953 ## File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java ## @@ -221,6 +220,135 @@ public void testNewBundleAndProcessElements() throws Exception { WindowedValue.valueInGlobalWindow("eggs"))); } + @Test + public void handleCleanupWhenInputSenderFails() throws Exception { +String descriptorId1 = "descriptor1"; +Exception testException = new Exception(); + +InboundDataClient mockOutputReceiver = mock(InboundDataClient.class); +CloseableFnDataReceiver mockInputSender = mock(CloseableFnDataReceiver.class); + +ProcessBundleDescriptor descriptor = +ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build(); +CompletableFuture processBundleResponseFuture = +new CompletableFuture<>(); +when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class))) +.thenReturn(new CompletableFuture<>()) +.thenReturn(processBundleResponseFuture); + +FullWindowedValueCoder coder = +FullWindowedValueCoder.of(StringUtf8Coder.of(), Coder.INSTANCE); +BundleProcessor processor = +sdkHarnessClient.getProcessor( +descriptor, RemoteInputDestination.of(coder, Target.getDefaultInstance())); +when(dataService.receive(any(), any(), any())).thenReturn(mockOutputReceiver); Review comment: I'll fix the comment and merge. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Issue Time Tracking --- Worklog Id: (was: 85297) Time Spent: 4h (was: 3h 50m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85182=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85182 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 28/Mar/18 09:59 Start Date: 28/Mar/18 09:59 Worklog Time Spent: 10m Work Description: aljoscha commented on a change in pull request #4963: [BEAM-3326] Abstract away closing the inbound receiver, waiting for the bundle to finish, waiting for outbound to complete within the ActiveBundle. URL: https://github.com/apache/beam/pull/4963#discussion_r177666906 ## File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java ## @@ -221,6 +220,135 @@ public void testNewBundleAndProcessElements() throws Exception { WindowedValue.valueInGlobalWindow("eggs"))); } + @Test + public void handleCleanupWhenInputSenderFails() throws Exception { +String descriptorId1 = "descriptor1"; +Exception testException = new Exception(); + +InboundDataClient mockOutputReceiver = mock(InboundDataClient.class); +CloseableFnDataReceiver mockInputSender = mock(CloseableFnDataReceiver.class); + +ProcessBundleDescriptor descriptor = +ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build(); +CompletableFuture processBundleResponseFuture = +new CompletableFuture<>(); +when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class))) +.thenReturn(new CompletableFuture<>()) +.thenReturn(processBundleResponseFuture); + +FullWindowedValueCoder coder = +FullWindowedValueCoder.of(StringUtf8Coder.of(), Coder.INSTANCE); +BundleProcessor processor = +sdkHarnessClient.getProcessor( +descriptor, RemoteInputDestination.of(coder, Target.getDefaultInstance())); +when(dataService.receive(any(), any(), any())).thenReturn(mockOutputReceiver); Review comment: off-topic: I noticed that `FnDataService.receive()` still refers to the "returned future" in the Javadoc even though it doesn't return a future anymore. 
Issue Time Tracking --- Worklog Id: (was: 85182) Time Spent: 3h 50m (was: 3h 40m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85098=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85098 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 28/Mar/18 01:41 Start Date: 28/Mar/18 01:41 Worklog Time Spent: 10m Work Description: lukecwik opened a new pull request #4963: [BEAM-3326] Abstract away closing the inbound receiver, waiting for the bundle to finish, waiting for outbound to complete within the ActiveBundle. URL: https://github.com/apache/beam/pull/4963
Follow this checklist to help us incorporate your contribution quickly and easily:
- [ ] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue.
- [ ] Write a pull request description that is detailed enough to understand:
  - [ ] What the pull request does
  - [ ] Why it does it
  - [ ] How it does it
  - [ ] Why this approach
- [ ] Each commit in the pull request should have a meaningful subject line and body.
- [ ] Run `mvn clean verify` to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Issue Time Tracking --- Worklog Id: (was: 85098) Time Spent: 3.5h (was: 3h 20m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85099=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85099 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 28/Mar/18 01:41 Start Date: 28/Mar/18 01:41 Worklog Time Spent: 10m Work Description: lukecwik commented on issue #4963: [BEAM-3326] Abstract away closing the inbound receiver, waiting for the bundle to finish, waiting for outbound to complete within the ActiveBundle. URL: https://github.com/apache/beam/pull/4963#issuecomment-376731599 CC: @bsidhom
Issue Time Tracking --- Worklog Id: (was: 85099) Time Spent: 3h 40m (was: 3.5h)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=80905=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80905 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 15/Mar/18 18:00 Start Date: 15/Mar/18 18:00 Worklog Time Spent: 10m Work Description: tgroh commented on issue #4761: [BEAM-3326] Remove SdkHarnessClientControlService URL: https://github.com/apache/beam/pull/4761#issuecomment-373469231 Assuming the changes in https://github.com/apache/beam/pull/4866 are accepted, this likely becomes a no-op including when run against flink, because it removes the need for the `EnvironmentManager` to directly access data or state services.
Issue Time Tracking --- Worklog Id: (was: 80905) Time Spent: 3h 20m (was: 3h 10m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=80533=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80533 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 14/Mar/18 20:30 Start Date: 14/Mar/18 20:30 Worklog Time Spent: 10m Work Description: lukecwik closed pull request #4825: [BEAM-3326] Add an InProcess SdkHarness Rule URL: https://github.com/apache/beam/pull/4825 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java index 9d443427dc1..5a82a32a0b4 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java @@ -18,7 +18,10 @@ package org.apache.beam.runners.fnexecution.control; import io.grpc.stub.StreamObserver; +import java.util.Collection; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc; import org.apache.beam.runners.fnexecution.FnService; @@ -31,6 +34,8 @@ private static final Logger LOGGER = LoggerFactory.getLogger(FnApiControlClientPoolService.class); private final BlockingQueue clientPool; + private final Collection vendedClients = new CopyOnWriteArrayList<>(); + private AtomicBoolean closed = new AtomicBoolean(); private 
FnApiControlClientPoolService(BlockingQueue clientPool) { this.clientPool = clientPool; @@ -61,6 +66,12 @@ public static FnApiControlClientPoolService offeringClientsToPool( LOGGER.info("Beam Fn Control client connected."); FnApiControlClient newClient = FnApiControlClient.forRequestObserver(requestObserver); try { + // Add the client to the pool of vended clients before making it available - we should close + // the client when we close even if no one has picked it up yet. This can occur after the + // service is closed, in which case the client will be discarded when the service is + // discarded, which should be performed by a call to #shutdownNow. The remote caller must be + // able to handle an unexpectedly terminated connection. + vendedClients.add(newClient); clientPool.put(newClient); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -70,7 +81,11 @@ public static FnApiControlClientPoolService offeringClientsToPool( } @Override - public void close() throws Exception { -// The clients in the pool are owned by the consumer, which is responsible for closing them + public void close() { +if (!closed.getAndSet(true)) { + for (FnApiControlClient vended : vendedClients) { +vended.close(); + } +} } } diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java index 00b2a9afb92..a13d3d7d38c 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java @@ -53,8 +53,7 @@ implements FnService, FnDataService { private static final Logger LOG = LoggerFactory.getLogger(GrpcDataService.class); - public static GrpcDataService create( - ExecutorService executor) { + public static GrpcDataService create(ExecutorService executor) { return 
new GrpcDataService(executor); } @@ -62,6 +61,9 @@ public static GrpcDataService create( /** * A collection of multiplexers which are not used to send data. A handle to these multiplexers is * maintained in order to perform an orderly shutdown. + * + * TODO: (BEAM-3811) Replace with some cancellable collection, to ensure that new clients of a + * closed {@link GrpcDataService} are closed with that {@link GrpcDataService}. */ private final Queue additionalMultiplexers; @@ -105,7 +107,9 @@ public void close() throws Exception { // Shutdown remaining clients } } -connectedClient.get().close(); +if (!connectedClient.isCancelled()) { +
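The `FnApiControlClientPoolService` change above adds every client to `vendedClients` before offering it to the pool, and guards `close()` with `closed.getAndSet(true)` so that the clients are closed at most once. A minimal sketch of that pattern follows, assuming a simplified setting: `Client`, `PoolService`, and `onConnect` are hypothetical names standing in for `FnApiControlClient`, the service, and its gRPC connection handler. Only the start of the diff's `InterruptedException` handler is shown, so the catch block below is limited to restoring the interrupt flag.

```java
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ClientPoolSketch {

  /** Hypothetical client; counts how many times close() is called. */
  static final class Client implements AutoCloseable {
    final AtomicInteger closeCount = new AtomicInteger();

    @Override
    public void close() {
      closeCount.incrementAndGet();
    }
  }

  /** Hypothetical pool service mirroring the vendedClients and closed fields in the diff. */
  static final class PoolService implements AutoCloseable {
    private final BlockingQueue<Client> clientPool;
    private final Collection<Client> vendedClients = new CopyOnWriteArrayList<>();
    private final AtomicBoolean closed = new AtomicBoolean();

    PoolService(BlockingQueue<Client> clientPool) {
      this.clientPool = clientPool;
    }

    /** Called when a remote harness connects (hypothetical name). */
    void onConnect(Client newClient) {
      try {
        // Track the client first, so close() reaches it even if no consumer ever takes it.
        vendedClients.add(newClient);
        clientPool.put(newClient);
      } catch (InterruptedException e) {
        // Restore the interrupt flag; the client is still tracked and closed with the service.
        Thread.currentThread().interrupt();
      }
    }

    @Override
    public void close() {
      // getAndSet(true) returns false only for the first caller, so each client is closed once.
      if (!closed.getAndSet(true)) {
        for (Client vended : vendedClients) {
          vended.close();
        }
      }
    }
  }

  public static void main(String[] args) {
    BlockingQueue<Client> pool = new LinkedBlockingQueue<>();
    PoolService service = new PoolService(pool);
    Client client = new Client();
    service.onConnect(client);
    service.close();
    service.close(); // The second call is a no-op.
    System.out.println(client.closeCount.get()); // prints 1
  }
}
```

Tracking the client before `put` means a client that no consumer ever takes from the pool is still closed with the service. `CopyOnWriteArrayList` iterates over a snapshot, so `close()` can loop safely while new connections are still being recorded.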
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=80170=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80170 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 14/Mar/18 02:12 Start Date: 14/Mar/18 02:12 Worklog Time Spent: 10m Work Description: tgroh commented on issue #4825: [BEAM-3326] Add an InProcess SdkHarness Rule URL: https://github.com/apache/beam/pull/4825#issuecomment-372881433 run java gradle precommit
Issue Time Tracking --- Worklog Id: (was: 80170) Time Spent: 3h (was: 2h 50m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=80169=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80169 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 14/Mar/18 02:12 Start Date: 14/Mar/18 02:12 Worklog Time Spent: 10m Work Description: tgroh commented on issue #4825: [BEAM-3326] Add an InProcess SdkHarness Rule URL: https://github.com/apache/beam/pull/4825#issuecomment-372881433 run java gradle precommit
Issue Time Tracking --- Worklog Id: (was: 80169) Time Spent: 2h 50m (was: 2h 40m)
[jira] [Work logged] (BEAM-3326) Execute a Stage via the portability framework in the ReferenceRunner
[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=79766=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79766 ] ASF GitHub Bot logged work on BEAM-3326: Author: ASF GitHub Bot Created on: 13/Mar/18 04:47 Start Date: 13/Mar/18 04:47 Worklog Time Spent: 10m Work Description: tgroh commented on issue #4825: [BEAM-3326] Add an InProcess SdkHarness Rule URL: https://github.com/apache/beam/pull/4825#issuecomment-372545825 retest this please
Issue Time Tracking --- Worklog Id: (was: 79766) Time Spent: 2h 40m (was: 2.5h)