Add Proto Definitions for the Artifact API. Update the Job API to permit a "prepare" phase of executing a pipeline, where prerequisite work such as staging artifacts can be performed before the job is executed.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1e21f453 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1e21f453 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1e21f453 Branch: refs/heads/master Commit: 1e21f453721cb7aef0783cb73d72f6b928685515 Parents: 0f77af8 Author: Thomas Groh <tg...@google.com> Authored: Thu Aug 17 17:45:09 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Fri Aug 25 19:03:37 2017 -0700 ---------------------------------------------------------------------- .../src/main/proto/beam_artifact_api.proto | 122 +++++++++++++++++++ .../src/main/proto/beam_job_api.proto | 46 +++++-- 2 files changed, 157 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1e21f453/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto b/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto new file mode 100644 index 0000000..6e39d88 --- /dev/null +++ b/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Protocol Buffers describing the Artifact API, for communicating with a runner + * for artifact staging and retrieval over gRPC. + */ + +syntax = "proto3"; + +package org.apache.beam.runner_api.v1; + +option java_package = "org.apache.beam.sdk.common.runner.v1"; +option java_outer_classname = "ArtifactApi"; + +// A service to stage artifacts for use in a Job. +// +// RPCs made to an ArtifactStagingService endpoint should include some form of identification for +// the job as a header. +service ArtifactStagingService { + // Stage an artifact to be available during job execution. The first request must contain the + // name of the artifact. All subsequent requests must contain sequential chunks of the content of + // the artifact. + rpc PutArtifact(stream PutArtifactRequest) returns (PutArtifactResponse); + + // Commit the manifest for a Job. All artifacts must have been successfully uploaded + // before this call is made. + // + // Throws error INVALID_ARGUMENT if not all of the artifacts in the manifest have been staged. + rpc CommitManifest(CommitManifestRequest) returns (CommitManifestResponse); +} + +// A service to retrieve artifacts for use in a Job. +service ArtifactRetrievalService { + // Get the manifest for the job. + rpc GetManifest(GetManifestRequest) returns (GetManifestResponse); + + // Get an artifact staged for the job. The requested artifact must be present in the manifest. + rpc GetArtifact(GetArtifactRequest) returns (stream ArtifactChunk); +} + +// An artifact identifier and associated metadata.
+message Artifact { + // (Required) The name of the artifact. + string name = 1; + + // (Optional) The Unix-like permissions of the artifact + int32 permissions = 2; + + // (Optional) The md5 checksum of the artifact. + string md5 = 3; +} + +// A collection of artifacts. +message Manifest { + repeated Artifact artifact = 1; +} + +// A request to get the manifest of a Job. +message GetManifestRequest {} + +// A response containing a job manifest. +message GetManifestResponse { + Manifest manifest = 1; +} + +// A request to get an artifact. The artifact must be present in the manifest for the job. +message GetArtifactRequest { + // (Required) The name of the artifact to retrieve. + string name = 1; +} + +// Part of an artifact. +message ArtifactChunk { + bytes data = 1; +} + +// A request to stage an artifact. +message PutArtifactRequest { + // (Required) + oneof content { + // The name of the artifact. The first message in a PutArtifact call must contain the name + // of the artifact. + string name = 1; + + // A chunk of the artifact. All messages after the first in a PutArtifact call must contain a + // chunk. + ArtifactChunk data = 2; + } +} + +message PutArtifactResponse { +} + +// A request to commit the manifest for a Job. All artifacts must have been successfully uploaded +// before this call is made. +message CommitManifestRequest { + // (Required) The manifest to commit. + Manifest manifest = 1; +} + +// The result of committing a manifest. +message CommitManifestResponse { + // (Required) An opaque token representing the entirety of the staged artifacts. 
+ string staging_token = 1; +} + http://git-wip-us.apache.org/repos/asf/beam/blob/1e21f453/sdks/common/runner-api/src/main/proto/beam_job_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/runner-api/src/main/proto/beam_job_api.proto b/sdks/common/runner-api/src/main/proto/beam_job_api.proto index 7be14cc..8946d2a 100644 --- a/sdks/common/runner-api/src/main/proto/beam_job_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_job_api.proto @@ -34,36 +34,60 @@ import "google/protobuf/struct.proto"; // Job Service for running RunnerAPI pipelines service JobService { + // Prepare a job for execution. The job will not be executed until a call is made to run with the + // returned preparationId. + rpc prepare (PrepareJobRequest) returns (PrepareJobResponse); + // Submit the job for execution - rpc run (SubmitJobRequest) returns (SubmitJobResponse) {} + rpc run (RunJobRequest) returns (RunJobResponse); // Get the current state of the job - rpc getState (GetJobStateRequest) returns (GetJobStateResponse) {} + rpc getState (GetJobStateRequest) returns (GetJobStateResponse); // Cancel the job - rpc cancel (CancelJobRequest) returns (CancelJobResponse) {} + rpc cancel (CancelJobRequest) returns (CancelJobResponse); // Subscribe to a stream of state changes of the job, will immediately return the current state of the job as the first response. 
- rpc getStateStream (GetJobStateRequest) returns (stream GetJobStateResponse) {} + rpc getStateStream (GetJobStateRequest) returns (stream GetJobStateResponse); // Subscribe to a stream of state changes and messages from the job - rpc getMessageStream (JobMessagesRequest) returns (stream JobMessagesResponse) {} + rpc getMessageStream (JobMessagesRequest) returns (stream JobMessagesResponse); } -// Submit is a synchronus request that returns a jobId back +// Prepare is a synchronous request that returns a preparationId. // Throws error GRPC_STATUS_UNAVAILABLE if server is down -// Throws error ALREADY_EXISTS if the jobName is reused as runners are permitted to deduplicate based on the name of the job. +// Throws error ALREADY_EXISTS if the jobName is reused. Runners are permitted to deduplicate based on the name of the job. // Throws error UNKNOWN for all other issues -message SubmitJobRequest { +message PrepareJobRequest { org.apache.beam.runner_api.v1.Pipeline pipeline = 1; // (required) google.protobuf.Struct pipelineOptions = 2; // (required) string jobName = 3; // (required) } -message SubmitJobResponse { - // JobId is used as an identifier for the job in all future calls. - string jobId = 1; // (required) +message PrepareJobResponse { + // (required) The ID used to associate calls made while preparing the job. preparationId is used + // to run the job, as well as in other pre-execution APIs such as Artifact staging. + string preparationId = 1; +} + + +// Run is a synchronous request that returns a jobId. +// Throws error GRPC_STATUS_UNAVAILABLE if server is down +// Throws error NOT_FOUND if the preparation ID does not exist +// Throws error UNKNOWN for all other issues +message RunJobRequest { + // (required) The preparationId returned by an earlier call to prepare. All prerequisite tasks
+ string preparationId = 1; + // (optional) If any artifacts have been staged for this job, contains the staging_token returned + // from the CommitManifestResponse. + string stagingToken = 2; +} + + +message RunJobResponse { + string jobId = 1; // (required) The ID of the executing job, used to identify it in all future calls. }