This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch release-2.4.0 in repository https://gitbox.apache.org/repos/asf/beam.git
commit 1b2ba1e2b1a183010ee1ce4d317592ea0386b41b Author: Thomas Groh <tg...@google.com> AuthorDate: Fri Mar 2 09:27:48 2018 -0800 Revert "extracting the scheduled executor service in a factory variable in SDF direct runner factory" This reverts commit 0044cbf385a4991a7d5191a91b881d8525d747c0. Breaks FlinkRunner Compilation --- .../org/apache/beam/runners/core/DoFnRunners.java | 2 +- .../apache/beam/runners/core/ProcessFnRunner.java | 10 +- .../apache/beam/runners/core/SimpleDoFnRunner.java | 4 - ...LifecycleManagerRemovingTransformEvaluator.java | 4 - .../direct/ExecutorServiceParallelExecutor.java | 40 ++------ .../apache/beam/runners/direct/ParDoEvaluator.java | 8 -- .../beam/runners/direct/ParDoEvaluatorFactory.java | 2 +- .../SplittableProcessElementsEvaluatorFactory.java | 109 +++++++++++---------- .../beam/runners/direct/DirectRunnerTest.java | 35 ------- 9 files changed, 67 insertions(+), 147 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index 41116f1..80c830a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -52,7 +52,7 @@ public class DoFnRunners { * compressed {@link WindowedValue}. It is the responsibility of the runner to perform any key * partitioning needed, etc. 
*/ - public static <InputT, OutputT> SimpleDoFnRunner<InputT, OutputT> simpleRunner( + public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner( PipelineOptions options, DoFn<InputT, OutputT> fn, SideInputReader sideInputReader, diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java index 6690f58..e4dfd13 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java @@ -25,7 +25,6 @@ import java.util.Collections; import org.apache.beam.runners.core.StateNamespaces.WindowNamespace; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.state.TimeDomain; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; @@ -40,13 +39,12 @@ import org.joda.time.Instant; public class ProcessFnRunner<InputT, OutputT, RestrictionT> implements PushbackSideInputDoFnRunner< KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> { - private final SimpleDoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> - underlying; + private final DoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> underlying; private final Collection<PCollectionView<?>> views; private final ReadyCheckingSideInputReader sideInputReader; public ProcessFnRunner( - SimpleDoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> underlying, + DoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> underlying, Collection<PCollectionView<?>> views, ReadyCheckingSideInputReader sideInputReader) { this.underlying = underlying; @@ -54,10 +52,6 @@ public class ProcessFnRunner<InputT, OutputT, RestrictionT> 
this.sideInputReader = sideInputReader; } - public DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> getFn() { - return underlying.getFn(); - } - @Override public void startBundle() { underlying.startBundle(); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 36b42ef..d4c5775 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -120,10 +120,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out this.allowedLateness = windowingStrategy.getAllowedLateness(); } - public DoFn<InputT, OutputT> getFn() { - return fn; - } - @Override public void startBundle() { // This can contain user code. Wrap it in case it throws an exception. diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java index eed81b3..e537962 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java @@ -45,10 +45,6 @@ class DoFnLifecycleManagerRemovingTransformEvaluator<InputT> implements Transfor this.lifecycleManager = lifecycleManager; } - public ParDoEvaluator<InputT> getParDoEvaluator() { - return underlying; - } - @Override public void processElement(WindowedValue<InputT> element) throws Exception { try { diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 4a1afc6..652f388 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -25,7 +25,6 @@ import com.google.common.cache.RemovalListener; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import java.util.ArrayList; import java.util.Collection; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -35,7 +34,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.beam.runners.local.ExecutionDriver; import org.apache.beam.runners.local.ExecutionDriver.DriverState; @@ -274,41 +272,17 @@ final class ExecutorServiceParallelExecutor return; } LOG.debug("Pipeline has terminated. Shutting down."); - - final Collection<Exception> errors = new ArrayList<>(); + pipelineState.compareAndSet(State.RUNNING, newState); // Stop accepting new work before shutting down the executor. This ensures that thread don't try // to add work to the shutdown executor. 
- try { - serialExecutorServices.invalidateAll(); - } catch (final RuntimeException re) { - errors.add(re); - } - try { - serialExecutorServices.cleanUp(); - } catch (final RuntimeException re) { - errors.add(re); - } - try { - parallelExecutorService.shutdown(); - } catch (final RuntimeException re) { - errors.add(re); - } - try { - executorService.shutdown(); - } catch (final RuntimeException re) { - errors.add(re); - } + serialExecutorServices.invalidateAll(); + serialExecutorServices.cleanUp(); + parallelExecutorService.shutdown(); + executorService.shutdown(); try { registry.cleanup(); - } catch (final Exception e) { - errors.add(e); - } - pipelineState.compareAndSet(State.RUNNING, newState); // ensure we hit a terminal node - if (!errors.isEmpty()) { - throw new IllegalStateException( - "Error" + (errors.size() == 1 ? "" : "s") + " during executor shutdown:\n" - + errors.stream().map(Exception::getMessage) - .collect(Collectors.joining("\n- ", "- ", ""))); + } catch (Exception e) { + visibleUpdates.failed(e); } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index c2b877f..7694b94 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -169,14 +169,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> { } } - public PushbackSideInputDoFnRunner<InputT, ?> getFnRunner() { - return fnRunner; - } - - public DirectStepContext getStepContext() { - return stepContext; - } - public BundleOutputManager getOutputManager() { return outputManager; } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java index 2963118..5774f17 100644 --- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java @@ -42,7 +42,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator private static final Logger LOG = LoggerFactory.getLogger(ParDoEvaluatorFactory.class); private final LoadingCache<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager> fnClones; - final EvaluationContext evaluationContext; + private final EvaluationContext evaluationContext; private final ParDoEvaluator.DoFnRunnerFactory<InputT, OutputT> runnerFactory; ParDoEvaluatorFactory( diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index cd38c8c..f4c4895 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -17,22 +17,18 @@ */ package org.apache.beam.runners.direct; -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.cache.CacheLoader; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Collection; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker; import org.apache.beam.runners.core.OutputWindowedValue; -import org.apache.beam.runners.core.ProcessFnRunner; import 
org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElements; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessFn; +import org.apache.beam.runners.core.StateInternals; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -54,30 +50,16 @@ class SplittableProcessElementsEvaluatorFactory< implements TransformEvaluatorFactory { private final ParDoEvaluatorFactory<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> delegateFactory; - private final ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder() - .setThreadFactory(MoreExecutors.platformThreadFactory()) - .setNameFormat("direct-splittable-process-element-checkpoint-executor") - .build()); + private final EvaluationContext evaluationContext; SplittableProcessElementsEvaluatorFactory(EvaluationContext evaluationContext) { + this.evaluationContext = evaluationContext; this.delegateFactory = - new ParDoEvaluatorFactory<>( - evaluationContext, - SplittableProcessElementsEvaluatorFactory. 
- <InputT, OutputT, RestrictionT>processFnRunnerFactory(), - new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>() { - @Override - public DoFnLifecycleManager load(final AppliedPTransform<?, ?, ?> application) { - checkArgument( - ProcessElements.class.isInstance(application.getTransform()), - "No know extraction of the fn from " + application); - final ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform = - (ProcessElements<InputT, OutputT, RestrictionT, TrackerT>) - application.getTransform(); - return DoFnLifecycleManager.of(transform.newProcessFn(transform.getFn())); - } - }); + new ParDoEvaluatorFactory<>( + evaluationContext, + SplittableProcessElementsEvaluatorFactory + .<InputT, OutputT, RestrictionT>processFnRunnerFactory(), + ParDoEvaluatorFactory.basicDoFnCacheLoader()); } @Override @@ -86,14 +68,12 @@ class SplittableProcessElementsEvaluatorFactory< @SuppressWarnings({"unchecked", "rawtypes"}) TransformEvaluator<T> evaluator = (TransformEvaluator<T>) - createEvaluator((AppliedPTransform) application, - (CommittedBundle) inputBundle); + createEvaluator((AppliedPTransform) application, (CommittedBundle) inputBundle); return evaluator; } @Override public void cleanup() throws Exception { - ses.shutdownNow(); // stop before cleaning delegateFactory.cleanup(); } @@ -106,29 +86,43 @@ class SplittableProcessElementsEvaluatorFactory< CommittedBundle<InputT> inputBundle) throws Exception { final ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform = - application.getTransform(); - final DoFnLifecycleManagerRemovingTransformEvaluator - <KeyedWorkItem<String, KV<InputT, RestrictionT>>> evaluator = - delegateFactory.createEvaluator( - (AppliedPTransform) application, - (PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>>) inputBundle.getPCollection(), - inputBundle.getKey(), - application.getTransform().getSideInputs(), - application.getTransform().getMainOutputTag(), - 
application.getTransform().getAdditionalOutputTags().getAll()); - - final ParDoEvaluator<KeyedWorkItem<String, KV<InputT, RestrictionT>>> pde = - evaluator.getParDoEvaluator(); - final ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn = - (ProcessFn<InputT, OutputT, RestrictionT, TrackerT>) - ProcessFnRunner.class.cast(pde.getFnRunner()).getFn(); - final DirectExecutionContext.DirectStepContext stepContext = pde.getStepContext(); - processFn.setStateInternalsFactory(key -> stepContext.stateInternals()); + application.getTransform(); + + ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn = + transform.newProcessFn(transform.getFn()); + + DoFnLifecycleManager fnManager = DoFnLifecycleManager.of(processFn); + processFn = + ((ProcessFn<InputT, OutputT, RestrictionT, TrackerT>) + fnManager.<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>get()); + + String stepName = evaluationContext.getStepName(application); + final DirectExecutionContext.DirectStepContext stepContext = + evaluationContext + .getExecutionContext(application, inputBundle.getKey()) + .getStepContext(stepName); + + final ParDoEvaluator<KeyedWorkItem<String, KV<InputT, RestrictionT>>> + parDoEvaluator = + delegateFactory.createParDoEvaluator( + application, + inputBundle.getKey(), + (PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>>) + inputBundle.getPCollection(), + transform.getSideInputs(), + transform.getMainOutputTag(), + transform.getAdditionalOutputTags().getAll(), + stepContext, + processFn, + fnManager); + + processFn.setStateInternalsFactory(key -> (StateInternals) stepContext.stateInternals()); + processFn.setTimerInternalsFactory(key -> stepContext.timerInternals()); OutputWindowedValue<OutputT> outputWindowedValue = new OutputWindowedValue<OutputT>() { - private final OutputManager outputManager = pde.getOutputManager(); + private final OutputManager outputManager = parDoEvaluator.getOutputManager(); @Override public void outputWindowedValue( @@ 
-150,18 +144,27 @@ class SplittableProcessElementsEvaluatorFactory< outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane)); } }; - processFn.setProcessElementInvoker( + processFn.setProcessElementInvoker( new OutputAndTimeBoundedSplittableProcessElementInvoker<>( transform.getFn(), - delegateFactory.evaluationContext.getPipelineOptions(), + evaluationContext.getPipelineOptions(), outputWindowedValue, - delegateFactory.evaluationContext.createSideInputReader(transform.getSideInputs()), - ses, + evaluationContext.createSideInputReader(transform.getSideInputs()), + // TODO: For better performance, use a higher-level executor? + // TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the + // DirectRunner. + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setThreadFactory(MoreExecutors.platformThreadFactory()) + .setDaemon(true) + .setNameFormat("direct-splittable-process-element-checkpoint-executor") + .build()), // Setting small values here to stimulate frequent checkpointing and better exercise // splittable DoFn's in that respect. 
100, Duration.standardSeconds(1))); - return evaluator; + + return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(parDoEvaluator, fnManager); } private static <InputT, OutputT, RestrictionT> diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index 1c8b144..830d0c1 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkState; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isA; import static org.junit.Assert.assertThat; @@ -40,7 +39,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -301,39 +299,6 @@ public class DirectRunnerTest implements Serializable { assertThat(result.getState(), is(State.RUNNING)); } - private static final AtomicLong TEARDOWN_CALL = new AtomicLong(-1); - - @Test - public void tearsDownFnsBeforeFinishing() { - TEARDOWN_CALL.set(-1); - final Pipeline pipeline = getPipeline(); - pipeline.apply(Create.of("a")) - .apply(ParDo.of(new DoFn<String, String>() { - @ProcessElement - public void onElement(final ProcessContext ctx) { - // no-op - } - - @Teardown - public void teardown() { - // just to not have a fast execution hiding an issue until we have a shutdown callback - try { - Thread.sleep(1000); - } catch (final 
InterruptedException e) { - fail(); - } - TEARDOWN_CALL.set(System.nanoTime()); - } - })); - final PipelineResult pipelineResult = pipeline.run(); - pipelineResult.waitUntilFinish(); - - final long doneTs = System.nanoTime(); - final long tearDownTs = TEARDOWN_CALL.get(); - assertThat(tearDownTs, greaterThan(0L)); - assertThat(doneTs, greaterThan(tearDownTs)); - } - @Test public void transformDisplayDataExceptionShouldFail() { DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() { -- To stop receiving notification emails like this one, please contact rober...@apache.org.