This is an automated email from the ASF dual-hosted git repository. goenka pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 194d85f [BEAM-6725] share some Flink job invocation code with Samza runner new ee8f260 Merge pull request #7941 from ibzib/portable-job-invocation 194d85f is described below commit 194d85f950f5dbe2c114e7cd20e6dbb54adfd5a5 Author: Kyle Weaver <kcwea...@google.com> AuthorDate: Fri Feb 22 16:25:19 2019 -0800 [BEAM-6725] share some Flink job invocation code with Samza runner --- .../apache/beam/runners/flink/FlinkJobInvoker.java | 45 +++--- .../beam/runners/flink/FlinkPipelineOptions.java | 2 +- ...JobInvocation.java => FlinkPipelineRunner.java} | 163 ++------------------- .../beam/runners/flink/FlinkSavepointTest.java | 5 +- .../beam/runners/flink/PortableExecutionTest.java | 5 +- .../runners/flink/PortableStateExecutionTest.java | 5 +- .../runners/flink/PortableTimersExecutionTest.java | 5 +- .../fnexecution/jobsubmission/JobInvocation.java | 148 ++++++++++++++++++- .../fnexecution/jobsubmission/JobInvoker.java | 33 ++++- ...JobInvoker.java => PortablePipelineRunner.java} | 12 +- .../beam/runners/samza/SamzaJobInvocation.java | 128 ---------------- .../beam/runners/samza/SamzaJobServerDriver.java | 14 +- .../beam/runners/samza/SamzaPipelineOptions.java | 2 +- .../beam/runners/samza/SamzaPipelineRunner.java | 57 +++++++ 14 files changed, 290 insertions(+), 334 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java index 9dde9e3..38dcdc8 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java @@ -20,9 +20,8 @@ package org.apache.beam.runners.flink; import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage; import java.io.IOException; +import java.util.List; import java.util.UUID; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; import javax.annotation.Nullable; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; @@ -31,39 +30,30 @@ import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker; import org.apache.beam.sdk.options.PortablePipelineOptions; import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct; import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService; -import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors; -import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** Job Invoker for the {@link FlinkRunner}. */ -public class FlinkJobInvoker implements JobInvoker { +public class FlinkJobInvoker extends JobInvoker { private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class); public static FlinkJobInvoker create(FlinkJobServerDriver.FlinkServerConfiguration serverConfig) { - ThreadFactory threadFactory = - new ThreadFactoryBuilder() - .setNameFormat("flink-runner-job-invoker") - .setDaemon(true) - .build(); - ListeningExecutorService executorService = - MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory)); - return new FlinkJobInvoker(executorService, serverConfig); + return new FlinkJobInvoker(serverConfig); } - private final ListeningExecutorService executorService; private final FlinkJobServerDriver.FlinkServerConfiguration serverConfig; - private FlinkJobInvoker( - ListeningExecutorService executorService, - FlinkJobServerDriver.FlinkServerConfiguration serverConfig) { - this.executorService = executorService; + private FlinkJobInvoker(FlinkJobServerDriver.FlinkServerConfiguration serverConfig) { + super("flink-runner-job-invoker"); this.serverConfig = serverConfig; } @Override - public JobInvocation invoke( - RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken) + protected JobInvocation invokeWithExecutor( + RunnerApi.Pipeline pipeline, + Struct options, + @Nullable String retrievalToken, + ListeningExecutorService executorService) throws IOException { // TODO: How to make Java/Python agree on names of keys and their values? LOG.trace("Parsing pipeline options"); @@ -85,7 +75,7 @@ public class FlinkJobInvoker implements JobInvoker { flinkOptions.setRunner(null); - return FlinkJobInvocation.create( + return createJobInvocation( invocationId, retrievalToken, executorService, @@ -94,4 +84,17 @@ public class FlinkJobInvoker implements JobInvoker { serverConfig.getFlinkConfDir(), detectClassPathResourcesToStage(FlinkJobInvoker.class.getClassLoader())); } + + static JobInvocation createJobInvocation( + String invocationId, + String retrievalToken, + ListeningExecutorService executorService, + RunnerApi.Pipeline pipeline, + FlinkPipelineOptions flinkOptions, + @Nullable String confDir, + List<String> filesToStage) { + FlinkPipelineRunner pipelineRunner = + new FlinkPipelineRunner(invocationId, retrievalToken, flinkOptions, confDir, filesToStage); + return new JobInvocation(invocationId, executorService, pipeline, pipelineRunner); + } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index cc94793..e1103bc 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -28,7 +28,7 @@ import org.apache.flink.api.common.ExecutionMode; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.streaming.api.CheckpointingMode; -/** Options which can be used to configure a Flink PipelineRunner. */ +/** Options which can be used to configure a Flink PortablePipelineRunner. */ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOptions, StreamingOptions { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java similarity index 50% rename from runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java index 421c38e..9ccd973 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java @@ -17,91 +17,53 @@ */ package org.apache.beam.runners.flink; -import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; -import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables.getRootCause; -import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables.getStackTraceAsString; -import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; -import java.util.function.Consumer; import javax.annotation.Nullable; -import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; -import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; import org.apache.beam.runners.core.construction.graph.ExecutableStage; import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser; -import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; +import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner; import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets; -import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.FutureCallback; -import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Futures; -import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListenableFuture; -import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService; import org.apache.flink.api.common.JobExecutionResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Invocation of a Flink Job via {@link FlinkRunner}. */ -public class FlinkJobInvocation implements JobInvocation { - private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class); - - public static FlinkJobInvocation create( - String id, - String retrievalToken, - ListeningExecutorService executorService, - Pipeline pipeline, - FlinkPipelineOptions pipelineOptions, - @Nullable String confDir, - List<String> filesToStage) { - return new FlinkJobInvocation( - id, retrievalToken, executorService, pipeline, pipelineOptions, confDir, filesToStage); - } +/** Runs a Pipeline on Flink via {@link FlinkRunner}. */ +public class FlinkPipelineRunner implements PortablePipelineRunner { + private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class); private final String id; private final String retrievalToken; - private final ListeningExecutorService executorService; - private final RunnerApi.Pipeline pipeline; private final FlinkPipelineOptions pipelineOptions; private final String confDir; private final List<String> filesToStage; - private JobState.Enum jobState; - private List<Consumer<JobState.Enum>> stateObservers; - private List<Consumer<JobMessage>> messageObservers; - - @Nullable private ListenableFuture<PipelineResult> invocationFuture; - private FlinkJobInvocation( + public FlinkPipelineRunner( String id, String retrievalToken, - ListeningExecutorService executorService, - Pipeline pipeline, FlinkPipelineOptions pipelineOptions, @Nullable String confDir, List<String> filesToStage) { this.id = id; this.retrievalToken = retrievalToken; - this.executorService = executorService; - this.pipeline = pipeline; this.pipelineOptions = pipelineOptions; this.confDir = confDir; this.filesToStage = filesToStage; - this.invocationFuture = null; - this.jobState = JobState.Enum.STOPPED; - this.stateObservers = new ArrayList<>(); - this.messageObservers = new ArrayList<>(); } - private PipelineResult runPipeline() throws Exception { + @Override + public PipelineResult run(final Pipeline pipeline) throws Exception { MetricsEnvironment.setMetricsSupported(false); FlinkPortablePipelineTranslator<?> translator; @@ -111,12 +73,12 @@ public class FlinkJobInvocation implements JobInvocation { } else { translator = new FlinkStreamingPortablePipelineTranslator(); } - return runPipelineWithTranslator(translator); + return runPipelineWithTranslator(pipeline, translator); } private <T extends FlinkPortablePipelineTranslator.TranslationContext> - PipelineResult runPipelineWithTranslator(FlinkPortablePipelineTranslator<T> translator) - throws Exception { + PipelineResult runPipelineWithTranslator( + final Pipeline pipeline, FlinkPortablePipelineTranslator<T> translator) throws Exception { LOG.info("Translating pipeline to Flink program."); // Don't let the fuser fuse any subcomponents of native transforms. @@ -178,111 +140,6 @@ public class FlinkJobInvocation implements JobInvocation { } } - @Override - public synchronized void start() { - LOG.info("Starting job invocation {}", getId()); - if (getState() != JobState.Enum.STOPPED) { - throw new IllegalStateException(String.format("Job %s already running.", getId())); - } - setState(JobState.Enum.STARTING); - invocationFuture = executorService.submit(this::runPipeline); - // TODO: Defer transitioning until the pipeline is up and running. - setState(JobState.Enum.RUNNING); - Futures.addCallback( - invocationFuture, - new FutureCallback<PipelineResult>() { - @Override - public void onSuccess(@Nullable PipelineResult pipelineResult) { - if (pipelineResult != null) { - checkArgument( - pipelineResult.getState() == PipelineResult.State.DONE, - "Success on non-Done state: " + pipelineResult.getState()); - setState(JobState.Enum.DONE); - } else { - setState(JobState.Enum.UNSPECIFIED); - } - } - - @Override - public void onFailure(Throwable throwable) { - String message = String.format("Error during job invocation %s.", getId()); - LOG.error(message, throwable); - sendMessage( - JobMessage.newBuilder() - .setMessageText(getStackTraceAsString(throwable)) - .setImportance(JobMessage.MessageImportance.JOB_MESSAGE_DEBUG) - .build()); - sendMessage( - JobMessage.newBuilder() - .setMessageText(getRootCause(throwable).toString()) - .setImportance(JobMessage.MessageImportance.JOB_MESSAGE_ERROR) - .build()); - setState(JobState.Enum.FAILED); - } - }, - executorService); - } - - @Override - public String getId() { - return id; - } - - @Override - public synchronized void cancel() { - LOG.info("Canceling job invocation {}", getId()); - if (this.invocationFuture != null) { - this.invocationFuture.cancel(true /* mayInterruptIfRunning */); - Futures.addCallback( - invocationFuture, - new FutureCallback<PipelineResult>() { - @Override - public void onSuccess(@Nullable PipelineResult pipelineResult) { - if (pipelineResult != null) { - try { - pipelineResult.cancel(); - } catch (IOException exn) { - throw new RuntimeException(exn); - } - } - } - - @Override - public void onFailure(Throwable throwable) {} - }, - executorService); - } - } - - @Override - public JobState.Enum getState() { - return this.jobState; - } - - @Override - public synchronized void addStateListener(Consumer<JobState.Enum> stateStreamObserver) { - stateStreamObserver.accept(getState()); - stateObservers.add(stateStreamObserver); - } - - @Override - public synchronized void addMessageListener(Consumer<JobMessage> messageStreamObserver) { - messageObservers.add(messageStreamObserver); - } - - private synchronized void setState(JobState.Enum state) { - this.jobState = state; - for (Consumer<JobState.Enum> observer : stateObservers) { - observer.accept(state); - } - } - - private synchronized void sendMessage(JobMessage message) { - for (Consumer<JobMessage> observer : messageObservers) { - observer.accept(message); - } - } - /** Indicates whether the given pipeline has any unbounded PCollections. */ private static boolean hasUnboundedPCollections(RunnerApi.Pipeline pipeline) { checkNotNull(pipeline); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java index 756b706..41e9299 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.Executors; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.Environments; import org.apache.beam.runners.core.construction.PipelineTranslation; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -194,8 +195,8 @@ public class FlinkSavepointTest implements Serializable { ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); try { - FlinkJobInvocation jobInvocation = - FlinkJobInvocation.create( + JobInvocation jobInvocation = + FlinkJobInvoker.createJobInvocation( "id", "none", executorService, diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java index c817cb9..6214ff4 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java @@ -26,6 +26,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.Environments; import org.apache.beam.runners.core.construction.JavaReadViaImpulse; import org.apache.beam.runners.core.construction.PipelineTranslation; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.KvCoder; @@ -140,8 +141,8 @@ public class PortableExecutionTest implements Serializable { RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); // execute the pipeline - FlinkJobInvocation jobInvocation = - FlinkJobInvocation.create( + JobInvocation jobInvocation = + FlinkJobInvoker.createJobInvocation( "fakeId", "fakeRetrievalToken", flinkJobExecutor, diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java index e5dd6b9..c96ebb8 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java @@ -26,6 +26,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.Environments; import org.apache.beam.runners.core.construction.JavaReadViaImpulse; import org.apache.beam.runners.core.construction.PipelineTranslation; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.options.PipelineOptions; @@ -194,8 +195,8 @@ public class PortableStateExecutionTest implements Serializable { RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); - FlinkJobInvocation jobInvocation = - FlinkJobInvocation.create( + JobInvocation jobInvocation = + FlinkJobInvoker.createJobInvocation( "id", "none", flinkJobExecutor, diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java index 544a4e5..bffc903 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java @@ -31,6 +31,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.Environments; import org.apache.beam.runners.core.construction.JavaReadViaImpulse; import org.apache.beam.runners.core.construction.PipelineTranslation; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptions; @@ -181,8 +182,8 @@ public class PortableTimersExecutionTest implements Serializable { RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline); - FlinkJobInvocation jobInvocation = - FlinkJobInvocation.create( + JobInvocation jobInvocation = + FlinkJobInvoker.createJobInvocation( "id", "none", flinkJobExecutor, diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java index 831edc9..292e6e8 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java @@ -17,31 +17,165 @@ */ package org.apache.beam.runners.fnexecution.jobsubmission; +import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables.getRootCause; +import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables.getStackTraceAsString; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.function.Consumer; +import javax.annotation.Nullable; import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; import org.apache.beam.model.jobmanagement.v1.JobApi.JobState; import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.FutureCallback; +import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Futures; +import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListenableFuture; +import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Internal representation of a Job which has been invoked (prepared and run) by a client. */ -public interface JobInvocation { +public class JobInvocation { + + private static final Logger LOG = LoggerFactory.getLogger(JobInvocation.class); + + private final RunnerApi.Pipeline pipeline; + private final PortablePipelineRunner pipelineRunner; + private final String id; + private final ListeningExecutorService executorService; + private List<Consumer<Enum>> stateObservers; + private List<Consumer<JobMessage>> messageObservers; + private JobState.Enum jobState; + @Nullable private ListenableFuture<PipelineResult> invocationFuture; + + public JobInvocation( + String id, + ListeningExecutorService executorService, + Pipeline pipeline, + PortablePipelineRunner pipelineRunner) { + this.id = id; + this.executorService = executorService; + this.pipeline = pipeline; + this.pipelineRunner = pipelineRunner; + this.stateObservers = new ArrayList<>(); + this.messageObservers = new ArrayList<>(); + this.invocationFuture = null; + this.jobState = JobState.Enum.STOPPED; + } + + private PipelineResult runPipeline() throws Exception { + return pipelineRunner.run(pipeline); + } /** Start the job. */ - void start(); + public synchronized void start() { + LOG.info("Starting job invocation {}", getId()); + if (getState() != JobState.Enum.STOPPED) { + throw new IllegalStateException(String.format("Job %s already running.", getId())); + } + setState(JobState.Enum.STARTING); + invocationFuture = executorService.submit(this::runPipeline); + // TODO: Defer transitioning until the pipeline is up and running. + setState(JobState.Enum.RUNNING); + Futures.addCallback( + invocationFuture, + new FutureCallback<PipelineResult>() { + @Override + public void onSuccess(@Nullable PipelineResult pipelineResult) { + if (pipelineResult != null) { + checkArgument( + pipelineResult.getState() == PipelineResult.State.DONE, + "Success on non-Done state: " + pipelineResult.getState()); + setState(JobState.Enum.DONE); + } else { + setState(JobState.Enum.UNSPECIFIED); + } + } + + @Override + public void onFailure(Throwable throwable) { + String message = String.format("Error during job invocation %s.", getId()); + LOG.error(message, throwable); + sendMessage( + JobMessage.newBuilder() + .setMessageText(getStackTraceAsString(throwable)) + .setImportance(JobMessage.MessageImportance.JOB_MESSAGE_DEBUG) + .build()); + sendMessage( + JobMessage.newBuilder() + .setMessageText(getRootCause(throwable).toString()) + .setImportance(JobMessage.MessageImportance.JOB_MESSAGE_ERROR) + .build()); + setState(JobState.Enum.FAILED); + } + }, + executorService); + } /** @return Unique identifier for the job invocation. */ - String getId(); + public String getId() { + return id; + } /** Cancel the job. */ - void cancel(); + public synchronized void cancel() { + LOG.info("Canceling job invocation {}", getId()); + if (this.invocationFuture != null) { + this.invocationFuture.cancel(true /* mayInterruptIfRunning */); + Futures.addCallback( + invocationFuture, + new FutureCallback<PipelineResult>() { + @Override + public void onSuccess(@Nullable PipelineResult pipelineResult) { + if (pipelineResult != null) { + try { + pipelineResult.cancel(); + } catch (IOException exn) { + throw new RuntimeException(exn); + } + } + } + + @Override + public void onFailure(Throwable throwable) {} + }, + executorService); + } + } /** Retrieve the job's current state. */ - JobState.Enum getState(); + public JobState.Enum getState() { + return this.jobState; + } /** Listen for job state changes with a {@link Consumer}. */ - void addStateListener(Consumer<Enum> stateStreamObserver); + public synchronized void addStateListener(Consumer<JobState.Enum> stateStreamObserver) { + stateStreamObserver.accept(getState()); + stateObservers.add(stateStreamObserver); + } /** Listen for job messages with a {@link Consumer}. */ - void addMessageListener(Consumer<JobMessage> messageStreamObserver); + public synchronized void addMessageListener(Consumer<JobMessage> messageStreamObserver) { + messageObservers.add(messageStreamObserver); + } + + private synchronized void setState(JobState.Enum state) { + this.jobState = state; + for (Consumer<JobState.Enum> observer : stateObservers) { + observer.accept(state); + } + } + + private synchronized void sendMessage(JobMessage message) { + for (Consumer<JobMessage> observer : messageObservers) { + observer.accept(message); + } + } static Boolean isTerminated(Enum state) { switch (state) { diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java index cfd18a1..1ed74ad 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java @@ -18,13 +18,40 @@ package org.apache.beam.runners.fnexecution.jobsubmission; import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import javax.annotation.Nullable; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct; +import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService; +import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors; +import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** Factory to create {@link JobInvocation} instances. */ +public abstract class JobInvoker { + + private final ListeningExecutorService executorService; -/** Factory to create a {@link JobInvocation} instances. */ -public interface JobInvoker { /** Start running a job, abstracting its state as a {@link JobInvocation} instance. */ - JobInvocation invoke(RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken) + protected abstract JobInvocation invokeWithExecutor( + RunnerApi.Pipeline pipeline, + Struct options, + @Nullable String retrievalToken, + ListeningExecutorService executorService) throws IOException; + + JobInvocation invoke(RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken) + throws IOException { + return invokeWithExecutor(pipeline, options, retrievalToken, this.executorService); + } + + private ListeningExecutorService createExecutorService(String name) { + ThreadFactory threadFactory = + new ThreadFactoryBuilder().setNameFormat(name).setDaemon(true).build(); + return MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory)); + } + + protected JobInvoker(String name) { + this.executorService = createExecutorService(name); + } } diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineRunner.java similarity index 67% copy from runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java copy to runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineRunner.java index cfd18a1..d6cf083 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineRunner.java @@ -17,14 +17,10 @@ */ package org.apache.beam.runners.fnexecution.jobsubmission; -import java.io.IOException; -import javax.annotation.Nullable; import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct; +import org.apache.beam.sdk.PipelineResult; -/** Factory to create a {@link JobInvocation} instances. */ -public interface JobInvoker { - /** Start running a job, abstracting its state as a {@link JobInvocation} instance. */ - JobInvocation invoke(RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken) - throws IOException; +/** Runs a portable Beam pipeline on some execution engine. */ +public interface PortablePipelineRunner { + PipelineResult run(RunnerApi.Pipeline pipeline) throws Exception; } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java deleted file mode 100644 index d70e10a..0000000 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.samza; - -import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.CANCELLED; -import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.DONE; -import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.FAILED; -import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.RUNNING; -import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STARTING; -import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STOPPED; -import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UNRECOGNIZED; -import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UPDATED; - -import java.util.function.Consumer; -import org.apache.beam.model.jobmanagement.v1.JobApi; -import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser; -import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; -import org.apache.beam.runners.samza.util.PortablePipelineDotRenderer; -import org.apache.beam.sdk.fn.IdGenerator; -import org.apache.beam.sdk.fn.IdGenerators; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Invocation of a Samza job via {@link SamzaRunner}. */ -public class SamzaJobInvocation implements JobInvocation { - private static final Logger LOG = LoggerFactory.getLogger(SamzaJobInvocation.class); - private static final IdGenerator idGenerator = IdGenerators.incrementingLongs(); - - private final SamzaPipelineOptions options; - private final RunnerApi.Pipeline originalPipeline; - private volatile SamzaPipelineResult pipelineResult; - private final String id; - - public SamzaJobInvocation(RunnerApi.Pipeline pipeline, SamzaPipelineOptions options, String id) { - this.originalPipeline = pipeline; - this.options = options; - this.id = id; - } - - private SamzaPipelineResult invokeSamzaJob() { - // Fused pipeline proto. - final RunnerApi.Pipeline fusedPipeline = - GreedyPipelineFuser.fuse(originalPipeline).toPipeline(); - LOG.info("Portable pipeline to run:"); - LOG.info(PortablePipelineDotRenderer.toDotString(fusedPipeline)); - // the pipeline option coming from sdk will set the sdk specific runner which will break - // serialization - // so we need to reset the runner here to a valid Java runner - options.setRunner(SamzaRunner.class); - try { - final SamzaRunner runner = new SamzaRunner(options); - return runner.runPortablePipeline(fusedPipeline); - } catch (Exception e) { - throw new RuntimeException("Failed to invoke samza job", e); - } - } - - @Override - public void start() { - LOG.info("Starting job invocation {}", getId()); - pipelineResult = invokeSamzaJob(); - } - - @Override - public String getId() { - return id; - } - - @Override - public void cancel() { - try { - if (pipelineResult != null) { - pipelineResult.cancel(); - } - } catch (Exception e) { - throw new RuntimeException("Failed to cancel job.", e); - } - } - - @Override - public JobApi.JobState.Enum getState() { - if (pipelineResult == null) { - return STARTING; - } - switch (pipelineResult.getState()) { - case RUNNING: - return RUNNING; - case FAILED: - return FAILED; - case DONE: - return DONE; - case STOPPED: - return STOPPED; - case UPDATED: - return UPDATED; - case CANCELLED: - return CANCELLED; - default: - return UNRECOGNIZED; - } - } - - @Override - public void addStateListener(Consumer<JobApi.JobState.Enum> stateStreamObserver) { - LOG.info("state listener not yet implemented. Directly use getState() instead"); - } - - @Override - public synchronized void addMessageListener(Consumer<JobApi.JobMessage> messageStreamObserver) { - LOG.info("message listener not yet implemented."); - } -} diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java index 834f066..c582d17 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java @@ -32,6 +32,7 @@ import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService; import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker; import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct; +import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService; import org.kohsuke.args4j.CmdLineException; import org.kohsuke.args4j.CmdLineParser; import org.kohsuke.args4j.Option; @@ -39,6 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** Driver program that starts a job server. */ +// TODO extend JobServerDriver public class SamzaJobServerDriver { private static final Logger LOG = LoggerFactory.getLogger(SamzaJobServerDriver.class); @@ -78,10 +80,13 @@ public class SamzaJobServerDriver { private static InMemoryJobService createJobService(int controlPort) throws IOException { JobInvoker jobInvoker = - new JobInvoker() { + new JobInvoker("samza-job-invoker") { @Override - public JobInvocation invoke( - RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken) + protected JobInvocation invokeWithExecutor( + RunnerApi.Pipeline pipeline, + Struct options, + @Nullable String retrievalToken, + ListeningExecutorService executorService) throws IOException { SamzaPipelineOptions samzaPipelineOptions = PipelineOptionsTranslation.fromProto(options).as(SamzaPipelineOptions.class); @@ -96,7 +101,8 @@ public class SamzaJobServerDriver { String invocationId = String.format( "%s_%s", samzaPipelineOptions.getJobName(), UUID.randomUUID().toString()); - return new SamzaJobInvocation(pipeline, samzaPipelineOptions, invocationId); + SamzaPipelineRunner pipelineRunner = new SamzaPipelineRunner(samzaPipelineOptions); + return new JobInvocation(invocationId, executorService, pipeline, pipelineRunner); } }; return InMemoryJobService.create( diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java index d49dece..65204c4 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.samza.config.ConfigFactory; import org.apache.samza.config.factories.PropertiesConfigFactory; -/** Options which can be used to configure a Samza PipelineRunner. */ +/** Options which can be used to configure a Samza PortablePipelineRunner. */ public interface SamzaPipelineOptions extends PipelineOptions { @Description( diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java new file mode 100644 index 0000000..1a94e1d --- /dev/null +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.samza; + +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; +import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser; +import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner; +import org.apache.beam.runners.samza.util.PortablePipelineDotRenderer; +import org.apache.beam.sdk.PipelineResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Runs a Samza job via {@link SamzaRunner}. */ +public class SamzaPipelineRunner implements PortablePipelineRunner { + + private static final Logger LOG = LoggerFactory.getLogger(SamzaPipelineRunner.class); + + private final SamzaPipelineOptions options; + + @Override + public PipelineResult run(final Pipeline pipeline) { + // Fused pipeline proto. + final RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline(); + LOG.info("Portable pipeline to run:"); + LOG.info(PortablePipelineDotRenderer.toDotString(fusedPipeline)); + // the pipeline option coming from sdk will set the sdk specific runner which will break + // serialization + // so we need to reset the runner here to a valid Java runner + options.setRunner(SamzaRunner.class); + try { + final SamzaRunner runner = new SamzaRunner(options); + return runner.runPortablePipeline(fusedPipeline); + } catch (Exception e) { + throw new RuntimeException("Failed to invoke samza job", e); + } + } + + public SamzaPipelineRunner(SamzaPipelineOptions options) { + this.options = options; + } +}