[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=103167&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103167
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 17/May/18 23:23
Start Date: 17/May/18 23:23
Worklog Time Spent: 10m 
  Work Description: jkff closed pull request #5262: [BEAM-2588] Portability 
Runner Job Service
URL: https://github.com/apache/beam/pull/5262
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
new file mode 100644
index 000..fde34826e14
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An InMemoryJobService that prepares and runs jobs on behalf of a client using a
+ * {@link JobInvoker}.
+ *
+ * Job management is handled in-memory rather than in any persistent storage, running the risk of
+ * leaking jobs if the InMemoryJobService crashes.
+ *
+ * TODO: replace in-memory job management state with persistent solution.
+ */
+public class InMemoryJobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(InMemoryJobService.class);
+
+  public static InMemoryJobService create(
+      Endpoints.ApiServiceDescriptor stagingServiceDescriptor, JobInvoker invoker) {
+    return new InMemoryJobService(stagingServiceDescriptor, invoker);
+  }
+
+  private final ConcurrentMap preparations;
+  private final ConcurrentMap invocations;
+  private final Endpoints.ApiServiceDescriptor stagingServiceDescriptor;
+  private final JobInvoker invoker;
+
+  private InMemoryJobService(
+      Endpoints.ApiServiceDescriptor stagingServiceDescriptor, JobInvoker invoker) {
+    this.stagingServiceDescriptor = stagingServiceDescriptor;
+    this.invoker = invoker;
+
+    this.preparations = new ConcurrentHashMap<>();
+    this.invocations = new ConcurrentHashMap<>();
+  }
+
+  
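
For readers following the in-memory design described in the class Javadoc above, here is a minimal sketch of what "job management is handled in-memory" can look like. It is not the code from the pull request: PreparedJobSketch and InMemoryPreparationRegistry are hypothetical names, and the jobName_UUID id format is an assumption borrowed from the prepare method that appears elsewhere in this pull request.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for a prepared job; this is not Beam's JobPreparation. */
final class PreparedJobSketch {
  final String id;
  final String jobName;

  PreparedJobSketch(String id, String jobName) {
    this.id = id;
    this.jobName = jobName;
  }
}

/** Illustrative registry that keeps prepared jobs only in process memory. */
final class InMemoryPreparationRegistry {
  // All state lives in this map, so it is lost if the process exits.
  private final ConcurrentMap<String, PreparedJobSketch> preparations =
      new ConcurrentHashMap<>();

  /** Registers a job and returns its id, formatted as jobName_UUID (assumed format). */
  String prepare(String jobName) {
    String preparationId = String.format("%s_%s", jobName, UUID.randomUUID());
    PreparedJobSketch previous =
        preparations.putIfAbsent(preparationId, new PreparedJobSketch(preparationId, jobName));
    if (previous != null) {
      // putIfAbsent keeps the check and the insert atomic under concurrent calls.
      throw new IllegalStateException("Duplicate preparation id: " + preparationId);
    }
    return preparationId;
  }

  /** Returns the prepared job, or null if the id is unknown (for example after a restart). */
  PreparedJobSketch lookup(String preparationId) {
    return preparations.get(preparationId);
  }
}
```

A caller would invoke prepare("wordcount") and later pass the returned id to lookup. Because nothing is persisted, a restarted service answers null for ids issued before the restart, which is the job-leak risk the Javadoc mentions.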

[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=103165&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103165
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 17/May/18 23:21
Start Date: 17/May/18 23:21
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5262: 
[BEAM-2588] Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r189128166
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
 ##
 @@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An InMemoryJobService that prepares and runs jobs on behalf of a client using a
+ * {@link JobInvoker}.
+ *
+ * Job management is handled in-memory rather than in any persistent storage, running the risk of
+ * leaking jobs if the InMemoryJobService crashes.
+ *
+ * TODO: replace in-memory job management state with persistent solution.
+ */
+public class InMemoryJobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(InMemoryJobService.class);
+
+  public static InMemoryJobService create(
+      Endpoints.ApiServiceDescriptor stagingServiceDescriptor, JobInvoker invoker) {
+    return new InMemoryJobService(stagingServiceDescriptor, invoker);
+  }
+
+  private final ConcurrentMap preparations;
+  private final ConcurrentMap invocations;
+  private final Endpoints.ApiServiceDescriptor stagingServiceDescriptor;
+  private final JobInvoker invoker;
+
+  private InMemoryJobService(
+      Endpoints.ApiServiceDescriptor stagingServiceDescriptor, JobInvoker invoker) {
+    this.stagingServiceDescriptor = stagingServiceDescriptor;
+    this.invoker = invoker;
+
+    this.preparations = new ConcurrentHashMap<>();
+    this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+      PrepareJobRequest request,
+      StreamObserver<PrepareJobResponse> responseObserver) {
+    try {
+      LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+      // insert preparation
+      String preparationId =
+          String.format("%s_%s", request.getJobName(), UUID.randomUUID().toString());
+      Struct pipelineOptions = request.getPipelineOptions();
+      if (pipelineOptions == null) {
+        throw new NullPointerException("Encountered null pipeline 
+
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=103164&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103164
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 17/May/18 23:14
Start Date: 17/May/18 23:14
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on issue #5262: [BEAM-2588] 
Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#issuecomment-390042225
 
 
   @jkff: This is ready for review, and no longer a work in progress.  Please 
take a look.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 103164)
Time Spent: 5h 40m  (was: 5.5h)

> Portable Flink Runner Job API
> -
>
> Key: BEAM-2588
> URL: https://issues.apache.org/jira/browse/BEAM-2588
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> The portable Flink runner needs to be wired into a job server so that it can 
> accept jobs through the job API (https://s.apache.org/beam-job-api).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-14 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101921&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101921
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 14/May/18 22:51
Start Date: 14/May/18 22:51
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r188121725
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}.
+ *
+ * Job management is handled in-memory rather than in any persistent storage, running the risk of
+ * leaking jobs if the JobService crashes.
+ *
+ * TODO: replace in-memory job management state with persistent solution.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap preparations;
+  private final ConcurrentMap invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+    this.invoker = invoker;
+
+    this.preparations = new ConcurrentHashMap<>();
+    this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+      PrepareJobRequest request,
+      StreamObserver<PrepareJobResponse> responseObserver) {
+    try {
+      LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+      // insert preparation
+      String 

[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-11 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101349&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101349
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 23:29
Start Date: 11/May/18 23:29
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187753840
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobPreparation.java
 ##
 @@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.auto.value.AutoValue;
+import com.google.protobuf.Struct;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+
+/** A job that has been prepared, but not invoked. */
+@AutoValue
+public abstract class JobPreparation {
+  public static Builder builder() {
+    return new AutoValue_JobPreparation.Builder();
+  }
+
+  public abstract String id();
+  public abstract Pipeline pipeline();
+  public abstract Struct options();
+  public abstract GrpcFnServer<ArtifactStagingService> stagingService();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    abstract Builder setId(String id);
+    abstract Builder setPipeline(Pipeline pipeline);
+    abstract Builder setOptions(Struct options);
+    abstract Builder setStagingService(GrpcFnServer<ArtifactStagingService> stagingService);
 
 Review comment:
   Key decision: the artifact staging token is the right item to return 
here.  Because JobService and ArtifactStagingService are expected to be tightly 
coupled, a staging token should give a runner or worker enough information to 
construct an ArtifactRetrievalService or ArtifactSource on demand.
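
To illustrate the review comment above, here is a rough sketch of a prepared-job value that stores only a staging token, with the artifact reader built from that token when needed. None of these types come from the pull request or from Beam: TokenPreparationSketch, ArtifactSourceSketch, and StagingTokenResolver are hypothetical names, and treating the token as a plain String is an assumption.

```java
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical prepared-job value that carries a staging token instead of a server handle. */
final class TokenPreparationSketch {
  private final String id;
  private final String stagingToken;

  TokenPreparationSketch(String id, String stagingToken) {
    this.id = Objects.requireNonNull(id, "id");
    this.stagingToken = Objects.requireNonNull(stagingToken, "stagingToken");
  }

  String id() {
    return id;
  }

  String stagingToken() {
    return stagingToken;
  }
}

/** Hypothetical stand-in for an artifact reader; this is not a Beam interface. */
interface ArtifactSourceSketch {
  String locate(String artifactName);
}

/** Builds an artifact reader on demand from nothing but the staging token. */
final class StagingTokenResolver {
  private StagingTokenResolver() {}

  static ArtifactSourceSketch sourceFor(
      TokenPreparationSketch preparation, Function<String, ArtifactSourceSketch> factory) {
    // The factory is supplied by whoever knows how tokens map to storage.
    return factory.apply(preparation.stagingToken());
  }
}
```

With this shape the prepared job stays a small, serializable value, and only the component that understands the staging layout (the factory here) needs to know how a token turns into a reader.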




Issue Time Tracking
---

Worklog Id: (was: 101349)
Time Spent: 5h 20m  (was: 5h 10m)



[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-11 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101343&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101343
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 23:00
Start Date: 11/May/18 23:00
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187750857
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}.
+ *
+ * Job management is handled in-memory rather than in any persistent storage, running the risk of
+ * leaking jobs if the JobService crashes.
+ *
+ * TODO: replace in-memory job management state with persistent solution.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap preparations;
+  private final ConcurrentMap invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+    this.invoker = invoker;
+
+    this.preparations = new ConcurrentHashMap<>();
+    this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+      PrepareJobRequest request,
+      StreamObserver<PrepareJobResponse> responseObserver) {
+    try {
+      LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+      // insert preparation
+      String 

[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-11 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101341&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101341
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 22:49
Start Date: 11/May/18 22:49
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187749655
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
 ##
 @@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.artifact;
+
+import org.apache.beam.runners.fnexecution.FnService;
+
+/** An implementation of the Beam Artifact Staging Service. */
+public interface ArtifactStagingService extends FnService {
 
 Review comment:
   TBC offline




Issue Time Tracking
---

Worklog Id: (was: 101341)
Time Spent: 5h  (was: 4h 50m)



[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-11 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101339&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101339
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 22:48
Start Date: 11/May/18 22:48
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187749562
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/package-info.java
 ##
 @@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal job management service implementation of the Beam runner for Apache Flink.
 
 Review comment:
   whoops. 




Issue Time Tracking
---

Worklog Id: (was: 101339)
Time Spent: 4h 40m  (was: 4.5h)



[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-11 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101338&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101338
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 22:48
Start Date: 11/May/18 22:48
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187749514
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}.
+ *
+ * Job management is handled in-memory rather than any persistent storage, running the risk of
+ * leaking jobs if the JobService crashes.
+ *
+ * TODO: replace in-memory job management state with persistent solution.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap preparations;
+  private final ConcurrentMap invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+    this.invoker = invoker;
+
+    this.preparations = new ConcurrentHashMap<>();
+    this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+      PrepareJobRequest request,
+      StreamObserver responseObserver) {
+    try {
+      LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+      // insert preparation
+      String 

[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-11 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101340&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101340
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 22:49
Start Date: 11/May/18 22:49
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187749607
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}.
+ *
+ * Job management is handled in-memory rather than any persistent storage, running the risk of
+ * leaking jobs if the JobService crashes.
+ *
+ * TODO: replace in-memory job management state with persistent solution.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
 
 Review comment:
   SGTM


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 101340)
Time Spent: 4h 50m  (was: 4h 40m)

> Portable Flink Runner Job API
> -----------------------------
>
> Key: BEAM-2588
> URL: https://issues.apache.org/jira/browse/BEAM-2588
> Project: Beam
> Issue Type: New Feature
> Components: runner-flink
> Reporter: Kenneth Knowles
> Assignee: Axel Magnuson
> Priority: Major
> Labels: portability
> Time Spent: 4h 50m
> Remaining Estimate: 0h
>
> The portable Flink runner needs to be wired into a job server so that it can
> accept jobs via the job API (https://s.apache.org/beam-job-api).




[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-11 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101337&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101337
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 22:48
Start Date: 11/May/18 22:48
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187749492
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}.
+ *
+ * Job management is handled in-memory rather than any persistent storage, running the risk of
+ * leaking jobs if the JobService crashes.
+ *
+ * TODO: replace in-memory job management state with persistent solution.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap preparations;
+  private final ConcurrentMap invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+    this.invoker = invoker;
+
+    this.preparations = new ConcurrentHashMap<>();
+    this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+      PrepareJobRequest request,
+      StreamObserver responseObserver) {
+    try {
+      LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+      // insert preparation
+      String 

[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-11 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101330&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101330
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 22:30
Start Date: 11/May/18 22:30
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187747317
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobPreparation.java
 ##
 @@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.auto.value.AutoValue;
+import com.google.protobuf.Struct;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+
+/** A job that has been prepared, but not invoked. */
+@AutoValue
+public abstract class JobPreparation {
+  public static Builder builder() {
+    return new AutoValue_JobPreparation.Builder();
+  }
+
+  public abstract String id();
+  public abstract Pipeline pipeline();
+  public abstract Struct options();
+  public abstract GrpcFnServer stagingService();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    abstract Builder setId(String id);
+    abstract Builder setPipeline(Pipeline pipeline);
+    abstract Builder setOptions(Struct options);
+    abstract Builder setStagingService(GrpcFnServer stagingService);
 
 Review comment:
   Yeah, ArtifactStagingService is almost certainly the wrong attribute type.
   Really this should be the generic artifact reference that we discussed offline.
   Once we iron out what that should look like, I'll come back and amend this.



Issue Time Tracking
---

Worklog Id: (was: 101330)
Time Spent: 4h 10m  (was: 4h)

> Portable Flink Runner Job API
> -----------------------------
>
> Key: BEAM-2588
> URL: https://issues.apache.org/jira/browse/BEAM-2588
> Project: Beam
> Issue Type: New Feature
> Components: runner-flink
> Reporter: Kenneth Knowles
> Assignee: Axel Magnuson
> Priority: Major
> Labels: portability
> Time Spent: 4h 10m
> Remaining Estimate: 0h
>
> The portable Flink runner needs to be wired into a job server so that it can
> accept jobs via the job API (https://s.apache.org/beam-job-api).





[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-11 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101328&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101328
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 22:28
Start Date: 11/May/18 22:28
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187747109
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}.
+ *
+ * Job management is handled in-memory rather than any persistent storage, running the risk of
+ * leaking jobs if the JobService crashes.
+ *
+ * TODO: replace in-memory job management state with persistent solution.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap preparations;
+  private final ConcurrentMap invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+    this.invoker = invoker;
+
+    this.preparations = new ConcurrentHashMap<>();
+    this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+      PrepareJobRequest request,
+      StreamObserver responseObserver) {
+    try {
+      LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+      // insert preparation
+      String 

[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-11 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=101235&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101235
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 17:56
Start Date: 11/May/18 17:56
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187687909
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
 ##
 @@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.artifact;
+
+import org.apache.beam.runners.fnexecution.FnService;
+
+/** An implementation of the Beam Artifact Staging Service. */
+public interface ArtifactStagingService extends FnService {
 
 Review comment:
   Really they are used by 3 clients in 3 completely different ways (the SDK
   client, the invoker, and the SDK harness), but I see your point. I'm happy to
   remove it from the current PR and have a design review of it.



Issue Time Tracking
---

Worklog Id: (was: 101235)
Time Spent: 3h 50m  (was: 3h 40m)

> Portable Flink Runner Job API
> -----------------------------
>
> Key: BEAM-2588
> URL: https://issues.apache.org/jira/browse/BEAM-2588
> Project: Beam
> Issue Type: New Feature
> Components: runner-flink
> Reporter: Kenneth Knowles
> Assignee: Axel Magnuson
> Priority: Major
> Labels: portability
> Time Spent: 3h 50m
> Remaining Estimate: 0h
>
> The portable Flink runner needs to be wired into a job server so that it can
> accept jobs via the job API (https://s.apache.org/beam-job-api).





[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-10 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100940&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100940
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 01:15
Start Date: 11/May/18 01:15
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187499486
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}.
+ *
+ * Job management is handled in-memory rather than any persistent storage, running the risk of
+ * leaking jobs if the JobService crashes.
+ *
+ * TODO: replace in-memory job management state with persistent solution.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap preparations;
+  private final ConcurrentMap invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+    this.invoker = invoker;
+
+    this.preparations = new ConcurrentHashMap<>();
+    this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+      PrepareJobRequest request,
+      StreamObserver responseObserver) {
+    try {
+      LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+      // insert preparation
+      String 

[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-10 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100952&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100952
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 01:15
Start Date: 11/May/18 01:15
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187499599
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}.
+ *
+ * Job management is handled in-memory rather than any persistent storage, running the risk of
+ * leaking jobs if the JobService crashes.
+ *
+ * TODO: replace in-memory job management state with persistent solution.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
 
 Review comment:
   Maybe rename this to InMemoryJobService then?
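
The concern behind the class comment and the suggested rename can be illustrated with a minimal sketch. It assumes hypothetical names (`PreparedJob`, `InMemoryJobRegistry`, `InMemoryJobRegistryDemo`) that are not taken from the PR; it only shows that state held in a `ConcurrentMap` does not survive a new service instance, which is what a restart after a crash amounts to.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical stand-in for a prepared job; not the PR's JobPreparation class. */
final class PreparedJob {
  final String id;
  final String pipelineName;

  PreparedJob(String id, String pipelineName) {
    this.id = id;
    this.pipelineName = pipelineName;
  }
}

/** Hypothetical registry that keeps prepared jobs only in process memory. */
final class InMemoryJobRegistry {
  private final ConcurrentMap<String, PreparedJob> preparations = new ConcurrentHashMap<>();

  /** Registers a job under a fresh random id and returns that id. */
  String prepare(String pipelineName) {
    while (true) {
      int suffix = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
      String id = pipelineName + "_" + suffix;
      // putIfAbsent returns null only when the id was not already taken.
      if (preparations.putIfAbsent(id, new PreparedJob(id, pipelineName)) == null) {
        return id;
      }
    }
  }

  /** Looks up a prepared job; empty if this instance never saw the id. */
  Optional<PreparedJob> lookup(String id) {
    return Optional.ofNullable(preparations.get(id));
  }
}

final class InMemoryJobRegistryDemo {
  public static void main(String[] args) {
    InMemoryJobRegistry registry = new InMemoryJobRegistry();
    String id = registry.prepare("wordcount");
    System.out.println(registry.lookup(id).isPresent()); // prints "true"

    // A fresh instance models the service after a crash and restart:
    // nothing was persisted, so the earlier preparation is gone.
    InMemoryJobRegistry restarted = new InMemoryJobRegistry();
    System.out.println(restarted.lookup(id).isPresent()); // prints "false"
  }
}
```

A name such as InMemoryJobService makes this limitation visible at every call site rather than only in the class comment.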


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 100952)
Time Spent: 3h 40m  (was: 3.5h)

> Portable Flink Runner Job API
> -
>
> Key: BEAM-2588
> URL: https://issues.apache.org/jira/browse/BEAM-2588
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> The portable Flink runner needs to be wired into a job server so that it can 
> accept jobs via the Job API (https://s.apache.org/beam-job-api).




[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-10 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100945&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100945
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 01:15
Start Date: 11/May/18 01:15
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187498839
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}.
+ *
+ * Job management is handled in-memory rather than any persistent storage, running the risk of
+ * leaking jobs if the JobService crashes.
+ *
+ * TODO: replace in-memory job management state with persistent solution.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap preparations;
+  private final ConcurrentMap invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+    this.invoker = invoker;
+
+    this.preparations = new ConcurrentHashMap<>();
+    this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+      PrepareJobRequest request,
+      StreamObserver responseObserver) {
+    try {
+      LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+      // insert preparation
+      String 

[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-10 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100949&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100949
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 01:15
Start Date: 11/May/18 01:15
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187499130
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}.
+ *
+ * Job management is handled in-memory rather than any persistent storage, running the risk of
+ * leaking jobs if the JobService crashes.
+ *
+ * TODO: replace in-memory job management state with persistent solution.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap preparations;
+  private final ConcurrentMap invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+    this.invoker = invoker;
+
+    this.preparations = new ConcurrentHashMap<>();
+    this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+      PrepareJobRequest request,
+      StreamObserver responseObserver) {
+    try {
+      LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+      // insert preparation
+      String 

[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-10 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100951&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100951
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 01:15
Start Date: 11/May/18 01:15
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187499520
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}.
+ *
+ * Job management is handled in-memory rather than any persistent storage, running the risk of
+ * leaking jobs if the JobService crashes.
+ *
+ * TODO: replace in-memory job management state with persistent solution.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap preparations;
+  private final ConcurrentMap invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+    this.invoker = invoker;
+
+    this.preparations = new ConcurrentHashMap<>();
+    this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+      PrepareJobRequest request,
+      StreamObserver responseObserver) {
+    try {
+      LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+      // insert preparation
+      String 

[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-10 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100946&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100946
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 01:15
Start Date: 11/May/18 01:15
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187498589
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobPreparation.java
 ##
 @@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.auto.value.AutoValue;
+import com.google.protobuf.Struct;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+
+/** A job that has been prepared, but not invoked. */
+@AutoValue
+public abstract class JobPreparation {
+  public static Builder builder() {
+    return new AutoValue_JobPreparation.Builder();
+  }
+
+  public abstract String id();
+  public abstract Pipeline pipeline();
+  public abstract Struct options();
+  public abstract GrpcFnServer<ArtifactStagingService> stagingService();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    abstract Builder setId(String id);
+    abstract Builder setPipeline(Pipeline pipeline);
+    abstract Builder setOptions(Struct options);
+    abstract Builder setStagingService(GrpcFnServer<ArtifactStagingService> stagingService);
 
 Review comment:
   Why is an ArtifactStagingService server part of a JobPreparation? Seems like 
the server should just live together with the JobService, no?
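
One possible reading of this suggestion, sketched under stated assumptions: the service owns a single staging server for its whole lifetime, and each preparation holds only per-job data. All names below (`StagingServerSketch`, `PreparationSketch`, `OwningJobServiceSketch`, the endpoint string) are hypothetical stand-ins and do not use the real `GrpcFnServer` or `ArtifactStagingService` types.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for a running staging server; not Beam's GrpcFnServer. */
final class StagingServerSketch implements AutoCloseable {
  private final String endpoint;
  private volatile boolean open = true;

  StagingServerSketch(String endpoint) {
    this.endpoint = endpoint;
  }

  String endpoint() {
    return endpoint;
  }

  boolean isOpen() {
    return open;
  }

  @Override
  public void close() {
    open = false;
  }
}

/** Per-job data only: no server handle is stored on the preparation. */
final class PreparationSketch {
  final String id;
  final String pipelineName;

  PreparationSketch(String id, String pipelineName) {
    this.id = id;
    this.pipelineName = pipelineName;
  }
}

/** Hypothetical service that owns the one staging server shared by all preparations. */
final class OwningJobServiceSketch implements AutoCloseable {
  private final StagingServerSketch stagingServer;
  private final ConcurrentMap<String, PreparationSketch> preparations =
      new ConcurrentHashMap<>();

  OwningJobServiceSketch(StagingServerSketch stagingServer) {
    this.stagingServer = stagingServer;
  }

  /** Records the preparation and returns the shared staging endpoint. */
  String prepare(String id, String pipelineName) {
    if (preparations.putIfAbsent(id, new PreparationSketch(id, pipelineName)) != null) {
      throw new IllegalStateException("Duplicate preparation id: " + id);
    }
    return stagingServer.endpoint();
  }

  int preparationCount() {
    return preparations.size();
  }

  /** The server's lifecycle follows the service, not any single job. */
  @Override
  public void close() {
    stagingServer.close();
  }
}

final class OwningJobServiceDemo {
  public static void main(String[] args) {
    StagingServerSketch server = new StagingServerSketch("localhost:8098");
    try (OwningJobServiceSketch service = new OwningJobServiceSketch(server)) {
      System.out.println(service.prepare("job-1", "wordcount"));
      System.out.println(service.prepare("job-2", "wordcount"));
    }
    System.out.println(server.isOpen());
  }
}
```

With this layout there is one place that starts and stops the server, whereas a per-preparation server has to be cleaned up for every job that is prepared but never run.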


Issue Time Tracking
---

Worklog Id: (was: 100946)
Time Spent: 3h  (was: 2h 50m)

> Portable Flink Runner Job API
> -
>
> Key: BEAM-2588
> URL: https://issues.apache.org/jira/browse/BEAM-2588
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> The portable Flink runner needs to be wired into a job server so that it can 
> accept jobs via the Job API (https://s.apache.org/beam-job-api).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-10 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100944&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100944
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 01:15
Start Date: 11/May/18 01:15
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187499551
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/package-info.java
 ##
 @@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal job management service implementation of the Beam runner for Apache Flink.
 
 Review comment:
   This is not a Flink-specific package though.


Issue Time Tracking
---

Worklog Id: (was: 100944)
Time Spent: 2h 40m  (was: 2.5h)

> Portable Flink Runner Job API
> -
>
> Key: BEAM-2588
> URL: https://issues.apache.org/jira/browse/BEAM-2588
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> The portable Flink runner needs to be wired into a job server so that it can 
> accept jobs via the Job API (https://s.apache.org/beam-job-api).





[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-10 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100948&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100948
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 01:15
Start Date: 11/May/18 01:15
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187499000
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}.
+ *
+ * Job management is handled in-memory rather than any persistent storage, running the risk of
+ * leaking jobs if the JobService crashes.
+ *
+ * TODO: replace in-memory job management state with persistent solution.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap preparations;
+  private final ConcurrentMap invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+    this.invoker = invoker;
+
+    this.preparations = new ConcurrentHashMap<>();
+    this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+      PrepareJobRequest request,
+      StreamObserver responseObserver) {
+    try {
+      LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+      // insert preparation
+      String 

[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-10 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100942&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100942
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 01:15
Start Date: 11/May/18 01:15
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187498959
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}.
+ *
+ * Job management is handled in-memory rather than any persistent storage, running the risk of
+ * leaking jobs if the JobService crashes.
+ *
+ * TODO: replace in-memory job management state with persistent solution.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap preparations;
+  private final ConcurrentMap invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+      ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+    this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+    this.invoker = invoker;
+
+    this.preparations = new ConcurrentHashMap<>();
+    this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+      PrepareJobRequest request,
+      StreamObserver responseObserver) {
+    try {
+      LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+      // insert preparation
+      String 

[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-10 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100941=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100941
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 01:15
Start Date: 11/May/18 01:15
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187499937
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
 ##
 @@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.artifact;
+
+import org.apache.beam.runners.fnexecution.FnService;
+
+/** An implementation of the Beam Artifact Staging Service. */
+public interface ArtifactStagingService extends FnService {
 
 Review comment:
   I'm not quite convinced yet. Artifacts are used by two kinds of clients in two 
completely different ways. Nobody needs both interfaces, so there is no need for 
a combined interface, and its existence may interfere with future refactoring. 
Moreover, this PR never calls createAccessor(), so I suggest removing this class 
from the current PR and deferring the issue until the PR where it is used 
(possibly after an offline discussion).
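
To illustrate the point in the comment above, here is a minimal sketch of keeping the two client-facing views separate instead of exposing one combined interface. All names (`StagingSide`, `RetrievalSide`, `InMemoryArtifactStore`) are hypothetical and are not taken from the PR or from Beam; the in-memory map only stands in for a real artifact backend.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical write-side view: only the client that stages artifacts sees this. */
interface StagingSide {
  void stage(String name, byte[] contents);
}

/** Hypothetical read-side view: only the client that retrieves artifacts sees this. */
interface RetrievalSide {
  byte[] retrieve(String name);
}

/** One backing store may implement both views, but no caller has to depend on both. */
final class InMemoryArtifactStore implements StagingSide, RetrievalSide {
  private final Map<String, byte[]> artifacts = new HashMap<>();

  @Override
  public void stage(String name, byte[] contents) {
    // Defensive copy so later changes to the caller's array do not leak in.
    artifacts.put(name, contents.clone());
  }

  @Override
  public byte[] retrieve(String name) {
    byte[] contents = artifacts.get(name);
    if (contents == null) {
      throw new IllegalArgumentException("No artifact staged under name: " + name);
    }
    return contents.clone();
  }
}

class ArtifactViewsExample {
  public static void main(String[] args) {
    InMemoryArtifactStore store = new InMemoryArtifactStore();
    StagingSide submitter = store; // the staging client gets only the write view
    RetrievalSide worker = store;  // the retrieving client gets only the read view
    submitter.stage("pipeline.jar", new byte[] {1, 2, 3});
    System.out.println(worker.retrieve("pipeline.jar").length);
  }
}
```

Each caller is handed only the view it needs, so either side can be refactored without touching the other.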




Issue Time Tracking
---

Worklog Id: (was: 100941)
Time Spent: 2h 10m  (was: 2h)

> Portable Flink Runner Job API
> -
>
> Key: BEAM-2588
> URL: https://issues.apache.org/jira/browse/BEAM-2588
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> The portable Flink runner needs to be wired into a job server so that it can 
> accept jobs through the Job API (https://s.apache.org/beam-job-api).





[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-10 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100947=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100947
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 01:15
Start Date: 11/May/18 01:15
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187498931
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}.
+ *
+ * Job management is handled in-memory rather than any persistent storage, running the risk of
+ * leaking jobs if the JobService crashes.
+ *
+ * TODO: replace in-memory job management state with persistent solution.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap preparations;
+  private final ConcurrentMap invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+this.invoker = invoker;
+
+this.preparations = new ConcurrentHashMap<>();
+this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+  PrepareJobRequest request,
+  StreamObserver responseObserver) {
+try {
+  LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+  // insert preparation
+  String 

[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-10 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100943=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100943
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 01:15
Start Date: 11/May/18 01:15
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187499371
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,309 @@
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap preparations;
+  private final ConcurrentMap invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+this.invoker = invoker;
+
+this.preparations = new ConcurrentHashMap<>();
+this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+  PrepareJobRequest request,
+  StreamObserver responseObserver) {
+try {
+  LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+  // insert preparation
+  String preparationId =
+  String.format("%s_%d", request.getJobName(), ThreadLocalRandom.current().nextInt());
+  GrpcFnServer stagingService =
+  artifactStagingServiceProvider.forJob(preparationId);
+  Struct pipelineOptions = request.getPipelineOptions();
+  if (pipelineOptions == null) {
+LOG.trace("PIPELINE OPTIONS IS NULL");
+throw new NullPointerException("Encountered null pipeline options.");
+/*
+LOG.debug("Encountered null pipeline options.  Using default.");
+pipelineOptions = Struct.getDefaultInstance();
+*/
+  } else {
+LOG.trace("PIPELINE OPTIONS IS NOT NULL");
+  }
+  LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), pipelineOptions);
+  JobPreparation preparation =
+  JobPreparation
+  .builder()
+  .setId(preparationId)
+  .setPipeline(request.getPipeline())
+  .setOptions(pipelineOptions)
+  .setStagingService(stagingService)
+  .build();
+  JobPreparation previous = preparations.putIfAbsent(preparationId, preparation);
+  if (previous != null) {
+String errMessage =
+ 

[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-10 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100950=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100950
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 01:15
Start Date: 11/May/18 01:15
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187498721
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}.
+ *
+ * Job management is handled in-memory rather than any persistent storage, running the risk of
+ * leaking jobs if the JobService crashes.
+ *
+ * TODO: replace in-memory job management state with persistent solution.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap preparations;
+  private final ConcurrentMap invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+this.invoker = invoker;
+
+this.preparations = new ConcurrentHashMap<>();
+this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+  PrepareJobRequest request,
+  StreamObserver responseObserver) {
+try {
+  LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+  // insert preparation
+  String 

[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-10 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100936=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100936
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 11/May/18 01:02
Start Date: 11/May/18 01:02
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187498633
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,309 @@
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap preparations;
+  private final ConcurrentMap invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+this.invoker = invoker;
+
+this.preparations = new ConcurrentHashMap<>();
+this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+  PrepareJobRequest request,
+  StreamObserver responseObserver) {
+try {
+  LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+  // insert preparation
+  String preparationId =
+  String.format("%s_%d", request.getJobName(), ThreadLocalRandom.current().nextInt());
+  GrpcFnServer stagingService =
+  artifactStagingServiceProvider.forJob(preparationId);
+  Struct pipelineOptions = request.getPipelineOptions();
+  if (pipelineOptions == null) {
+LOG.trace("PIPELINE OPTIONS IS NULL");
+throw new NullPointerException("Encountered null pipeline options.");
+/*
+LOG.debug("Encountered null pipeline options.  Using default.");
+pipelineOptions = Struct.getDefaultInstance();
+*/
+  } else {
+LOG.trace("PIPELINE OPTIONS IS NOT NULL");
+  }
+  LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), pipelineOptions);
+  JobPreparation preparation =
+  JobPreparation
+  .builder()
+  .setId(preparationId)
+  .setPipeline(request.getPipeline())
+  .setOptions(pipelineOptions)
+  .setStagingService(stagingService)
+  .build();
+  JobPreparation previous = preparations.putIfAbsent(preparationId, preparation);
 
 Review comment:
   fixed
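
For readers following the quoted hunk, here is a minimal sketch of the id-generation and `putIfAbsent` pattern it ends on. `PreparationRegistry` and its `String` payload are hypothetical stand-ins, not the PR's `JobPreparation` types; only `ConcurrentMap.putIfAbsent` and `ThreadLocalRandom` are the real JDK calls.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical registry; the String payload stands in for a real preparation object. */
final class PreparationRegistry {
  private final ConcurrentMap<String, String> preparations = new ConcurrentHashMap<>();

  /** Builds an id of the form "<jobName>_<random int>", as in the quoted hunk. */
  static String newPreparationId(String jobName) {
    return String.format("%s_%d", jobName, ThreadLocalRandom.current().nextInt());
  }

  /**
   * Registers the payload atomically. Returns false when the id is already taken,
   * in which case the existing entry is left untouched.
   */
  boolean register(String preparationId, String preparation) {
    return preparations.putIfAbsent(preparationId, preparation) == null;
  }

  /** Returns the registered payload, or null if the id is unknown. */
  String lookup(String preparationId) {
    return preparations.get(preparationId);
  }
}

class PreparationRegistryExample {
  public static void main(String[] args) {
    PreparationRegistry registry = new PreparationRegistry();
    String id = PreparationRegistry.newPreparationId("wordcount");
    System.out.println(registry.register(id, "first"));  // first registration of this id
    System.out.println(registry.register(id, "second")); // same id again, so it is rejected
    System.out.println(registry.lookup(id));
  }
}
```

Because the random suffix can collide, the return value of `putIfAbsent` is what tells the caller whether the id was actually free.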


[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-10 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100847=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100847
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 10/May/18 21:30
Start Date: 10/May/18 21:30
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187465720
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,309 @@
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap preparations;
+  private final ConcurrentMap invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+this.invoker = invoker;
+
+this.preparations = new ConcurrentHashMap<>();
+this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+  PrepareJobRequest request,
+  StreamObserver responseObserver) {
+try {
+  LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+  // insert preparation
+  String preparationId =
+  String.format("%s_%d", request.getJobName(), ThreadLocalRandom.current().nextInt());
+  GrpcFnServer stagingService =
+  artifactStagingServiceProvider.forJob(preparationId);
+  Struct pipelineOptions = request.getPipelineOptions();
+  if (pipelineOptions == null) {
+LOG.trace("PIPELINE OPTIONS IS NULL");
+throw new NullPointerException("Encountered null pipeline options.");
+/*
+LOG.debug("Encountered null pipeline options.  Using default.");
+pipelineOptions = Struct.getDefaultInstance();
+*/
+  } else {
+LOG.trace("PIPELINE OPTIONS IS NOT NULL");
+  }
+  LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), pipelineOptions);
+  JobPreparation preparation =
+  JobPreparation
+  .builder()
+  .setId(preparationId)
+  .setPipeline(request.getPipeline())
+  .setOptions(pipelineOptions)
+  .setStagingService(stagingService)
+  .build();
+  JobPreparation previous = preparations.putIfAbsent(preparationId, preparation);
+  if (previous != null) {
+String errMessage 

[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-10 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100848=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100848
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 10/May/18 21:30
Start Date: 10/May/18 21:30
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187464808
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,309 @@
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a {@link JobInvoker}.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap preparations;
+  private final ConcurrentMap invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, JobInvoker invoker) {
+this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+this.invoker = invoker;
+
+this.preparations = new ConcurrentHashMap<>();
+this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+  PrepareJobRequest request,
+  StreamObserver responseObserver) {
+try {
+  LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+  // insert preparation
+  String preparationId =
+  String.format("%s_%d", request.getJobName(), ThreadLocalRandom.current().nextInt());
+  GrpcFnServer stagingService =
+  artifactStagingServiceProvider.forJob(preparationId);
+  Struct pipelineOptions = request.getPipelineOptions();
+  if (pipelineOptions == null) {
+LOG.trace("PIPELINE OPTIONS IS NULL");
+throw new NullPointerException("Encountered null pipeline options.");
+/*
+LOG.debug("Encountered null pipeline options.  Using default.");
+pipelineOptions = Struct.getDefaultInstance();
+*/
+  } else {
+LOG.trace("PIPELINE OPTIONS IS NOT NULL");
+  }
+  LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), pipelineOptions);
+  JobPreparation preparation =
+  JobPreparation
+  .builder()
+  .setId(preparationId)
+  .setPipeline(request.getPipeline())
+  .setOptions(pipelineOptions)
+  .setStagingService(stagingService)
+  .build();
+  JobPreparation previous = preparations.putIfAbsent(preparationId, preparation);
+  if (previous != null) {
+String errMessage 

[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-10 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100844=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100844
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 10/May/18 21:18
Start Date: 10/May/18 21:18
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187463317
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,309 @@
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import 
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a 
{@link JobInvoker}.
 
 Review comment:
   fixed


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 100844)
Time Spent: 1.5h  (was: 1h 20m)

> Portable Flink Runner Job API
> -
>
> Key: BEAM-2588
> URL: https://issues.apache.org/jira/browse/BEAM-2588
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> The portable Flink runner needs to be wired into a job server so that it can 
> accept jobs via the job API (https://s.apache.org/beam-job-api).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-10 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100827=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100827
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 10/May/18 20:46
Start Date: 10/May/18 20:46
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187455124
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
 ##
 @@ -0,0 +1,41 @@
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+
+/**
+ * Internal representation of a Job which has been invoked (prepared and run) 
by a client.
+ */
+public interface JobInvocation {
+
+  /**
+   * Start the job.
+   */
+  void start();
+
+  /**
+   * @return Unique identifier for the job invocation.
+   */
+  String getId();
+
+  /**
+   * Cancel the job.
+   */
+  void cancel();
+
+  /**
+   * Retrieve the job's current state.
+   */
+  JobState.Enum getState();
+
+  /**
+   * Observe job state changes with a {@link StreamObserver}.
+   */
+  void addStateObserver(StreamObserver<JobState.Enum> stateStreamObserver);
+
+  /**
+   * Observe job messages with a {@link StreamObserver}.
+   */
+  void addMessageObserver(StreamObserver<JobMessage> messageStreamObserver);
 
 Review comment:
   fixed




Issue Time Tracking
---

Worklog Id: (was: 100827)
Time Spent: 1h 20m  (was: 1h 10m)

> Portable Flink Runner Job API
> -
>
> Key: BEAM-2588
> URL: https://issues.apache.org/jira/browse/BEAM-2588
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> The portable Flink runner needs to be wired into a job server so that it can 
> accept jobs via the job API (https://s.apache.org/beam-job-api).





[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-10 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100788=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100788
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 10/May/18 19:43
Start Date: 10/May/18 19:43
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187437816
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
 ##
 @@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.artifact;
+
+import org.apache.beam.runners.fnexecution.FnService;
+
+/** An implementation of the Beam Artifact Staging Service. */
+public interface ArtifactStagingService extends FnService {
 
 Review comment:
   That is indeed intended. There are two expected users: the client, which 
uploads artifacts during job submission, and the runner, which retrieves those 
artifacts for distribution to workers. Once the artifacts are uploaded we need 
some way to hand them to the runner, and that is what the createAccessor 
method does. The alternative was to (a) create a separate interface for 
createAccessor(), and (b) create a third interface that extends both 
ArtifactStagingService and interface (a). However, since access to staged 
artifacts is a reasonable expectation to place on the ArtifactStagingService, 
I chose to add the method to the existing interface.
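
   For readers following along, the single-interface design can be sketched 
roughly as below. This is only an illustration under assumptions: the 
signature of createAccessor is not shown in this message, so ArtifactAccessor, 
stage(), listArtifacts(), InMemoryStagingService and the local FnService 
stand-in are hypothetical names, not the Beam API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class StagingDesignSketch {
  /** Local stand-in for a closeable service marker; hypothetical, not Beam's FnService. */
  interface FnService extends AutoCloseable {}

  /** Hypothetical read-only view the runner would use to find staged artifacts. */
  interface ArtifactAccessor {
    List<String> listArtifacts();
  }

  /** Single interface serving both users: the client stages, the runner gets an accessor. */
  interface ArtifactStagingService extends FnService {
    void stage(String artifactName); // hypothetical upload entry point for the client

    ArtifactAccessor createAccessor(); // hand-off point for the runner
  }

  /** In-memory implementation, for illustration only. */
  static final class InMemoryStagingService implements ArtifactStagingService {
    private final List<String> staged = new ArrayList<>();

    @Override
    public synchronized void stage(String artifactName) {
      staged.add(artifactName);
    }

    @Override
    public synchronized ArtifactAccessor createAccessor() {
      // Copy the list so the accessor reflects what was staged when it was created.
      List<String> snapshot = Collections.unmodifiableList(new ArrayList<>(staged));
      return () -> snapshot;
    }

    @Override
    public void close() {}
  }

  public static void main(String[] args) {
    InMemoryStagingService service = new InMemoryStagingService();
    service.stage("pipeline.jar");
    ArtifactAccessor accessor = service.createAccessor();
    System.out.println(accessor.listArtifacts());
  }
}
```

   With the two-interface alternative, createAccessor() would live on its own 
interface and a third interface would extend both; the sketch above avoids 
that extra type at the cost of a slightly wider ArtifactStagingService.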




Issue Time Tracking
---

Worklog Id: (was: 100788)
Time Spent: 1h 10m  (was: 1h)

> Portable Flink Runner Job API
> -
>
> Key: BEAM-2588
> URL: https://issues.apache.org/jira/browse/BEAM-2588
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> The portable Flink runner needs to be wired into a job server so that it can 
> accept jobs via the job API (https://s.apache.org/beam-job-api).





[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-09 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100342=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100342
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 10/May/18 00:12
Start Date: 10/May/18 00:12
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187210523
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,309 @@
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import 
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a 
{@link JobInvoker}.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements 
FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, 
JobInvoker invoker) {
+return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap<String, JobPreparation> preparations;
+  private final ConcurrentMap<String, JobInvocation> invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, 
JobInvoker invoker) {
+this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+this.invoker = invoker;
+
+this.preparations = new ConcurrentHashMap<>();
+this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+  PrepareJobRequest request,
+  StreamObserver<PrepareJobResponse> responseObserver) {
+try {
+  LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+  // insert preparation
+  String preparationId =
+  String.format("%s_%d", request.getJobName(), 
ThreadLocalRandom.current().nextInt());
+  GrpcFnServer<ArtifactStagingService> stagingService =
+  artifactStagingServiceProvider.forJob(preparationId);
+  Struct pipelineOptions = request.getPipelineOptions();
+  if (pipelineOptions == null) {
+LOG.trace("PIPELINE OPTIONS IS NULL");
+throw new NullPointerException("Encountered null pipeline options.");
+/*
+LOG.debug("Encountered null pipeline options.  Using default.");
+pipelineOptions = Struct.getDefaultInstance();
+*/
+  } else {
+LOG.trace("PIPELINE OPTIONS IS NOT NULL");
+  }
+  LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), 
pipelineOptions);
+  JobPreparation preparation =
+  JobPreparation
+  .builder()
+  .setId(preparationId)
+  .setPipeline(request.getPipeline())
+  .setOptions(pipelineOptions)
+  .setStagingService(stagingService)
+  .build();
+  JobPreparation previous = preparations.putIfAbsent(preparationId, 
preparation);
+  if (previous != null) {
+String errMessage 

[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-09 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100341=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100341
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 10/May/18 00:10
Start Date: 10/May/18 00:10
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187210291
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,309 @@
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import 
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a 
{@link JobInvoker}.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements 
FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, 
JobInvoker invoker) {
+return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap<String, JobPreparation> preparations;
+  private final ConcurrentMap<String, JobInvocation> invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, 
JobInvoker invoker) {
+this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+this.invoker = invoker;
+
+this.preparations = new ConcurrentHashMap<>();
+this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+  PrepareJobRequest request,
+  StreamObserver<PrepareJobResponse> responseObserver) {
+try {
+  LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+  // insert preparation
+  String preparationId =
+  String.format("%s_%d", request.getJobName(), 
ThreadLocalRandom.current().nextInt());
+  GrpcFnServer<ArtifactStagingService> stagingService =
+  artifactStagingServiceProvider.forJob(preparationId);
+  Struct pipelineOptions = request.getPipelineOptions();
+  if (pipelineOptions == null) {
+LOG.trace("PIPELINE OPTIONS IS NULL");
+throw new NullPointerException("Encountered null pipeline options.");
+/*
+LOG.debug("Encountered null pipeline options.  Using default.");
+pipelineOptions = Struct.getDefaultInstance();
+*/
+  } else {
+LOG.trace("PIPELINE OPTIONS IS NOT NULL");
+  }
+  LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), 
pipelineOptions);
+  JobPreparation preparation =
+  JobPreparation
+  .builder()
+  .setId(preparationId)
+  .setPipeline(request.getPipeline())
+  .setOptions(pipelineOptions)
+  .setStagingService(stagingService)
+  .build();
+  JobPreparation previous = preparations.putIfAbsent(preparationId, 
preparation);
+  if (previous != null) {
+String errMessage 

[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-09 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100337=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100337
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 10/May/18 00:07
Start Date: 10/May/18 00:07
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187209845
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,309 @@
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import 
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a 
{@link JobInvoker}.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements 
FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, 
JobInvoker invoker) {
+return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap<String, JobPreparation> preparations;
+  private final ConcurrentMap<String, JobInvocation> invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, 
JobInvoker invoker) {
+this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+this.invoker = invoker;
+
+this.preparations = new ConcurrentHashMap<>();
+this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+  PrepareJobRequest request,
+  StreamObserver<PrepareJobResponse> responseObserver) {
+try {
+  LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+  // insert preparation
+  String preparationId =
+  String.format("%s_%d", request.getJobName(), 
ThreadLocalRandom.current().nextInt());
+  GrpcFnServer<ArtifactStagingService> stagingService =
+  artifactStagingServiceProvider.forJob(preparationId);
+  Struct pipelineOptions = request.getPipelineOptions();
+  if (pipelineOptions == null) {
+LOG.trace("PIPELINE OPTIONS IS NULL");
+throw new NullPointerException("Encountered null pipeline options.");
+/*
+LOG.debug("Encountered null pipeline options.  Using default.");
+pipelineOptions = Struct.getDefaultInstance();
+*/
+  } else {
+LOG.trace("PIPELINE OPTIONS IS NOT NULL");
+  }
+  LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), 
pipelineOptions);
+  JobPreparation preparation =
+  JobPreparation
+  .builder()
+  .setId(preparationId)
+  .setPipeline(request.getPipeline())
+  .setOptions(pipelineOptions)
+  .setStagingService(stagingService)
+  .build();
+  JobPreparation previous = preparations.putIfAbsent(preparationId, 
preparation);
+  if (previous != null) {
+String errMessage 

[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-09 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100335=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100335
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 09/May/18 23:49
Start Date: 09/May/18 23:49
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187207191
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,309 @@
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import 
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a 
{@link JobInvoker}.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements 
FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, 
JobInvoker invoker) {
+return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap<String, JobPreparation> preparations;
+  private final ConcurrentMap<String, JobInvocation> invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, 
JobInvoker invoker) {
+this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+this.invoker = invoker;
+
+this.preparations = new ConcurrentHashMap<>();
+this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+  PrepareJobRequest request,
+  StreamObserver<PrepareJobResponse> responseObserver) {
+try {
+  LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+  // insert preparation
+  String preparationId =
+  String.format("%s_%d", request.getJobName(), 
ThreadLocalRandom.current().nextInt());
+  GrpcFnServer<ArtifactStagingService> stagingService =
+  artifactStagingServiceProvider.forJob(preparationId);
+  Struct pipelineOptions = request.getPipelineOptions();
+  if (pipelineOptions == null) {
+LOG.trace("PIPELINE OPTIONS IS NULL");
+throw new NullPointerException("Encountered null pipeline options.");
+/*
+LOG.debug("Encountered null pipeline options.  Using default.");
+pipelineOptions = Struct.getDefaultInstance();
+*/
+  } else {
+LOG.trace("PIPELINE OPTIONS IS NOT NULL");
+  }
+  LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), 
pipelineOptions);
+  JobPreparation preparation =
+  JobPreparation
+  .builder()
+  .setId(preparationId)
+  .setPipeline(request.getPipeline())
+  .setOptions(pipelineOptions)
+  .setStagingService(stagingService)
+  .build();
+  JobPreparation previous = preparations.putIfAbsent(preparationId, 
preparation);
+  if (previous != null) {
+String errMessage 

[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-09 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100333=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100333
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 09/May/18 23:43
Start Date: 09/May/18 23:43
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187206333
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobService.java
 ##
 @@ -0,0 +1,309 @@
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
+import 
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JobService that prepares and runs jobs on behalf of a client using a 
{@link JobInvoker}.
+ */
+public class JobService extends JobServiceGrpc.JobServiceImplBase implements 
FnService {
+  private static final Logger LOG = LoggerFactory.getLogger(JobService.class);
+
+  public static JobService create(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, 
JobInvoker invoker) {
+return new JobService(artifactStagingServiceProvider, invoker);
+  }
+
+  private final ConcurrentMap<String, JobPreparation> preparations;
+  private final ConcurrentMap<String, JobInvocation> invocations;
+  private final ArtifactStagingServiceProvider artifactStagingServiceProvider;
+  private final JobInvoker invoker;
+
+  private JobService(
+  ArtifactStagingServiceProvider artifactStagingServiceProvider, 
JobInvoker invoker) {
+this.artifactStagingServiceProvider = artifactStagingServiceProvider;
+this.invoker = invoker;
+
+this.preparations = new ConcurrentHashMap<>();
+this.invocations = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void prepare(
+  PrepareJobRequest request,
+  StreamObserver<PrepareJobResponse> responseObserver) {
+try {
+  LOG.trace("{} {}", PrepareJobRequest.class.getSimpleName(), request);
+  // insert preparation
+  String preparationId =
+  String.format("%s_%d", request.getJobName(), 
ThreadLocalRandom.current().nextInt());
+  GrpcFnServer<ArtifactStagingService> stagingService =
+  artifactStagingServiceProvider.forJob(preparationId);
+      Struct pipelineOptions = request.getPipelineOptions();
+      if (pipelineOptions == null) {
+        LOG.trace("PIPELINE OPTIONS IS NULL");
+        throw new NullPointerException("Encountered null pipeline options.");
+        /*
+        LOG.debug("Encountered null pipeline options.  Using default.");
+        pipelineOptions = Struct.getDefaultInstance();
+        */
+      } else {
+        LOG.trace("PIPELINE OPTIONS IS NOT NULL");
+      }
+      LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), pipelineOptions);
+      JobPreparation preparation =
+          JobPreparation
+              .builder()
+              .setId(preparationId)
+              .setPipeline(request.getPipeline())
+              .setOptions(pipelineOptions)
+              .setStagingService(stagingService)
+              .build();
+      JobPreparation previous = preparations.putIfAbsent(preparationId, preparation);
+      if (previous != null) {
+        String errMessage 

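The preparation-id step in the diff above (job name plus a random int, registered with putIfAbsent) can be sketched in isolation. This is only a minimal illustration, not the PR's code: PreparationRegistrySketch, register, lookup and newPreparationId are hypothetical names, and a plain String stands in for JobPreparation. What the diff does on a collision is cut off above, so the sketch simply reports it to the caller.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical registry; a String stands in for JobPreparation. */
class PreparationRegistrySketch {
  private final ConcurrentMap<String, String> preparations = new ConcurrentHashMap<>();

  /** Builds an id the same way the diff does: job name, underscore, random int. */
  static String newPreparationId(String jobName) {
    return String.format("%s_%d", jobName, ThreadLocalRandom.current().nextInt());
  }

  /**
   * Registers a preparation atomically. putIfAbsent returns the previous value,
   * or null when the id was free, so a non-null result means the id collided
   * and the existing entry was left untouched.
   */
  boolean register(String preparationId, String preparation) {
    return preparations.putIfAbsent(preparationId, preparation) == null;
  }

  /** Returns the registered preparation, or null if the id is unknown. */
  String lookup(String preparationId) {
    return preparations.get(preparationId);
  }

  public static void main(String[] args) {
    PreparationRegistrySketch registry = new PreparationRegistrySketch();
    String id = newPreparationId("wordcount");
    System.out.println("first registration accepted: " + registry.register(id, "first"));
    System.out.println("second registration accepted: " + registry.register(id, "second"));
  }
}
```

Because the random suffix is a 32-bit int, two requests with the same job name can in principle draw the same id, which is why the check on the putIfAbsent result matters.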
[jira] [Work logged] (BEAM-2588) Portable Flink Runner Job API

2018-05-09 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2588?focusedWorklogId=100331&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100331
 ]

ASF GitHub Bot logged work on BEAM-2588:


Author: ASF GitHub Bot
Created on: 09/May/18 23:38
Start Date: 09/May/18 23:38
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5262: 
[BEAM-2588] WIP Portability Runner Job Service
URL: https://github.com/apache/beam/pull/5262#discussion_r187205740
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
 ##
 @@ -0,0 +1,41 @@
+package org.apache.beam.runners.fnexecution.jobsubmission;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+
+/**
+ * Internal representation of a Job which has been invoked (prepared and run) by a client.
+ */
+public interface JobInvocation {
+
+  /**
+   * Start the job.
+   */
+  void start();
+
+  /**
+   * @return Unique identifier for the job invocation.
+   */
+  String getId();
+
+  /**
+   * Cancel the job.
+   */
+  void cancel();
+
+  /**
+   * Retrieve the job's current state.
+   */
+  JobState.Enum getState();
+
+  /**
+   * Observe job state changes with a {@link StreamObserver}.
+   */
+  void addStateObserver(StreamObserver<JobState.Enum> stateStreamObserver);
+
+  /**
+   * Observe job messages with a {@link StreamObserver}.
+   */
+  void addMessageObserver(StreamObserver<JobMessage> messageStreamObserver);
 
 Review comment:
   Fair enough. I'll just use a Consumer interface rather than defining something new then.
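
   For illustration, a rough sketch of what listening through a plain java.util.function.Consumer could look like. This is an assumption about the direction described in the comment, not the code that was merged: SketchJobState, SketchJobInvocation, addStateListener and ConsumerObserverSketch are hypothetical names, and the enum stands in for JobState.Enum so the example compiles without Beam or gRPC on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical stand-in for JobState.Enum. */
enum SketchJobState { STOPPED, RUNNING, CANCELLED }

/** Hypothetical invocation that notifies Consumers instead of StreamObservers. */
class SketchJobInvocation {
  private final String id;
  // CopyOnWriteArrayList lets listeners be added while a notification is in flight.
  private final List<Consumer<SketchJobState>> stateListeners = new CopyOnWriteArrayList<>();
  private volatile SketchJobState state = SketchJobState.STOPPED;

  SketchJobInvocation(String id) {
    this.id = id;
  }

  String getId() {
    return id;
  }

  SketchJobState getState() {
    return state;
  }

  /** Registers a callback invoked on every later state change. */
  void addStateListener(Consumer<SketchJobState> listener) {
    stateListeners.add(listener);
  }

  void start() {
    setState(SketchJobState.RUNNING);
  }

  void cancel() {
    setState(SketchJobState.CANCELLED);
  }

  private void setState(SketchJobState newState) {
    state = newState;
    for (Consumer<SketchJobState> listener : stateListeners) {
      listener.accept(newState);
    }
  }
}

class ConsumerObserverSketch {
  /** Starts and cancels a job, returning the states the listener observed in order. */
  static List<SketchJobState> runDemo() {
    List<SketchJobState> seen = new ArrayList<>();
    SketchJobInvocation invocation = new SketchJobInvocation("job_1");
    invocation.addStateListener(seen::add);
    invocation.start();
    invocation.cancel();
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(runDemo());
  }
}
```

   A Consumer carries only the "next value" callback, so unlike StreamObserver it has no onError or onCompleted hooks; a gRPC-facing caller would still need to adapt it to a StreamObserver at the service boundary.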


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 100331)
Time Spent: 10m
Remaining Estimate: 0h

> Portable Flink Runner Job API
> -
>
> Key: BEAM-2588
> URL: https://issues.apache.org/jira/browse/BEAM-2588
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The portable Flink runner needs to be wired into a job server so that it can 
> accept jobs via the job API (https://s.apache.org/beam-job-api).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)