Repository: samza Updated Branches: refs/heads/master 3895a9070 -> 76de840c7
SAMZA-1584: Improve logging in StreamProcessor. Add the processorId to the log lines wherever necessary (since we support running multiple stream applications in a JVM) and improve logging in general in StreamProcessor. Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Author: Shanthoosh Venkataraman <svenk...@lm-lsnscdw5132.linkedin.biz> Reviewers: Prateek Maheshwari <pmaheshw...@apache.org> Closes #441 from shanthoosh/SAMZA-1584 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/76de840c Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/76de840c Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/76de840c Branch: refs/heads/master Commit: 76de840c734fe2b7987af22d1ba6133437c25a5e Parents: 3895a90 Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Authored: Thu Apr 12 14:34:45 2018 -0700 Committer: Prateek Maheshwari <pmahe...@linkedin.com> Committed: Thu Apr 12 14:34:45 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/processor/StreamProcessor.java | 54 ++++++++++---------- 1 file changed, 27 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/76de840c/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index b548200..8dacc6c 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import 
java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.samza.SamzaContainerStatus; import org.apache.samza.annotation.InterfaceStability; @@ -55,6 +56,7 @@ import org.slf4j.LoggerFactory; @InterfaceStability.Evolving public class StreamProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(StreamProcessor.class); + private static final String CONTAINER_THREAD_NAME_FORMAT = "Samza StreamProcessor Container Thread-%d"; private final JobCoordinator jobCoordinator; private final StreamProcessorLifecycleListener processorListener; @@ -71,7 +73,7 @@ public class StreamProcessor { // Latch used to synchronize between the JobCoordinator thread and the container thread, when the container is // stopped due to re-balancing - /* package private */volatile CountDownLatch jcContainerShutdownLatch; + volatile CountDownLatch jcContainerShutdownLatch; private volatile boolean processorOnStartCalled = false; @VisibleForTesting @@ -179,11 +181,12 @@ public class StreamProcessor { boolean containerShutdownInvoked = false; if (container != null) { try { - LOGGER.info("Shutting down container " + container.toString() + " from StreamProcessor"); + LOGGER.info("Shutting down the container: {} of stream processor: {}.", container, processorId); container.shutdown(); + LOGGER.info("Waiting {} milliseconds for the container: {} to shutdown.", taskShutdownMs, container); containerShutdownInvoked = true; - } catch (IllegalContainerStateException icse) { - LOGGER.info("Container was not running", icse); + } catch (Exception exception) { + LOGGER.error(String.format("Ignoring the exception during the shutdown of container: %s.", container), exception); } } @@ -191,7 +194,6 @@ public class StreamProcessor { LOGGER.info("Shutting down JobCoordinator from StreamProcessor"); jobCoordinator.stop(); } - } SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) { @@ -199,7 +201,7 @@ public class StreamProcessor { processorId, 
jobModel, config, - Util.<String, MetricsReporter>javaMapAsScalaMap(customMetricsReporter), + Util.javaMapAsScalaMap(customMetricsReporter), taskFactory); } @@ -213,32 +215,30 @@ public class StreamProcessor { if (SamzaContainerStatus.NOT_STARTED.equals(status) || SamzaContainerStatus.STARTED.equals(status)) { boolean shutdownComplete = false; try { - LOGGER.info("Shutting down container in onJobModelExpired for processor:" + processorId); + LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", container, processorId); container.pause(); shutdownComplete = jcContainerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS); - LOGGER.info("ShutdownComplete=" + shutdownComplete); + LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %s.", container, processorId, shutdownComplete)); } catch (IllegalContainerStateException icse) { // Ignored since container is not running - LOGGER.info("Container was not running.", icse); + LOGGER.info(String.format("Cannot shutdown container: %s for stream processor: %s. Container is not running.", container, processorId), icse); shutdownComplete = true; } catch (InterruptedException e) { Thread.currentThread().interrupt(); - LOGGER.warn("Container shutdown was interrupted!" + container.toString(), e); + LOGGER.warn(String.format("Shutdown of container: %s for stream processor: %s was interrupted", container, processorId), e); } - LOGGER.info("Shutting down container done for pid=" + processorId + "; complete =" + shutdownComplete); if (!shutdownComplete) { - LOGGER.warn("Container " + container.toString() + " may not have shutdown successfully. " + - "Stopping the processor."); + LOGGER.warn("Container: {} shutdown was unsuccessful. 
Stopping the stream processor: {}.", container, processorId); container = null; stop(); } else { - LOGGER.debug("Container " + container.toString() + " shutdown successfully"); + LOGGER.info("Container: {} shutdown completed for stream processor: {}.", container, processorId); } } else { - LOGGER.debug("Container " + container.toString() + " is not running."); + LOGGER.info("Container: {} of the stream processor: {} is not running.", container, processorId); } } else { - LOGGER.debug("Container is not instantiated yet."); + LOGGER.info("Container is not instantiated for stream processor: {}.", processorId); } } @@ -257,19 +257,19 @@ public class StreamProcessor { processorListener.onStart(); } } else { - LOGGER.debug("StreamProcessorListener was notified of container start previously. Hence, skipping this time."); + LOGGER.warn("Received duplicate container start notification for container: {} in stream processor: {}.", container, processorId); } } @Override public void onContainerStop(boolean pauseByJm) { if (pauseByJm) { - LOGGER.info("Container " + container.toString() + " stopped due to a request from JobCoordinator."); + LOGGER.info("Container: {} of the stream processor: {} was stopped by the JobCoordinator.", container, processorId); if (jcContainerShutdownLatch != null) { jcContainerShutdownLatch.countDown(); } } else { // sp.stop was called or container stopped by itself - LOGGER.info("Container " + container.toString() + " stopped."); + LOGGER.info("Container: {} stopped. Stopping the stream processor: {}.", container, processorId); container = null; // this guarantees that stop() doesn't try to stop container again stop(); } @@ -283,7 +283,7 @@ public class StreamProcessor { LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting."); } containerException = t; - LOGGER.error("Container failed. Stopping the processor.", containerException); + LOGGER.error(String.format("Container: %s failed with an exception. 
Stopping the stream processor: %s. Original exception:", container, processorId), containerException); container = null; stop(); } @@ -291,16 +291,16 @@ public class StreamProcessor { container = createSamzaContainer(processorId, jobModel); container.setContainerListener(containerListener); - LOGGER.info("Starting container " + container.toString()); - executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() - .setNameFormat("p-" + processorId + "-container-thread-%d").build()); + LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId); + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).build(); + executorService = Executors.newSingleThreadExecutor(threadFactory); executorService.submit(container::run); } @Override public void onCoordinatorStop() { if (executorService != null) { - LOGGER.info("Shutting down the executor service."); + LOGGER.info("Shutting down the executor service of the stream processor: {}.", processorId); executorService.shutdownNow(); } if (processorListener != null) { @@ -312,11 +312,11 @@ public class StreamProcessor { } @Override - public void onCoordinatorFailure(Throwable e) { - LOGGER.info("Coordinator Failed. Stopping the processor."); + public void onCoordinatorFailure(Throwable throwable) { + LOGGER.info(String.format("Coordinator: %s failed with an exception. Stopping the stream processor: %s. Original exception:", jobCoordinator, processorId), throwable); stop(); if (processorListener != null) { - processorListener.onFailure(e); + processorListener.onFailure(throwable); } } };