This is an automated email from the ASF dual-hosted git repository. xinyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 540327e [BEAM-7059] SamzaRunner: fix the job.id inconsistency in the new Samza version (#8279) 540327e is described below commit 540327eab201ede710681cd07de8f8105a506730 Author: xinyuiscool <xinyuliu...@gmail.com> AuthorDate: Thu Apr 11 18:29:32 2019 -0700 [BEAM-7059] SamzaRunner: fix the job.id inconsistency in the new Samza version (#8279) --- .../beam/runners/samza/SamzaExecutionContext.java | 33 ++++++++++++++++------ .../org/apache/beam/runners/samza/SamzaRunner.java | 5 ++-- .../samza/runtime/SamzaTimerInternalsFactory.java | 10 ++++++- .../runners/samza/translation/ConfigBuilder.java | 2 ++ 4 files changed, 38 insertions(+), 12 deletions(-) diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java index af65135..0867e51 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java @@ -59,6 +59,7 @@ public class SamzaExecutionContext implements ApplicationContainerContext { private GrpcFnServer<GrpcDataService> fnDataServer; private GrpcFnServer<GrpcStateService> fnStateServer; private ControlClientPool controlClientPool; + private ExecutorService dataExecutor; private IdGenerator idGenerator = IdGenerators.incrementingLongs(); public SamzaExecutionContext(SamzaPipelineOptions options) { @@ -92,7 +93,7 @@ public class SamzaExecutionContext implements ApplicationContainerContext { if (SamzaRunnerOverrideConfigs.isPortableMode(options)) { try { controlClientPool = MapControlClientPool.create(); - final ExecutorService dataExecutor = Executors.newCachedThreadPool(); + dataExecutor = Executors.newCachedThreadPool(); fnControlServer = GrpcFnServer.allocatePortAndCreateFor( @@ -100,18 +101,23 @@ public class SamzaExecutionContext implements ApplicationContainerContext { controlClientPool.getSink(), () -> SAMZA_WORKER_ID), ServerFactory.createWithPortSupplier( () -> SamzaRunnerOverrideConfigs.getFnControlPort(options))); + LOG.info("Started control server on port {}", fnControlServer.getServer().getPort()); fnDataServer = GrpcFnServer.allocatePortAndCreateFor( GrpcDataService.create(dataExecutor, OutboundObserverFactory.serverDirect()), ServerFactory.createDefault()); + LOG.info("Started data server on port {}", fnDataServer.getServer().getPort()); fnStateServer = GrpcFnServer.allocatePortAndCreateFor( GrpcStateService.create(), ServerFactory.createDefault()); + LOG.info("Started state server on port {}", fnStateServer.getServer().getPort()); final long waitTimeoutMs = SamzaRunnerOverrideConfigs.getControlClientWaitTimeoutMs(options); + LOG.info("Control client wait timeout config: " + waitTimeoutMs); + final InstructionRequestHandler instructionHandler = controlClientPool.getSource().take(SAMZA_WORKER_ID, Duration.ofMillis(waitTimeoutMs)); final EnvironmentFactory environmentFactory = @@ -120,6 +126,7 @@ public class SamzaExecutionContext implements ApplicationContainerContext { jobBundleFactory = SingleEnvironmentInstanceJobBundleFactory.create( environmentFactory, fnDataServer, fnStateServer, idGenerator); + LOG.info("Started job bundle factory"); } catch (Exception e) { throw new RuntimeException( "Running samza in Beam portable mode but failed to create job bundle factory", e); @@ -131,19 +138,29 @@ public class SamzaExecutionContext implements ApplicationContainerContext { @Override public void stop() { - closeFnServer(fnControlServer); + closeAutoClosable(fnControlServer, "controlServer"); fnControlServer = null; - closeFnServer(fnDataServer); + closeAutoClosable(fnDataServer, "dataServer"); fnDataServer = null; - closeFnServer(fnStateServer); + closeAutoClosable(fnStateServer, "stateServer"); fnStateServer = null; + if (dataExecutor != null) { + dataExecutor.shutdown(); + dataExecutor = null; + } + controlClientPool = null; + closeAutoClosable(jobBundleFactory, "jobBundle"); + jobBundleFactory = null; } - private void closeFnServer(GrpcFnServer<?> fnServer) { - try (AutoCloseable closer = fnServer) { - // do nothing + private static void closeAutoClosable(AutoCloseable closeable, String name) { + try (AutoCloseable closer = closeable) { + LOG.info("Closed {}", name); } catch (Exception e) { - LOG.error("Failed to close fn api servers. Ignore since this is shutdown process...", e); + LOG.error( + "Failed to close {}. Ignore since this is shutdown process...", + closeable.getClass().getSimpleName(), + e); } } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java index 36d47a8..3a9e442 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java @@ -102,9 +102,7 @@ public class SamzaRunner extends PipelineRunner<SamzaPipelineResult> { pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides()); - if (LOG.isDebugEnabled()) { - LOG.debug("Post-processed Beam pipeline:\n{}", PipelineDotRenderer.toDotString(pipeline)); - } + LOG.info("Beam pipeline DOT graph:\n{}", PipelineDotRenderer.toDotString(pipeline)); final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline); @@ -141,6 +139,7 @@ public class SamzaRunner extends PipelineRunner<SamzaPipelineResult> { final MetricsReporter reporter = options.getMetricsReporters().get(i); reporters.put(name, (MetricsReporterFactory) (nm, processorId, config) -> reporter); + LOG.info(name + ": " + reporter.getClass().getName()); } return reporters; } else { diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java index 146a9a8..4394675 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java @@ -344,13 +344,17 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> { private void loadEventTimeTimers() { if (!eventTimerTimerState.isEmpty().read()) { final Iterator<KeyedTimerData<K>> iter = eventTimerTimerState.readIterator().read(); - for (int i = 0; i < timerBufferSize && iter.hasNext(); i++) { + int i = 0; + for (; i < timerBufferSize && iter.hasNext(); i++) { eventTimeTimers.add(iter.next()); } + LOG.info("Loaded {} event time timers in memory", i); + // manually close the iterator here final SamzaStoreStateInternals.KeyValueIteratorState iteratorState = (SamzaStoreStateInternals.KeyValueIteratorState) eventTimerTimerState; + iteratorState.closeIterators(); } } @@ -359,11 +363,15 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> { if (!processingTimerTimerState.isEmpty().read()) { final Iterator<KeyedTimerData<K>> iter = processingTimerTimerState.readIterator().read(); // since the iterator will reach to the end, it will be closed automatically + int count = 0; while (iter.hasNext()) { final KeyedTimerData<K> keyedTimerData = iter.next(); timerRegistry.schedule( keyedTimerData, keyedTimerData.getTimerData().getTimestamp().getMillis()); + ++count; } + + LOG.info("Loaded {} processing time timers in memory", count); } } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java index 42a0d97..975baa2 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java @@ -82,6 +82,8 @@ public class ConfigBuilder { // apply user configs config.putAll(createUserConfig(options)); + config.put(ApplicationConfig.APP_NAME, options.getJobName()); + config.put(ApplicationConfig.APP_ID, options.getJobInstance()); config.put(JobConfig.JOB_NAME(), options.getJobName()); config.put(JobConfig.JOB_ID(), options.getJobInstance());