[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=103167=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103167 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 17/May/18 23:23 Start Date: 17/May/18 23:23 Worklog Time Spent: 10m Work Description: jkff closed pull request #5262: [BEAM-2588] Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java new file mode 100644 index 000..fde34826e14 --- /dev/null +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A InMemoryJobService that prepares and runs jobs on behalf of a client using a + * {@link JobInvoker}. + * + * Job management is handled in-memory rather than any persistent storage, running the risk of + * leaking jobs if the InMemoryJobService crashes. + * + * TODO: replace in-memory job management state with persistent solution. + */ +public class InMemoryJobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(InMemoryJobService.class); + + public static InMemoryJobService create( + Endpoints.ApiServiceDescriptor stagingServiceDescriptor, JobInvoker invoker) { +return new InMemoryJobService(stagingServiceDescriptor, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final Endpoints.ApiServiceDescriptor stagingServiceDescriptor; + private final JobInvoker invoker; + + private InMemoryJobService( + Endpoints.ApiServiceDescriptor stagingServiceDescriptor, JobInvoker invoker) { +this.stagingServiceDescriptor = stagingServiceDescriptor; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + +
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=103165=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103165 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 17/May/18 23:21 Start Date: 17/May/18 23:21 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5262: [BEAM-2588] Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r189128166 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java ## @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A InMemoryJobService that prepares and runs jobs on behalf of a client using a + * {@link JobInvoker}. + * + * Job management is handled in-memory rather than any persistent storage, running the risk of + * leaking jobs if the InMemoryJobService crashes. + * + * TODO: replace in-memory job management state with persistent solution. + */ +public class InMemoryJobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(InMemoryJobService.class); + + public static InMemoryJobService create( + Endpoints.ApiServiceDescriptor stagingServiceDescriptor, JobInvoker invoker) { +return new InMemoryJobService(stagingServiceDescriptor, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final Endpoints.ApiServiceDescriptor stagingServiceDescriptor; + private final JobInvoker invoker; + + private InMemoryJobService( + Endpoints.ApiServiceDescriptor stagingServiceDescriptor, JobInvoker invoker) { +this.stagingServiceDescriptor = stagingServiceDescriptor; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String preparationId = + String.format("%s_%s", request.getJobName(), UUID.randomUUID().toString()); + Struct pipelineOptions = request.getPipelineOptions(); + if (pipelineOptions == null) { +throw new NullPointerException("Encountered null pipeline
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=103164=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103164 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 17/May/18 23:14 Start Date: 17/May/18 23:14 Worklog Time Spent: 10m Work Description: axelmagn commented on issue #5262: [BEAM-2588] Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#issuecomment-390042225 @jkff: This is ready for review, and no longer a work in progress. Please take a look. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 103164) Time Spent: 5h 40m (was: 5.5h) > Portable Flink Runner Job API > - > > Key: BEAM-2588 > URL: https://issues.apache.org/jira/browse/BEAM-2588 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Axel Magnuson >Priority: Major > Labels: portability > Time Spent: 5h 40m > Remaining Estimate: 0h > > The portable Flink runner needs to be wired into a job server so that it can > accept jobs the job api (https://s.apache.org/beam-job-api). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101921=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101921 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 14/May/18 22:51 Start Date: 14/May/18 22:51 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r188121725 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + * + * Job management is handled in-memory rather than any persistent storage, running the risk of + * leaking jobs if the JobService crashes. + * + * TODO: replace in-memory job management state with persistent solution. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101349=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101349 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 23:29 Start Date: 11/May/18 23:29 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187753840 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobPreparation.java ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import com.google.auto.value.AutoValue; +import com.google.protobuf.Struct; +import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; + +/** A job that has been prepared, but not invoked. */ +@AutoValue +public abstract class JobPreparation { + public static Builder builder() { +return new AutoValue_JobPreparation.Builder(); + } + + public abstract String id(); + public abstract Pipeline pipeline(); + public abstract Struct options(); + public abstract GrpcFnServer stagingService(); + + @AutoValue.Builder + abstract static class Builder { +abstract Builder setId(String id); +abstract Builder setPipeline(Pipeline pipeline); +abstract Builder setOptions(Struct options); +abstract Builder setStagingService(GrpcFnServer stagingService); Review comment: key decision: the artifact staging token is the suitable item to return here. Because of the expected tight coupling between JobService and ArtifactStagingService, a staging token should be enough information for a runner or worker to construct an ArtifactRetrievalService or ArtifactSource on demand. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 101349) Time Spent: 5h 20m (was: 5h 10m) > Portable Flink Runner Job API > - > > Key: BEAM-2588 > URL: https://issues.apache.org/jira/browse/BEAM-2588 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Axel Magnuson >Priority: Major > Labels: portability > Time Spent: 5h 20m > Remaining Estimate: 0h > > The portable Flink runner needs to be wired into a job server so that it can > accept jobs the job api (https://s.apache.org/beam-job-api). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101343=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101343 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 23:00 Start Date: 11/May/18 23:00 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187750857 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + * + * Job management is handled in-memory rather than any persistent storage, running the risk of + * leaking jobs if the JobService crashes. + * + * TODO: replace in-memory job management state with persistent solution. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101341=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101341 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 22:49 Start Date: 11/May/18 22:49 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187749655 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.artifact; + +import org.apache.beam.runners.fnexecution.FnService; + +/** An implementation of the Beam Artifact Staging Service. */ +public interface ArtifactStagingService extends FnService { Review comment: TBC offline This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 101341) Time Spent: 5h (was: 4h 50m) > Portable Flink Runner Job API > - > > Key: BEAM-2588 > URL: https://issues.apache.org/jira/browse/BEAM-2588 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Axel Magnuson >Priority: Major > Labels: portability > Time Spent: 5h > Remaining Estimate: 0h > > The portable Flink runner needs to be wired into a job server so that it can > accept jobs the job api (https://s.apache.org/beam-job-api). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101339=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101339 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 22:48 Start Date: 11/May/18 22:48 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187749562 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/package-info.java ## @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal job management service implementation of the Beam runner for Apache Flink. Review comment: whoops. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 101339) Time Spent: 4h 40m (was: 4.5h) > Portable Flink Runner Job API > - > > Key: BEAM-2588 > URL: https://issues.apache.org/jira/browse/BEAM-2588 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Axel Magnuson >Priority: Major > Labels: portability > Time Spent: 4h 40m > Remaining Estimate: 0h > > The portable Flink runner needs to be wired into a job server so that it can > accept jobs the job api (https://s.apache.org/beam-job-api). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101338=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101338 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 22:48 Start Date: 11/May/18 22:48 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187749514 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + * + * Job management is handled in-memory rather than any persistent storage, running the risk of + * leaking jobs if the JobService crashes. + * + * TODO: replace in-memory job management state with persistent solution. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101340=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101340 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 22:49 Start Date: 11/May/18 22:49 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187749607 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + * + * Job management is handled in-memory rather than any persistent storage, running the risk of + * leaking jobs if the JobService crashes. + * + * TODO: replace in-memory job management state with persistent solution. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { Review comment: SGTM This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 101340) Time Spent: 4h 50m (was: 4h 40m) > Portable Flink Runner Job API > - > > Key: BEAM-2588 > URL: https://issues.apache.org/jira/browse/BEAM-2588 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Axel Magnuson >Priority: Major > Labels: portability > Time Spent: 4h 50m > Remaining Estimate: 0h > > The portable Flink runner needs to be wired into a job server so that it can > accept jobs the job api (https://s.apache.org/beam-job-api). -- This message was sent by Atlassian JIRA
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101337=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101337 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 22:48 Start Date: 11/May/18 22:48 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187749492 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + * + * Job management is handled in-memory rather than any persistent storage, running the risk of + * leaking jobs if the JobService crashes. + * + * TODO: replace in-memory job management state with persistent solution. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101330=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101330 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 22:30 Start Date: 11/May/18 22:30 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187747317 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobPreparation.java ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import com.google.auto.value.AutoValue; +import com.google.protobuf.Struct; +import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; + +/** A job that has been prepared, but not invoked. */ +@AutoValue +public abstract class JobPreparation { + public static Builder builder() { +return new AutoValue_JobPreparation.Builder(); + } + + public abstract String id(); + public abstract Pipeline pipeline(); + public abstract Struct options(); + public abstract GrpcFnServer stagingService(); + + @AutoValue.Builder + abstract static class Builder { +abstract Builder setId(String id); +abstract Builder setPipeline(Pipeline pipeline); +abstract Builder setOptions(Struct options); +abstract Builder setStagingService(GrpcFnServer stagingService); Review comment: Yeah, ArtifactStagingService is almost certainly the wrong attribute type. Really this should be the generic artifact reference that we discussed offline. Once we iron out what that should look like, I'll come back and amend this. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 101330) Time Spent: 4h 10m (was: 4h) > Portable Flink Runner Job API > - > > Key: BEAM-2588 > URL: https://issues.apache.org/jira/browse/BEAM-2588 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Axel Magnuson >Priority: Major > Labels: portability > Time Spent: 4h 10m > Remaining Estimate: 0h > > The portable Flink runner needs to be wired into a job server so that it can > accept jobs the job api (https://s.apache.org/beam-job-api). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101328=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101328 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 22:28 Start Date: 11/May/18 22:28 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187747109 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + * + * Job management is handled in-memory rather than any persistent storage, running the risk of + * leaking jobs if the JobService crashes. + * + * TODO: replace in-memory job management state with persistent solution. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101235=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101235 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 17:56 Start Date: 11/May/18 17:56 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187687909 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.artifact; + +import org.apache.beam.runners.fnexecution.FnService; + +/** An implementation of the Beam Artifact Staging Service. */ +public interface ArtifactStagingService extends FnService { Review comment: Really they are used by 3 clients in 3 completely different ways (the sdk client, the invoker, and the sdk harness), but I see your point. I'm happy to remove it from the current PR and have a design review of it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 101235) Time Spent: 3h 50m (was: 3h 40m) > Portable Flink Runner Job API > - > > Key: BEAM-2588 > URL: https://issues.apache.org/jira/browse/BEAM-2588 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Axel Magnuson >Priority: Major > Labels: portability > Time Spent: 3h 50m > Remaining Estimate: 0h > > The portable Flink runner needs to be wired into a job server so that it can > accept jobs the job api (https://s.apache.org/beam-job-api). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100940=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100940 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 01:15 Start Date: 11/May/18 01:15 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187499486 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + * + * Job management is handled in-memory rather than any persistent storage, running the risk of + * leaking jobs if the JobService crashes. + * + * TODO: replace in-memory job management state with persistent solution. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100952=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100952 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 01:15 Start Date: 11/May/18 01:15 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187499599 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + * + * Job management is handled in-memory rather than any persistent storage, running the risk of + * leaking jobs if the JobService crashes. + * + * TODO: replace in-memory job management state with persistent solution. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { Review comment: Maybe rename this to InMemoryJobService then? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 100952) Time Spent: 3h 40m (was: 3.5h) > Portable Flink Runner Job API > - > > Key: BEAM-2588 > URL: https://issues.apache.org/jira/browse/BEAM-2588 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Axel Magnuson >Priority: Major > Labels: portability > Time Spent: 3h 40m > Remaining Estimate: 0h > > The portable Flink runner needs to be wired into a job server so that it can > accept jobs the job api (https://s.apache.org/beam-job-api). -- This
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100945=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100945 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 01:15 Start Date: 11/May/18 01:15 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187498839 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + * + * Job management is handled in-memory rather than any persistent storage, running the risk of + * leaking jobs if the JobService crashes. + * + * TODO: replace in-memory job management state with persistent solution. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100949=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100949 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 01:15 Start Date: 11/May/18 01:15 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187499130 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + * + * Job management is handled in-memory rather than any persistent storage, running the risk of + * leaking jobs if the JobService crashes. + * + * TODO: replace in-memory job management state with persistent solution. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100951=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100951 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 01:15 Start Date: 11/May/18 01:15 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187499520 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + * + * Job management is handled in-memory rather than any persistent storage, running the risk of + * leaking jobs if the JobService crashes. + * + * TODO: replace in-memory job management state with persistent solution. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100946=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100946 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 01:15 Start Date: 11/May/18 01:15 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187498589 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobPreparation.java ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import com.google.auto.value.AutoValue; +import com.google.protobuf.Struct; +import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; + +/** A job that has been prepared, but not invoked. */ +@AutoValue +public abstract class JobPreparation { + public static Builder builder() { +return new AutoValue_JobPreparation.Builder(); + } + + public abstract String id(); + public abstract Pipeline pipeline(); + public abstract Struct options(); + public abstract GrpcFnServer stagingService(); + + @AutoValue.Builder + abstract static class Builder { +abstract Builder setId(String id); +abstract Builder setPipeline(Pipeline pipeline); +abstract Builder setOptions(Struct options); +abstract Builder setStagingService(GrpcFnServer stagingService); Review comment: Why is an ArtifactStagingService server part of a JobPreparation? Seems like the server should just live together with the JobService, no? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 100946) Time Spent: 3h (was: 2h 50m) > Portable Flink Runner Job API > - > > Key: BEAM-2588 > URL: https://issues.apache.org/jira/browse/BEAM-2588 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Axel Magnuson >Priority: Major > Labels: portability > Time Spent: 3h > Remaining Estimate: 0h > > The portable Flink runner needs to be wired into a job server so that it can > accept jobs the job api (https://s.apache.org/beam-job-api). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100944=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100944 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 01:15 Start Date: 11/May/18 01:15 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187499551 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/package-info.java ## @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal job management service implementation of the Beam runner for Apache Flink. Review comment: This is not a Flink-specific package though. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 100944) Time Spent: 2h 40m (was: 2.5h) > Portable Flink Runner Job API > - > > Key: BEAM-2588 > URL: https://issues.apache.org/jira/browse/BEAM-2588 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Axel Magnuson >Priority: Major > Labels: portability > Time Spent: 2h 40m > Remaining Estimate: 0h > > The portable Flink runner needs to be wired into a job server so that it can > accept jobs the job api (https://s.apache.org/beam-job-api). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100948=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100948 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 01:15 Start Date: 11/May/18 01:15 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187499000 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + * + * Job management is handled in-memory rather than any persistent storage, running the risk of + * leaking jobs if the JobService crashes. + * + * TODO: replace in-memory job management state with persistent solution. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100942=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100942 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 01:15 Start Date: 11/May/18 01:15 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187498959 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + * + * Job management is handled in-memory rather than any persistent storage, running the risk of + * leaking jobs if the JobService crashes. + * + * TODO: replace in-memory job management state with persistent solution. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100941=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100941 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 01:15 Start Date: 11/May/18 01:15 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187499937 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.artifact; + +import org.apache.beam.runners.fnexecution.FnService; + +/** An implementation of the Beam Artifact Staging Service. */ +public interface ArtifactStagingService extends FnService { Review comment: I can't say I'm quite convinced yet. Artifacts are used by 2 kinds of clients in 2 completely different ways - nobody needs both interfaces, so there's no need for a combined interface, and its existence may interfere with future refactoring. Moreover, in this PR, we never call createAccessor(), so I would suggest to remove this class from the current PR and defer the issue until the one where it is used (possibly after an offline discussion). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 100941) Time Spent: 2h 10m (was: 2h) > Portable Flink Runner Job API > - > > Key: BEAM-2588 > URL: https://issues.apache.org/jira/browse/BEAM-2588 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Axel Magnuson >Priority: Major > Labels: portability > Time Spent: 2h 10m > Remaining Estimate: 0h > > The portable Flink runner needs to be wired into a job server so that it can > accept jobs the job api (https://s.apache.org/beam-job-api). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100947=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100947 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 01:15 Start Date: 11/May/18 01:15 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187498931 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + * + * Job management is handled in-memory rather than any persistent storage, running the risk of + * leaking jobs if the JobService crashes. + * + * TODO: replace in-memory job management state with persistent solution. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100943=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100943 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 01:15 Start Date: 11/May/18 01:15 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187499371 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,309 @@ +package org.apache.beam.runners.fnexecution.jobsubmission; + + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String preparationId = + String.format("%s_%d", request.getJobName(), ThreadLocalRandom.current().nextInt()); + GrpcFnServer stagingService = + artifactStagingServiceProvider.forJob(preparationId); + Struct pipelineOptions = request.getPipelineOptions(); + if (pipelineOptions == null) { +LOG.trace("PIPELINE OPTIONS IS NULL"); +throw new NullPointerException("Encountered null pipeline options."); +/* +LOG.debug("Encountered null pipeline options. Using default."); +pipelineOptions = Struct.getDefaultInstance(); +*/ + } else { +LOG.trace("PIPELINE OPTIONS IS NOT NULL"); + } + LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), pipelineOptions); + JobPreparation preparation = + JobPreparation + .builder() + .setId(preparationId) + .setPipeline(request.getPipeline()) + .setOptions(pipelineOptions) + .setStagingService(stagingService) + .build(); + JobPreparation previous = preparations.putIfAbsent(preparationId, preparation); + if (previous != null) { +String errMessage = +
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100950=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100950 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 01:15 Start Date: 11/May/18 01:15 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187498721 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + * + * Job management is handled in-memory rather than any persistent storage, running the risk of + * leaking jobs if the JobService crashes. + * + * TODO: replace in-memory job management state with persistent solution. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100936=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100936 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 11/May/18 01:02 Start Date: 11/May/18 01:02 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187498633 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,309 @@ +package org.apache.beam.runners.fnexecution.jobsubmission; + + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String preparationId = + String.format("%s_%d", request.getJobName(), ThreadLocalRandom.current().nextInt()); + GrpcFnServer stagingService = + artifactStagingServiceProvider.forJob(preparationId); + Struct pipelineOptions = request.getPipelineOptions(); + if (pipelineOptions == null) { +LOG.trace("PIPELINE OPTIONS IS NULL"); +throw new NullPointerException("Encountered null pipeline options."); +/* +LOG.debug("Encountered null pipeline options. Using default."); +pipelineOptions = Struct.getDefaultInstance(); +*/ + } else { +LOG.trace("PIPELINE OPTIONS IS NOT NULL"); + } + LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), pipelineOptions); + JobPreparation preparation = + JobPreparation + .builder() + .setId(preparationId) + .setPipeline(request.getPipeline()) + .setOptions(pipelineOptions) + .setStagingService(stagingService) + .build(); + JobPreparation previous = preparations.putIfAbsent(preparationId, preparation); Review comment: fixed
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100847=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100847 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 10/May/18 21:30 Start Date: 10/May/18 21:30 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187465720 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,309 @@ +package org.apache.beam.runners.fnexecution.jobsubmission; + + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String preparationId = + String.format("%s_%d", request.getJobName(), ThreadLocalRandom.current().nextInt()); + GrpcFnServer stagingService = + artifactStagingServiceProvider.forJob(preparationId); + Struct pipelineOptions = request.getPipelineOptions(); + if (pipelineOptions == null) { +LOG.trace("PIPELINE OPTIONS IS NULL"); +throw new NullPointerException("Encountered null pipeline options."); +/* +LOG.debug("Encountered null pipeline options. Using default."); +pipelineOptions = Struct.getDefaultInstance(); +*/ + } else { +LOG.trace("PIPELINE OPTIONS IS NOT NULL"); + } + LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), pipelineOptions); + JobPreparation preparation = + JobPreparation + .builder() + .setId(preparationId) + .setPipeline(request.getPipeline()) + .setOptions(pipelineOptions) + .setStagingService(stagingService) + .build(); + JobPreparation previous = preparations.putIfAbsent(preparationId, preparation); + if (previous != null) { +String errMessage
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100848=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100848 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 10/May/18 21:30 Start Date: 10/May/18 21:30 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187464808 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,309 @@ +package org.apache.beam.runners.fnexecution.jobsubmission; + + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String preparationId = + String.format("%s_%d", request.getJobName(), ThreadLocalRandom.current().nextInt()); + GrpcFnServer stagingService = + artifactStagingServiceProvider.forJob(preparationId); + Struct pipelineOptions = request.getPipelineOptions(); + if (pipelineOptions == null) { +LOG.trace("PIPELINE OPTIONS IS NULL"); +throw new NullPointerException("Encountered null pipeline options."); +/* +LOG.debug("Encountered null pipeline options. Using default."); +pipelineOptions = Struct.getDefaultInstance(); +*/ + } else { +LOG.trace("PIPELINE OPTIONS IS NOT NULL"); + } + LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), pipelineOptions); + JobPreparation preparation = + JobPreparation + .builder() + .setId(preparationId) + .setPipeline(request.getPipeline()) + .setOptions(pipelineOptions) + .setStagingService(stagingService) + .build(); + JobPreparation previous = preparations.putIfAbsent(preparationId, preparation); + if (previous != null) { +String errMessage
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100844=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100844 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 10/May/18 21:18 Start Date: 10/May/18 21:18 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187463317 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,309 @@ +package org.apache.beam.runners.fnexecution.jobsubmission; + + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. Review comment: fixed This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 100844) Time Spent: 1.5h (was: 1h 20m) > Portable Flink Runner Job API > - > > Key: BEAM-2588 > URL: https://issues.apache.org/jira/browse/BEAM-2588 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Axel Magnuson >Priority: Major > Labels: portability > Time Spent: 1.5h > Remaining Estimate: 0h > > The portable Flink runner needs to be wired into a job server so that it can > accept jobs the job api (https://s.apache.org/beam-job-api). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100827=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100827 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 10/May/18 20:46 Start Date: 10/May/18 20:46 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187455124 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java ## @@ -0,0 +1,41 @@ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import io.grpc.stub.StreamObserver; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; + +/** + * Internal representation of a Job which has been invoked (prepared and run) by a client. + */ +public interface JobInvocation { + + /** + * Start the job. + */ + void start(); + + /** + * @return Unique identifier for the job invocation. + */ + String getId(); + + /** + * Cancel the job. + */ + void cancel(); + + /** + * Retrieve the job's current state. + */ + JobState.Enum getState(); + + /** + * Observe job state changes with a {@link StreamObserver}. + */ + void addStateObserver(StreamObserver stateStreamObserver); + + /** + * Observe job messages with a {@link StreamObserver}. + */ + void addMessageObserver(StreamObserver messageStreamObserver); Review comment: fixed This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 100827) Time Spent: 1h 20m (was: 1h 10m) > Portable Flink Runner Job API > - > > Key: BEAM-2588 > URL: https://issues.apache.org/jira/browse/BEAM-2588 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Axel Magnuson >Priority: Major > Labels: portability > Time Spent: 1h 20m > Remaining Estimate: 0h > > The portable Flink runner needs to be wired into a job server so that it can > accept jobs the job api (https://s.apache.org/beam-job-api). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100788=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100788 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 10/May/18 19:43 Start Date: 10/May/18 19:43 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187437816 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.artifact; + +import org.apache.beam.runners.fnexecution.FnService; + +/** An implementation of the Beam Artifact Staging Service. */ +public interface ArtifactStagingService extends FnService { Review comment: That is indeed intended. There are 2 expected users: the client uploading artifacts during job submission, and the runner retrieving those artifacts for distribution to workers. So once we upload the artifacts we need to communicate them to the runner somehow, and this is done through the createAccessor method. The alternative was to a) create a separate interface for createAccessor(),and b) then create a third interface that extends both ArtifactStagingService and interface (a). However, since getting access to the artifacts once they are staged is a reasonable expectation to put on the ArtifactStagingService, I chose to just add that method to the existing interface. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 100788) Time Spent: 1h 10m (was: 1h) > Portable Flink Runner Job API > - > > Key: BEAM-2588 > URL: https://issues.apache.org/jira/browse/BEAM-2588 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Axel Magnuson >Priority: Major > Labels: portability > Time Spent: 1h 10m > Remaining Estimate: 0h > > The portable Flink runner needs to be wired into a job server so that it can > accept jobs the job api (https://s.apache.org/beam-job-api). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100342=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100342 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 10/May/18 00:12 Start Date: 10/May/18 00:12 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187210523 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,309 @@ +package org.apache.beam.runners.fnexecution.jobsubmission; + + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String preparationId = + String.format("%s_%d", request.getJobName(), ThreadLocalRandom.current().nextInt()); + GrpcFnServer stagingService = + artifactStagingServiceProvider.forJob(preparationId); + Struct pipelineOptions = request.getPipelineOptions(); + if (pipelineOptions == null) { +LOG.trace("PIPELINE OPTIONS IS NULL"); +throw new NullPointerException("Encountered null pipeline options."); +/* +LOG.debug("Encountered null pipeline options. Using default."); +pipelineOptions = Struct.getDefaultInstance(); +*/ + } else { +LOG.trace("PIPELINE OPTIONS IS NOT NULL"); + } + LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), pipelineOptions); + JobPreparation preparation = + JobPreparation + .builder() + .setId(preparationId) + .setPipeline(request.getPipeline()) + .setOptions(pipelineOptions) + .setStagingService(stagingService) + .build(); + JobPreparation previous = preparations.putIfAbsent(preparationId, preparation); + if (previous != null) { +String errMessage
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100341=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100341 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 10/May/18 00:10 Start Date: 10/May/18 00:10 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187210291 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,309 @@ +package org.apache.beam.runners.fnexecution.jobsubmission; + + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String preparationId = + String.format("%s_%d", request.getJobName(), ThreadLocalRandom.current().nextInt()); + GrpcFnServer stagingService = + artifactStagingServiceProvider.forJob(preparationId); + Struct pipelineOptions = request.getPipelineOptions(); + if (pipelineOptions == null) { +LOG.trace("PIPELINE OPTIONS IS NULL"); +throw new NullPointerException("Encountered null pipeline options."); +/* +LOG.debug("Encountered null pipeline options. Using default."); +pipelineOptions = Struct.getDefaultInstance(); +*/ + } else { +LOG.trace("PIPELINE OPTIONS IS NOT NULL"); + } + LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), pipelineOptions); + JobPreparation preparation = + JobPreparation + .builder() + .setId(preparationId) + .setPipeline(request.getPipeline()) + .setOptions(pipelineOptions) + .setStagingService(stagingService) + .build(); + JobPreparation previous = preparations.putIfAbsent(preparationId, preparation); + if (previous != null) { +String errMessage
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100337=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100337 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 10/May/18 00:07 Start Date: 10/May/18 00:07 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187209845 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,309 @@ +package org.apache.beam.runners.fnexecution.jobsubmission; + + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String preparationId = + String.format("%s_%d", request.getJobName(), ThreadLocalRandom.current().nextInt()); + GrpcFnServer stagingService = + artifactStagingServiceProvider.forJob(preparationId); + Struct pipelineOptions = request.getPipelineOptions(); + if (pipelineOptions == null) { +LOG.trace("PIPELINE OPTIONS IS NULL"); +throw new NullPointerException("Encountered null pipeline options."); +/* +LOG.debug("Encountered null pipeline options. Using default."); +pipelineOptions = Struct.getDefaultInstance(); +*/ + } else { +LOG.trace("PIPELINE OPTIONS IS NOT NULL"); + } + LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), pipelineOptions); + JobPreparation preparation = + JobPreparation + .builder() + .setId(preparationId) + .setPipeline(request.getPipeline()) + .setOptions(pipelineOptions) + .setStagingService(stagingService) + .build(); + JobPreparation previous = preparations.putIfAbsent(preparationId, preparation); + if (previous != null) { +String errMessage
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100335=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100335 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 09/May/18 23:49 Start Date: 09/May/18 23:49 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187207191 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,309 @@ +package org.apache.beam.runners.fnexecution.jobsubmission; + + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String preparationId = + String.format("%s_%d", request.getJobName(), ThreadLocalRandom.current().nextInt()); + GrpcFnServer stagingService = + artifactStagingServiceProvider.forJob(preparationId); + Struct pipelineOptions = request.getPipelineOptions(); + if (pipelineOptions == null) { +LOG.trace("PIPELINE OPTIONS IS NULL"); +throw new NullPointerException("Encountered null pipeline options."); +/* +LOG.debug("Encountered null pipeline options. Using default."); +pipelineOptions = Struct.getDefaultInstance(); +*/ + } else { +LOG.trace("PIPELINE OPTIONS IS NOT NULL"); + } + LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), pipelineOptions); + JobPreparation preparation = + JobPreparation + .builder() + .setId(preparationId) + .setPipeline(request.getPipeline()) + .setOptions(pipelineOptions) + .setStagingService(stagingService) + .build(); + JobPreparation previous = preparations.putIfAbsent(preparationId, preparation); + if (previous != null) { +String errMessage
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100333=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100333 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 09/May/18 23:43 Start Date: 09/May/18 23:43 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187206333 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java ## @@ -0,0 +1,309 @@ +package org.apache.beam.runners.fnexecution.jobsubmission; + + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Struct; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse; +import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; +import org.apache.beam.runners.fnexecution.FnService; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; +import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}. + */ +public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + + public static JobService create( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +return new JobService(artifactStagingServiceProvider, invoker); + } + + private final ConcurrentMappreparations; + private final ConcurrentMap invocations; + private final ArtifactStagingServiceProvider artifactStagingServiceProvider; + private final JobInvoker invoker; + + private JobService( + ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) { +this.artifactStagingServiceProvider = artifactStagingServiceProvider; +this.invoker = invoker; + +this.preparations = new ConcurrentHashMap<>(); +this.invocations = new ConcurrentHashMap<>(); + } + + @Override + public void prepare( + PrepareJobRequest request, + StreamObserver responseObserver) { +try { + LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request); + // insert preparation + String preparationId = + String.format("%s_%d", request.getJobName(), ThreadLocalRandom.current().nextInt()); + GrpcFnServer stagingService = + artifactStagingServiceProvider.forJob(preparationId); + Struct pipelineOptions = request.getPipelineOptions(); + if (pipelineOptions == null) { +LOG.trace("PIPELINE OPTIONS IS NULL"); +throw new NullPointerException("Encountered null pipeline options."); +/* +LOG.debug("Encountered null pipeline options. Using default."); +pipelineOptions = Struct.getDefaultInstance(); +*/ + } else { +LOG.trace("PIPELINE OPTIONS IS NOT NULL"); + } + LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), pipelineOptions); + JobPreparation preparation = + JobPreparation + .builder() + .setId(preparationId) + .setPipeline(request.getPipeline()) + .setOptions(pipelineOptions) + .setStagingService(stagingService) + .build(); + JobPreparation previous = preparations.putIfAbsent(preparationId, preparation); + if (previous != null) { +String errMessage
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API
[ https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100331=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100331 ] ASF GitHub Bot logged work on BEAM-2588: Author: ASF GitHub Bot Created on: 09/May/18 23:38 Start Date: 09/May/18 23:38 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5262: [BEAM-2588] WIP Portability Runner Job Service URL: https://github.com/apache/beam/pull/5262#discussion_r187205740 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java ## @@ -0,0 +1,41 @@ +package org.apache.beam.runners.fnexecution.jobsubmission; + +import io.grpc.stub.StreamObserver; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; + +/** + * Internal representation of a Job which has been invoked (prepared and run) by a client. + */ +public interface JobInvocation { + + /** + * Start the job. + */ + void start(); + + /** + * @return Unique identifier for the job invocation. + */ + String getId(); + + /** + * Cancel the job. + */ + void cancel(); + + /** + * Retrieve the job's current state. + */ + JobState.Enum getState(); + + /** + * Observe job state changes with a {@link StreamObserver}. + */ + void addStateObserver(StreamObserver stateStreamObserver); + + /** + * Observe job messages with a {@link StreamObserver}. + */ + void addMessageObserver(StreamObserver messageStreamObserver); Review comment: Fair enough. I'll just use a Consumer interface rather than defining something new then. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 100331) Time Spent: 10m Remaining Estimate: 0h > Portable Flink Runner Job API > - > > Key: BEAM-2588 > URL: https://issues.apache.org/jira/browse/BEAM-2588 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Axel Magnuson >Priority: Major > Labels: portability > Time Spent: 10m > Remaining Estimate: 0h > > The portable Flink runner needs to be wired into a job server so that it can > accept jobs the job api (https://s.apache.org/beam-job-api). -- This message was sent by Atlassian JIRA (v7.6.3#76005)