Repository: samza
Updated Branches:
  refs/heads/master 0c488bbb8 -> 8ce1bd556


SAMZA-1790: LocalContainerRunner should not extend AbstractApplicationRunner.

LocalContainerRunner is the launcher for the process running SamzaContainer in 
YARN. It extends the AbstractApplicationRunner since the container was using 
ApplicationRunner#getStreamSpec to create StreamSpecs from config to create the 
High Level API DAG. It doesn't implement any of the other APIs from the 
ApplicationRunner.

With SAMZA-1659 and SAMZA-1745, SamzaContainer no longer needs access to 
StreamSpec to create and execute the High Level API DAG. We can now clean up 
the LocalContainerRunner implementation so that it doesn't need to implement 
the ApplicationRunner interface.

Author: Prateek Maheshwari <pmaheshw...@apache.org>

Reviewers: Jagadish Venkatraman <vjagadis1...@gmail.com>, Yi Pan 
<nickpa...@gmail.com>

Closes #586 from prateekm/lcr-cleanup


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8ce1bd55
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8ce1bd55
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8ce1bd55

Branch: refs/heads/master
Commit: 8ce1bd5569ba5144a39eb8890986e9db5c27d16a
Parents: 0c488bb
Author: Prateek Maheshwari <pmaheshw...@apache.org>
Authored: Thu Aug 2 06:55:54 2018 -0700
Committer: Prateek Maheshwari <pmaheshw...@apache.org>
Committed: Thu Aug 2 06:55:54 2018 -0700

----------------------------------------------------------------------
 .../samza/runtime/LocalContainerRunner.java     | 148 ++++++++-----------
 1 file changed, 60 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/8ce1bd55/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index e6e622d..fe75883 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -31,9 +31,9 @@ import org.apache.samza.container.ContainerHeartbeatClient;
 import org.apache.samza.container.ContainerHeartbeatMonitor;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.SamzaContainer$;
+import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.util.SamzaUncaughtExceptionHandler;
 import org.apache.samza.container.SamzaContainerListener;
-import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.task.TaskFactoryUtil;
 import org.apache.samza.util.ScalaJavaUtil;
@@ -41,38 +41,49 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * LocalContainerRunner is the local runner for Yarn {@link SamzaContainer}s. 
It is an intermediate step to
- * have a local runner for yarn before we consolidate the Yarn container and 
coordination into a
- * {@link org.apache.samza.processor.StreamProcessor}. This class will be 
replaced by the {@link org.apache.samza.processor.StreamProcessor}
- * local runner once that's done.
- *
- * Since we don't have the {@link org.apache.samza.coordinator.JobCoordinator} 
implementation in Yarn, the components (jobModel and containerId)
- * are directly inside the runner.
+ * Launches and manages the lifecycle for {@link SamzaContainer}s in YARN.
  */
-public class LocalContainerRunner extends AbstractApplicationRunner {
+public class LocalContainerRunner {
   private static final Logger log = 
LoggerFactory.getLogger(LocalContainerRunner.class);
-  private final JobModel jobModel;
-  private final String containerId;
-  private volatile Throwable containerRunnerException = null;
-  private ContainerHeartbeatMonitor containerHeartbeatMonitor;
-  private SamzaContainer container;
-
-  public LocalContainerRunner(JobModel jobModel, String containerId) {
-    super(jobModel.getConfig());
-    this.jobModel = jobModel;
-    this.containerId = containerId;
-  }
+  private static volatile Throwable containerRunnerException = null;
 
-  @Override
-  public void runTask() {
-    throw new UnsupportedOperationException("Running StreamTask is not 
implemented for LocalContainerRunner");
-  }
+  public static void main(String[] args) throws Exception {
+    Thread.setDefaultUncaughtExceptionHandler(
+        new SamzaUncaughtExceptionHandler(() -> {
+          log.info("Exiting process now.");
+          System.exit(1);
+        }));
+
+    String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
+    log.info(String.format("Got container ID: %s", containerId));
+    System.out.println(String.format("Container ID: %s", containerId));
 
-  @Override
-  public void run(StreamApplication streamApp) {
-    Object taskFactory = getTaskFactory(streamApp);
+    String coordinatorUrl = 
System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
+    log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
+    System.out.println(String.format("Coordinator URL: %s", coordinatorUrl));
 
-    container = SamzaContainer$.MODULE$.apply(
+    int delay = new 
Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
+    JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
+    Config config = jobModel.getConfig();
+    JobConfig jobConfig = new JobConfig(config);
+    if (jobConfig.getName().isEmpty()) {
+      throw new SamzaException("can not find the job name");
+    }
+    String jobName = jobConfig.getName().get();
+    String jobId = 
jobConfig.getJobId().getOrElse(ScalaJavaUtil.defaultValue("1"));
+    MDC.put("containerName", "samza-container-" + containerId);
+    MDC.put("jobName", jobName);
+    MDC.put("jobId", jobId);
+
+    StreamApplication streamApp = 
TaskFactoryUtil.createStreamApplication(config);
+    Object taskFactory = getTaskFactory(streamApp, config);
+    run(taskFactory, containerId, jobModel, config);
+
+    System.exit(0);
+  }
+
+  private static void run(Object taskFactory, String containerId, JobModel 
jobModel, Config config) {
+    SamzaContainer container = SamzaContainer$.MODULE$.apply(
         containerId,
         jobModel,
         config,
@@ -98,75 +109,43 @@ public class LocalContainerRunner extends 
AbstractApplicationRunner {
           }
         });
 
-    startContainerHeartbeatMonitor();
+    ContainerHeartbeatMonitor heartbeatMonitor = 
createContainerHeartbeatMonitor(container);
+    if (heartbeatMonitor != null) {
+      heartbeatMonitor.start();
+    }
+
     container.run();
-    stopContainerHeartbeatMonitor();
-    
+
+    if (heartbeatMonitor != null) {
+      heartbeatMonitor.stop();
+    }
+
     if (containerRunnerException != null) {
       log.error("Container stopped with Exception. Exiting process now.", 
containerRunnerException);
       System.exit(1);
     }
   }
 
-  private Object getTaskFactory(StreamApplication streamApp) {
+  private static Object getTaskFactory(StreamApplication streamApp, Config 
config) {
     if (streamApp != null) {
+      StreamGraphSpec graphSpec = new StreamGraphSpec(config);
       streamApp.init(graphSpec, config);
       return 
TaskFactoryUtil.createTaskFactory(graphSpec.getOperatorSpecGraph(), 
graphSpec.getContextManager());
     }
     return TaskFactoryUtil.createTaskFactory(config);
   }
 
-  @Override
-  public void kill(StreamApplication streamApp) {
-    // Ultimately this class probably won't end up extending 
ApplicationRunner, so this will be deleted
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public ApplicationStatus status(StreamApplication streamApp) {
-    // Ultimately this class probably won't end up extending 
ApplicationRunner, so this will be deleted
-    throw new UnsupportedOperationException();
-  }
-
-  public static void main(String[] args) throws Exception {
-    Thread.setDefaultUncaughtExceptionHandler(
-        new SamzaUncaughtExceptionHandler(() -> {
-          log.info("Exiting process now.");
-          System.exit(1);
-        }));
-
-    String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
-    log.info(String.format("Got container ID: %s", containerId));
-    System.out.println(String.format("Container ID: %s", containerId));
-    String coordinatorUrl = 
System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
-    log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
-    System.out.println(String.format("Coordinator URL: %s", coordinatorUrl));
-    int delay = new 
Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
-    JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
-    Config config = jobModel.getConfig();
-    JobConfig jobConfig = new JobConfig(config);
-    if (jobConfig.getName().isEmpty()) {
-      throw new SamzaException("can not find the job name");
-    }
-    String jobName = jobConfig.getName().get();
-    String jobId = 
jobConfig.getJobId().getOrElse(ScalaJavaUtil.defaultValue("1"));
-    MDC.put("containerName", "samza-container-" + containerId);
-    MDC.put("jobName", jobName);
-    MDC.put("jobId", jobId);
-
-    StreamApplication streamApp = 
TaskFactoryUtil.createStreamApplication(config);
-    LocalContainerRunner localContainerRunner = new 
LocalContainerRunner(jobModel, containerId);
-    localContainerRunner.run(streamApp);
-
-    System.exit(0);
-  }
-
-  private void startContainerHeartbeatMonitor() {
+  /**
+   * Creates a new container heartbeat monitor if possible.
+   * @param container the container to monitor
+   * @return a new {@link ContainerHeartbeatMonitor} instance, or null if 
could not create one
+   */
+  private static ContainerHeartbeatMonitor 
createContainerHeartbeatMonitor(SamzaContainer container) {
     String coordinatorUrl = 
System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
     String executionEnvContainerId = 
System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID());
     if (executionEnvContainerId != null) {
       log.info("Got execution environment container id: {}", 
executionEnvContainerId);
-      containerHeartbeatMonitor = new ContainerHeartbeatMonitor(() -> {
+      return new ContainerHeartbeatMonitor(() -> {
           try {
             container.shutdown();
             containerRunnerException = new SamzaException("Container shutdown 
due to expired heartbeat");
@@ -175,16 +154,9 @@ public class LocalContainerRunner extends 
AbstractApplicationRunner {
             System.exit(1);
           }
         }, new ContainerHeartbeatClient(coordinatorUrl, 
executionEnvContainerId));
-      containerHeartbeatMonitor.start();
     } else {
-      containerHeartbeatMonitor = null;
-      log.warn("executionEnvContainerId not set. Container heartbeat monitor 
will not be started");
-    }
-  }
-
-  private void stopContainerHeartbeatMonitor() {
-    if (containerHeartbeatMonitor != null) {
-      containerHeartbeatMonitor.stop();
+      log.warn("Execution environment container id not set. Container 
heartbeat monitor will not be created");
+      return null;
     }
   }
 }

Reply via email to