Repository: samza
Updated Branches:
  refs/heads/master bd9387b7f -> df56f2dc1


SAMZA-1835: Consolidate all processorId generation code.

Currently, the processorId creation function createProcessorId() is repeated in 
three different implementations of `JobCoordinator`, viz. `ZkJobCoordinator`, 
`PassthroughJobCoordinator`, and `AzureJobCoordinator`. Here are a few 
problems that stem from this duplication.

1. `processorId` is passed into the `MetricsReporterFactory` through the 
factory create method: `MetricsReporter getMetricsReporter(String name, String 
processorId, Config config);`. Custom `MetricsReporter` implementations 
currently use the processorId as a component in the generated metric names. 
Metrics reporters are instantiated from `LocalApplicationRunner`, 
and `processorId` is currently passed in as null to 
`MetricsReporterFactory.getMetricsReporter`. This corrupts the generated 
metric names.
2. `ZkJobCoordinator`, `ZkUtils`, `ZkLeaderElector` and different downstream 
components of `LocalApplicationRunner` currently instantiate and manage their 
own private reporters, rather than sharing the common `MetricsRegistry` managed by 
`LocalApplicationRunner`. Since there is no common namespace and reporter 
shared by downstream components of `LocalApplicationRunner`, generating 
metrics dashboards for standalone deployments is cumbersome.

This PR comprises the following changes:

1. Moved the processorId generation to `LocalApplicationRunner` and injected the 
generated `processorId` into all the downstream layers.
2. Deprecated the getProcessorId API in the `JobCoordinator` interface.
3. Added the `processorId` and `metricsRegistry` arguments to the 
`getJobCoordinator` method of the `JobCoordinatorFactory` interface.
4. Fixed the unit tests and added unit tests for 
`LocalApplicationRunner.createProcessorId`.

Author: Shanthoosh Venkataraman <svenk...@linkedin.com>
Author: Shanthoosh Venkataraman <spven...@usc.edu>
Author: svenkata <svenkatara...@linkedin.com>

Reviewers: Jagadish<jagad...@apache.org>

Closes #844 from shanthoosh/SAMZA-1835


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/df56f2dc
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/df56f2dc
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/df56f2dc

Branch: refs/heads/master
Commit: df56f2dc1ee1be9b2a3aa9894465e3f870451125
Parents: bd9387b
Author: Shanthoosh Venkataraman <svenk...@linkedin.com>
Authored: Thu Dec 6 18:11:38 2018 -0800
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Thu Dec 6 18:11:38 2018 -0800

----------------------------------------------------------------------
 .../samza/coordinator/AzureJobCoordinator.java  | 25 ++---------
 .../coordinator/AzureJobCoordinatorFactory.java |  5 ++-
 .../apache/samza/config/ApplicationConfig.java  |  1 -
 .../samza/coordinator/JobCoordinator.java       |  1 +
 .../coordinator/JobCoordinatorFactory.java      | 11 +++--
 .../apache/samza/processor/StreamProcessor.java | 45 ++++++++++++-------
 .../samza/runtime/LocalApplicationRunner.java   | 32 +++++++++++--
 .../standalone/PassthroughJobCoordinator.java   | 25 ++---------
 .../PassthroughJobCoordinatorFactory.java       |  5 ++-
 .../org/apache/samza/zk/ZkJobCoordinator.java   | 47 +-------------------
 .../samza/zk/ZkJobCoordinatorFactory.java       |  6 +--
 .../samza/processor/TestStreamProcessor.java    | 14 +++---
 .../samza/runtime/MockProcessorIdGenerator.java | 29 ++++++++++++
 .../runtime/TestLocalApplicationRunner.java     | 26 +++++++++++
 .../apache/samza/zk/TestZkJobCoordinator.java   | 10 ++---
 .../processor/TestZkStreamProcessorBase.java    |  5 ++-
 .../test/processor/TestStreamProcessor.java     |  9 ++--
 17 files changed, 158 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
 
b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
index 076ab54..ae4aba3 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
@@ -19,12 +19,9 @@
 
 package org.apache.samza.coordinator;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.AzureClient;
-import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.AzureConfig;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.TaskName;
@@ -42,7 +39,7 @@ import 
org.apache.samza.coordinator.scheduler.LivenessCheckScheduler;
 import org.apache.samza.coordinator.scheduler.RenewLeaseScheduler;
 import org.apache.samza.coordinator.scheduler.SchedulerStateChangeListener;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.runtime.ProcessorIdGenerator;
+import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
@@ -101,10 +98,10 @@ public class AzureJobCoordinator implements JobCoordinator 
{
    * Creates an instance of Azure job coordinator, along with references to 
Azure leader elector, Azure Blob and Azure Table.
    * @param config User defined config
    */
-  public AzureJobCoordinator(Config config) {
+  public AzureJobCoordinator(String processorId, Config config, 
MetricsRegistry metricsRegistry) {
     //TODO: Cleanup previous values in the table when barrier times out.
+    this.processorId = processorId;
     this.config = config;
-    processorId = createProcessorId(config);
     currentJMVersion = new AtomicReference<>(INITIAL_STATE);
     AzureConfig azureConfig = new AzureConfig(config);
     AzureClient client = new 
AzureClient(azureConfig.getAzureConnectionString());
@@ -473,22 +470,6 @@ public class AzureJobCoordinator implements JobCoordinator 
{
     }
   }
 
-  private String createProcessorId(Config config) {
-    // TODO: This check to be removed after 0.13+
-    ApplicationConfig appConfig = new ApplicationConfig(config);
-    if (appConfig.getProcessorId() != null) {
-      return appConfig.getProcessorId();
-    } else if 
(StringUtils.isNotBlank(appConfig.getAppProcessorIdGeneratorClass())) {
-      ProcessorIdGenerator idGenerator =
-          Util.getObj(appConfig.getAppProcessorIdGeneratorClass(), 
ProcessorIdGenerator.class);
-      return idGenerator.generateProcessorId(config);
-    } else {
-      throw new ConfigException(String
-          .format("Expected either %s or %s to be configured", 
ApplicationConfig.PROCESSOR_ID,
-              ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
-    }
-  }
-
   public class AzureLeaderElectorListener implements LeaderElectorListener {
     /**
      * Keep renewing the lease and do the required tasks as a leader.

http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java
 
b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java
index 8b3d357..ff8925a 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java
@@ -20,10 +20,11 @@
 package org.apache.samza.coordinator;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
 
 public class AzureJobCoordinatorFactory implements JobCoordinatorFactory {
   @Override
-  public JobCoordinator getJobCoordinator(Config config) {
-    return new AzureJobCoordinator(config);
+  public JobCoordinator getJobCoordinator(String processorId, Config config, 
MetricsRegistry metricsRegistry) {
+    return new AzureJobCoordinator(processorId, config, metricsRegistry);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
index 39facb6..804035e 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
@@ -85,7 +85,6 @@ public class ApplicationConfig extends MapConfig {
     return String.format("app-%s-%s", getAppName(), getAppId());
   }
 
-  @Deprecated
   public String getProcessorId() {
     return get(PROCESSOR_ID, null);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
index bd06039..cd10acb 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
@@ -70,6 +70,7 @@ public interface JobCoordinator {
    *
    * @return String representing a unique logical processor ID
    */
+  @Deprecated
   String getProcessorId();
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
index 83ebf52..8f3d96e 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
@@ -20,13 +20,16 @@ package org.apache.samza.coordinator;
 
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
 
 @InterfaceStability.Evolving
 public interface JobCoordinatorFactory {
   /**
-   * Return a new instance of {@link JobCoordinator}
-   * @param config Configs relevant for the JobCoordinator TODO: Separate JC 
related configs into a "JobCoordinatorConfig"
-   * @return {@link JobCoordinator} instance
+   * Returns a new instance of {@link JobCoordinator}.
+   * @param processorId a unique logical identifier assigned to the {@link 
org.apache.samza.processor.StreamProcessor}.
+   * @param config the configuration of the samza application.
+   * @param metricsRegistry  used to publish the coordination specific metrics.
+   * @return the {@link JobCoordinator} instance.
    */
-  JobCoordinator getJobCoordinator(Config config);
+  JobCoordinator getJobCoordinator(String processorId, Config config, 
MetricsRegistry metricsRegistry);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 389dafd..2c0c0b7 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobCoordinatorConfig;
@@ -46,6 +47,7 @@ import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
 import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.runtime.ProcessorLifecycleListener;
 import org.apache.samza.task.TaskFactory;
@@ -123,6 +125,7 @@ public class StreamProcessor {
   private final String processorId;
   private final ExecutorService containerExcecutorService;
   private final Object lock = new Object();
+  private final MetricsRegistryMap metricsRegistry;
 
   private volatile Throwable containerException = null;
 
@@ -163,25 +166,26 @@ public class StreamProcessor {
   JobCoordinatorListener jobCoordinatorListener = null;
 
   /**
-   * Same as {@link #StreamProcessor(Config, Map, TaskFactory, 
ProcessorLifecycleListener, JobCoordinator)}, except
+   * Same as {@link #StreamProcessor(String, Config, Map, TaskFactory, 
ProcessorLifecycleListener, JobCoordinator)}, except
    * it creates a {@link JobCoordinator} instead of accepting it as an 
argument.
    *
-   * @param config configuration required to launch {@link JobCoordinator} and 
{@link SamzaContainer}
-   * @param customMetricsReporters registered with the metrics system to 
report metrics
-   * @param taskFactory task factory to instantiate the Task
-   * @param processorListener listener to the StreamProcessor life cycle
+   * @param processorId a unique logical identifier assigned to the stream 
processor.
+   * @param config configuration required to launch {@link JobCoordinator} and 
{@link SamzaContainer}.
+   * @param customMetricsReporters registered with the metrics system to 
report metrics.
+   * @param taskFactory the task factory to instantiate the Task.
+   * @param processorListener listener to the StreamProcessor life cycle.
    *
-   * Deprecated: Use {@link #StreamProcessor(Config, Map, TaskFactory, 
Optional, Optional, Optional,
+   * Deprecated: Use {@link #StreamProcessor(String, Config, Map, TaskFactory, 
Optional, Optional, Optional,
    * StreamProcessorLifecycleListenerFactory, JobCoordinator)} instead.
    */
   @Deprecated
-  public StreamProcessor(Config config, Map<String, MetricsReporter> 
customMetricsReporters, TaskFactory taskFactory,
+  public StreamProcessor(String processorId, Config config, Map<String, 
MetricsReporter> customMetricsReporters, TaskFactory taskFactory,
       ProcessorLifecycleListener processorListener) {
-    this(config, customMetricsReporters, taskFactory, processorListener, null);
+    this(processorId, config, customMetricsReporters, taskFactory, 
processorListener, null);
   }
 
   /**
-   * Same as {@link #StreamProcessor(Config, Map, TaskFactory, Optional, 
Optional, Optional,
+   * Same as {@link #StreamProcessor(String, Config, Map, TaskFactory, 
Optional, Optional, Optional,
    * StreamProcessorLifecycleListenerFactory, JobCoordinator)}, with the 
following differences:
    * <ol>
    *   <li>Passes null for application-defined context factories</li>
@@ -189,25 +193,27 @@ public class StreamProcessor {
    *   {@link StreamProcessorLifecycleListenerFactory}</li>
    * </ol>
    *
+   * @param processorId a unique logical identifier assigned to the stream 
processor.
    * @param config configuration required to launch {@link JobCoordinator} and 
{@link SamzaContainer}
    * @param customMetricsReporters registered with the metrics system to 
report metrics
    * @param taskFactory task factory to instantiate the Task
    * @param processorListener listener to the StreamProcessor life cycle
    * @param jobCoordinator the instance of {@link JobCoordinator}
    *
-   * Deprecated: Use {@link #StreamProcessor(Config, Map, TaskFactory, 
Optional, Optional, Optional,
+   * Deprecated: Use {@link #StreamProcessor(String, Config, Map, TaskFactory, 
Optional, Optional, Optional,
    * StreamProcessorLifecycleListenerFactory, JobCoordinator)} instead.
    */
   @Deprecated
-  public StreamProcessor(Config config, Map<String, MetricsReporter> 
customMetricsReporters, TaskFactory taskFactory,
+  public StreamProcessor(String processorId, Config config, Map<String, 
MetricsReporter> customMetricsReporters, TaskFactory taskFactory,
       ProcessorLifecycleListener processorListener, JobCoordinator 
jobCoordinator) {
-    this(config, customMetricsReporters, taskFactory, Optional.empty(), 
Optional.empty(), Optional.empty(),
-        sp -> processorListener, jobCoordinator);
+    this(processorId, config, customMetricsReporters, taskFactory, 
Optional.empty(), Optional.empty(), Optional.empty(), sp -> processorListener,
+        jobCoordinator);
   }
 
   /**
    * Builds a {@link StreamProcessor} with full specification of processing 
components.
    *
+   * @param processorId a unique logical identifier assigned to the stream 
processor.
    * @param config configuration required to launch {@link JobCoordinator} and 
{@link SamzaContainer}
    * @param customMetricsReporters registered with the metrics system to 
report metrics
    * @param taskFactory task factory to instantiate the Task
@@ -217,14 +223,20 @@ public class StreamProcessor {
    * @param listenerFactory factory for creating a listener to the 
StreamProcessor life cycle
    * @param jobCoordinator the instance of {@link JobCoordinator}
    */
-  public StreamProcessor(Config config, Map<String, MetricsReporter> 
customMetricsReporters, TaskFactory taskFactory,
+  public StreamProcessor(String processorId, Config config, Map<String, 
MetricsReporter> customMetricsReporters, TaskFactory taskFactory,
       
Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> 
applicationDefinedContainerContextFactoryOptional,
       Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> 
applicationDefinedTaskContextFactoryOptional,
       Optional<ExternalContext> externalContextOptional, 
StreamProcessorLifecycleListenerFactory listenerFactory,
       JobCoordinator jobCoordinator) {
     Preconditions.checkNotNull(listenerFactory, 
"StreamProcessorListenerFactory cannot be null.");
+    Preconditions.checkArgument(StringUtils.isNotBlank(processorId), 
"ProcessorId cannot be null.");
     this.config = config;
+    this.processorId = processorId;
+    this.metricsRegistry = new MetricsRegistryMap();
     this.customMetricsReporter = customMetricsReporters;
+    for (MetricsReporter metricsReporter : customMetricsReporter.values()) {
+      metricsReporter.register("StreamProcessor", metricsRegistry);
+    }
     this.taskFactory = taskFactory;
     this.applicationDefinedContainerContextFactoryOptional = 
applicationDefinedContainerContextFactoryOptional;
     this.applicationDefinedTaskContextFactoryOptional = 
applicationDefinedTaskContextFactoryOptional;
@@ -235,8 +247,6 @@ public class StreamProcessor {
     this.jobCoordinator.setListener(jobCoordinatorListener);
     ThreadFactory threadFactory = new 
ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).setDaemon(true).build();
     this.containerExcecutorService = 
Executors.newSingleThreadExecutor(threadFactory);
-    // TODO: remove the dependency on jobCoordinator for processorId after 
fixing SAMZA-1835
-    this.processorId = this.jobCoordinator.getProcessorId();
     this.processorListener = listenerFactory.createInstance(this);
   }
 
@@ -287,6 +297,7 @@ public class StreamProcessor {
    */
   public void stop() {
     synchronized (lock) {
+      LOGGER.info("Stopping the stream processor: {}.", processorId);
       if (state != State.STOPPING && state != State.STOPPED) {
         state = State.STOPPING;
         try {
@@ -328,7 +339,7 @@ public class StreamProcessor {
 
   private JobCoordinator createJobCoordinator() {
     String jobCoordinatorFactoryClassName = new 
JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName();
-    return Util.getObj(jobCoordinatorFactoryClassName, 
JobCoordinatorFactory.class).getJobCoordinator(config);
+    return Util.getObj(jobCoordinatorFactoryClassName, 
JobCoordinatorFactory.class).getJobCoordinator(processorId, config, 
metricsRegistry);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 189fc1f..7da3369 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -32,6 +32,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
 import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
@@ -39,6 +40,7 @@ import 
org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
 import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.context.ExternalContext;
 import org.apache.samza.execution.LocalJobPlanner;
@@ -47,6 +49,7 @@ import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.processor.StreamProcessor;
 import org.apache.samza.task.TaskFactory;
 import org.apache.samza.task.TaskFactoryUtil;
+import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -167,14 +170,37 @@ public class LocalApplicationRunner implements 
ApplicationRunner {
       Optional<ExternalContext> externalContextOptional) {
     TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
     Map<String, MetricsReporter> reporters = new HashMap<>();
-    // TODO: the null processorId has to be fixed after SAMZA-1835
+    String processorId = createProcessorId(new ApplicationConfig(config));
     appDesc.getMetricsReporterFactories().forEach((name, factory) ->
-        reporters.put(name, factory.getMetricsReporter(name, null, config)));
-    return new StreamProcessor(config, reporters, taskFactory, 
appDesc.getApplicationContainerContextFactory(),
+        reporters.put(name, factory.getMetricsReporter(name, processorId, 
config)));
+    return new StreamProcessor(processorId, config, reporters, taskFactory, 
appDesc.getApplicationContainerContextFactory(),
         appDesc.getApplicationTaskContextFactory(), externalContextOptional, 
listenerFactory, null);
   }
 
   /**
+   * Generates a unique logical identifier for the stream processor using the 
provided {@param appConfig}.
+   * 1. If the processorId is defined in the configuration, then returns the 
value defined in the configuration.
+   * 2. Else if the {@linkplain ProcessorIdGenerator} class is defined the 
configuration, then uses the {@linkplain ProcessorIdGenerator}
+   * to generate the unique processorId.
+   * 3. Else throws the {@see ConfigException} back to the caller.
+   * @param appConfig the configuration of the samza application.
+   * @throws ConfigException if neither processor.id nor 
app.processor-id-generator.class is defined in the configuration.
+   * @return the generated processor identifier.
+   */
+  @VisibleForTesting
+  static String createProcessorId(ApplicationConfig appConfig) {
+    if (StringUtils.isNotBlank(appConfig.getProcessorId())) {
+      return appConfig.getProcessorId();
+    } else if 
(StringUtils.isNotBlank(appConfig.getAppProcessorIdGeneratorClass())) {
+      ProcessorIdGenerator idGenerator = 
Util.getObj(appConfig.getAppProcessorIdGeneratorClass(), 
ProcessorIdGenerator.class);
+      return idGenerator.generateProcessorId(appConfig);
+    } else {
+      throw new ConfigException(String.format("Expected either %s or %s to be 
configured", ApplicationConfig.PROCESSOR_ID,
+              ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
+    }
+  }
+
+  /**
    * Defines a specific implementation of {@link ProcessorLifecycleListener} 
for local {@link StreamProcessor}s.
    */
   private final class LocalStreamProcessorLifecycleListener implements 
ProcessorLifecycleListener {

http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
 
b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
index 44fd811..15a9205 100644
--- 
a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
@@ -20,9 +20,7 @@ package org.apache.samza.standalone;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.samza.checkpoint.CheckpointManager;
-import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.container.grouper.task.GrouperMetadata;
@@ -34,7 +32,7 @@ import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.runtime.LocationId;
 import org.apache.samza.runtime.LocationIdProvider;
 import org.apache.samza.runtime.LocationIdProviderFactory;
-import org.apache.samza.runtime.ProcessorIdGenerator;
+import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.storage.ChangelogStreamManager;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmins;
@@ -73,8 +71,8 @@ public class PassthroughJobCoordinator implements 
JobCoordinator {
   private final LocationId locationId;
   private JobCoordinatorListener coordinatorListener = null;
 
-  public PassthroughJobCoordinator(Config config) {
-    this.processorId = createProcessorId(config);
+  public PassthroughJobCoordinator(String processorId, Config config, 
MetricsRegistry metricsRegistry) {
+    this.processorId = processorId;
     this.config = config;
     LocationIdProviderFactory locationIdProviderFactory = Util.getObj(new 
JobConfig(config).getLocationIdProviderFactory(), 
LocationIdProviderFactory.class);
     LocationIdProvider locationIdProvider = 
locationIdProviderFactory.getLocationIdProvider(config);
@@ -105,6 +103,7 @@ public class PassthroughJobCoordinator implements 
JobCoordinator {
         coordinatorListener.onNewJobModel(processorId, jobModel);
       }
     } else {
+      LOGGER.info("JobModel: {} does not contain processorId: {}. Stopping the 
JobCoordinator", jobModel, processorId);
       stop();
     }
   }
@@ -141,20 +140,4 @@ public class PassthroughJobCoordinator implements 
JobCoordinator {
   public String getProcessorId() {
     return this.processorId;
   }
-
-  private String createProcessorId(Config config) {
-    // TODO: This check to be removed after 0.13+
-    ApplicationConfig appConfig = new ApplicationConfig(config);
-    if (appConfig.getProcessorId() != null) {
-      return appConfig.getProcessorId();
-    } else if (appConfig.getAppProcessorIdGeneratorClass() != null) {
-      ProcessorIdGenerator idGenerator =
-          Util.getObj(appConfig.getAppProcessorIdGeneratorClass(), 
ProcessorIdGenerator.class);
-      return idGenerator.generateProcessorId(config);
-    } else {
-      throw new ConfigException(String
-          .format("Expected either %s or %s to be configured", 
ApplicationConfig.PROCESSOR_ID,
-              ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java
 
b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java
index 2ba56a6..5d5fecf 100644
--- 
a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java
+++ 
b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java
@@ -21,10 +21,11 @@ package org.apache.samza.standalone;
 import org.apache.samza.config.Config;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.metrics.MetricsRegistry;
 
 public class PassthroughJobCoordinatorFactory implements JobCoordinatorFactory 
{
   @Override
-  public JobCoordinator getJobCoordinator(Config config) {
-    return new PassthroughJobCoordinator(config);
+  public JobCoordinator getJobCoordinator(String processorId, Config config, 
MetricsRegistry metricsRegistry) {
+    return new PassthroughJobCoordinator(processorId, config, metricsRegistry);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 8c5a3ba..8371070 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -29,14 +29,10 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.I0Itec.zkclient.IZkStateListener;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.checkpoint.CheckpointManager;
-import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.ZkConfig;
@@ -52,18 +48,14 @@ import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.MetricsReporter;
-import org.apache.samza.metrics.ReadableMetricsRegistry;
 import org.apache.samza.runtime.LocationId;
 import org.apache.samza.runtime.LocationIdProvider;
 import org.apache.samza.runtime.LocationIdProviderFactory;
-import org.apache.samza.runtime.ProcessorIdGenerator;
 import org.apache.samza.storage.ChangelogStreamManager;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.MetricsReporterLoader;
 import org.apache.samza.util.SystemClock;
 import org.apache.samza.util.Util;
 import org.apache.samza.zk.ZkUtils.ProcessorNode;
@@ -100,7 +92,6 @@ public class ZkJobCoordinator implements JobCoordinator {
   private final Config config;
   private final ZkBarrierForVersionUpgrade barrier;
   private final ZkJobCoordinatorMetrics metrics;
-  private final Map<String, MetricsReporter> reporters;
   private final ZkLeaderElector leaderElector;
   private final AtomicBoolean initiatedShutdown = new AtomicBoolean(false);
   private final StreamMetadataCache streamMetadataCache;
@@ -120,11 +111,11 @@ public class ZkJobCoordinator implements JobCoordinator {
   @VisibleForTesting
   StreamPartitionCountMonitor streamPartitionCountMonitor = null;
 
-  ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils 
zkUtils) {
+  ZkJobCoordinator(String processorId, Config config, MetricsRegistry 
metricsRegistry, ZkUtils zkUtils) {
     this.config = config;
     this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
 
-    this.processorId = createProcessorId(config);
+    this.processorId = processorId;
     this.zkUtils = zkUtils;
     // setup a listener for a session state change
     // we are mostly interested in "session closed" and "new session created" 
events
@@ -132,7 +123,6 @@ public class ZkJobCoordinator implements JobCoordinator {
     leaderElector = new ZkLeaderElector(processorId, zkUtils);
     leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
     this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
-    this.reporters = MetricsReporterLoader.getMetricsReporters(new 
MetricsConfig(config), processorId);
     debounceTimer = new ScheduleAfterDebounceTime(processorId);
     debounceTimer.setScheduledTaskCallback(throwable -> {
         LOG.error("Received exception in debounce timer! Stopping the job 
coordinator", throwable);
@@ -152,7 +142,6 @@ public class ZkJobCoordinator implements JobCoordinator {
     zkUtils.validateZkVersion();
     zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), 
keyBuilder.getJobModelVersionPath(), keyBuilder.getJobModelPathPrefix(), 
keyBuilder.getTaskLocalityPath()});
 
-    startMetrics();
     systemAdmins.start();
     leaderElector.tryBecomeLeader();
     zkUtils.subscribeToJobModelVersionChange(new 
ZkJobModelVersionChangeHandler(zkUtils));
@@ -192,9 +181,6 @@ public class ZkJobCoordinator implements JobCoordinator {
         LOG.debug("Shutting down system admins.");
         systemAdmins.stop();
 
-        LOG.debug("Shutting down metrics.");
-        shutdownMetrics();
-
         if (streamPartitionCountMonitor != null) {
           streamPartitionCountMonitor.stop();
         }
@@ -217,19 +203,6 @@ public class ZkJobCoordinator implements JobCoordinator {
     }
   }
 
-  private void startMetrics() {
-    for (MetricsReporter reporter: reporters.values()) {
-      reporter.register("job-coordinator-" + processorId, 
(ReadableMetricsRegistry) metrics.getMetricsRegistry());
-      reporter.start();
-    }
-  }
-
-  private void shutdownMetrics() {
-    for (MetricsReporter reporter: reporters.values()) {
-      reporter.stop();
-    }
-  }
-
   @Override
   public void setListener(JobCoordinatorListener listener) {
     this.coordinatorListener = listener;
@@ -307,22 +280,6 @@ public class ZkJobCoordinator implements JobCoordinator {
     debounceTimer.scheduleAfterDebounceTime(ON_ZK_CLEANUP, 0, () -> 
zkUtils.cleanupZK(NUM_VERSIONS_TO_LEAVE));
   }
 
-  private String createProcessorId(Config config) {
-    // TODO: This check to be removed after 0.13+
-    ApplicationConfig appConfig = new ApplicationConfig(config);
-    if (appConfig.getProcessorId() != null) {
-      return appConfig.getProcessorId();
-    } else if 
(StringUtils.isNotBlank(appConfig.getAppProcessorIdGeneratorClass())) {
-      ProcessorIdGenerator idGenerator =
-          Util.getObj(appConfig.getAppProcessorIdGeneratorClass(), 
ProcessorIdGenerator.class);
-      return idGenerator.generateProcessorId(config);
-    } else {
-      throw new ConfigException(String
-          .format("Expected either %s or %s to be configured", 
ApplicationConfig.PROCESSOR_ID,
-              ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
-    }
-  }
-
   /**
    * Generate new JobModel when becoming a leader or the list of processor 
changed.
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index 41294a3..fdc2b0d 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -27,7 +27,6 @@ import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
 import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.MetricsRegistryMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,13 +43,12 @@ public class ZkJobCoordinatorFactory implements 
JobCoordinatorFactory {
    * @return An instance of {@link ZkJobCoordinator}
    */
   @Override
-  public JobCoordinator getJobCoordinator(Config config) {
+  public JobCoordinator getJobCoordinator(String processorId, Config config, 
MetricsRegistry metricsRegistry) {
     // TODO: Separate JC related configs into a "ZkJobCoordinatorConfig"
-    MetricsRegistry metricsRegistry = new MetricsRegistryMap();
     String jobCoordinatorZkBasePath = getJobCoordinationZkPath(config);
     ZkUtils zkUtils = getZkUtils(config, metricsRegistry, 
jobCoordinatorZkBasePath);
     LOG.debug("Creating ZkJobCoordinator with config: {}.", config);
-    return new ZkJobCoordinator(config, metricsRegistry, zkUtils);
+    return new ZkJobCoordinator(processorId, config, metricsRegistry, zkUtils);
   }
 
   private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry, 
String coordinatorZkBasePath) {

http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java 
b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
index 1c33b96..d977504 100644
--- 
a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
+++ 
b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
@@ -93,7 +93,7 @@ public class TestStreamProcessor {
         ProcessorLifecycleListener processorListener,
         JobCoordinator jobCoordinator,
         SamzaContainer container) {
-      super(config, customMetricsReporters, streamTaskFactory, 
processorListener, jobCoordinator);
+      super("TEST_PROCESSOR_ID", config,  customMetricsReporters, 
streamTaskFactory, processorListener, jobCoordinator);
       this.container = container;
     }
 
@@ -325,7 +325,7 @@ public class TestStreamProcessor {
     JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
     Mockito.doNothing().when(mockJobCoordinator).start();
     ProcessorLifecycleListener lifecycleListener = 
Mockito.mock(ProcessorLifecycleListener.class);
-    StreamProcessor streamProcessor = new StreamProcessor(new MapConfig(), new 
HashMap<>(), null, lifecycleListener, mockJobCoordinator);
+    StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", 
new MapConfig(), new HashMap<>(), null, lifecycleListener, mockJobCoordinator);
     assertEquals(State.NEW, streamProcessor.getState());
     streamProcessor.start();
 
@@ -344,7 +344,7 @@ public class TestStreamProcessor {
     ProcessorLifecycleListener lifecycleListener = 
Mockito.mock(ProcessorLifecycleListener.class);
     SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
     MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
-    StreamProcessor streamProcessor = new StreamProcessor(config, new 
HashMap<>(), null, lifecycleListener, mockJobCoordinator);
+    StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", 
config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator);
 
     /**
      * Without a SamzaContainer running in StreamProcessor and current 
StreamProcessor state is STARTED,
@@ -412,7 +412,7 @@ public class TestStreamProcessor {
     ProcessorLifecycleListener lifecycleListener = 
Mockito.mock(ProcessorLifecycleListener.class);
     SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
     MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
-    StreamProcessor streamProcessor = PowerMockito.spy(new 
StreamProcessor(config, new HashMap<>(), null, lifecycleListener, 
mockJobCoordinator));
+    StreamProcessor streamProcessor = PowerMockito.spy(new 
StreamProcessor("TestProcessorId", config, new HashMap<>(), null, 
lifecycleListener, mockJobCoordinator));
 
     Mockito.doNothing().when(mockJobCoordinator).stop();
     Mockito.doNothing().when(mockSamzaContainer).shutdown();
@@ -434,7 +434,7 @@ public class TestStreamProcessor {
     ProcessorLifecycleListener lifecycleListener = 
Mockito.mock(ProcessorLifecycleListener.class);
     SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
     MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
-    StreamProcessor streamProcessor = new StreamProcessor(config, new 
HashMap<>(), null, lifecycleListener, mockJobCoordinator);
+    StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", 
config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator);
 
     Exception failureException = new Exception("dummy exception");
 
@@ -455,7 +455,7 @@ public class TestStreamProcessor {
     JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
     ProcessorLifecycleListener lifecycleListener = 
Mockito.mock(ProcessorLifecycleListener.class);
     MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
-    StreamProcessor streamProcessor = new StreamProcessor(config, new 
HashMap<>(), null, lifecycleListener, mockJobCoordinator);
+    StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", 
config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator);
 
     streamProcessor.state = State.RUNNING;
     streamProcessor.jobCoordinatorListener.onCoordinatorStop();
@@ -468,7 +468,7 @@ public class TestStreamProcessor {
   public void testStreamProcessorWithStreamProcessorListenerFactory() {
     AtomicReference<MockStreamProcessorLifecycleListener> mockListener = new 
AtomicReference<>();
     StreamProcessor streamProcessor =
-        new StreamProcessor(mock(Config.class), new HashMap<>(), 
mock(TaskFactory.class), Optional.empty(),
+        new StreamProcessor("TestProcessorId", mock(Config.class), new 
HashMap<>(), mock(TaskFactory.class), Optional.empty(),
             Optional.empty(), Optional.empty(),
             sp -> mockListener.updateAndGet(old -> new 
MockStreamProcessorLifecycleListener(sp)),
             mock(JobCoordinator.class));

http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/test/java/org/apache/samza/runtime/MockProcessorIdGenerator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/runtime/MockProcessorIdGenerator.java
 
b/samza-core/src/test/java/org/apache/samza/runtime/MockProcessorIdGenerator.java
new file mode 100644
index 0000000..a55a2ab
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/runtime/MockProcessorIdGenerator.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.runtime;
+
+import org.apache.samza.config.Config;
+
+public class MockProcessorIdGenerator implements ProcessorIdGenerator {
+  @Override
+  public String generateProcessorId(Config config) {
+    return "testProcessorId";
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
 
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 5bd7893..c691500 100644
--- 
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.runtime;
 
+import com.google.common.collect.ImmutableMap;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
@@ -32,6 +33,7 @@ import 
org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.context.ExternalContext;
@@ -42,6 +44,7 @@ import org.apache.samza.task.IdentityStreamTask;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -221,6 +224,29 @@ public class TestLocalApplicationRunner {
     assertFalse("Application finished before the timeout.", finished);
   }
 
+  @Test
+  public void 
testCreateProcessorIdShouldReturnProcessorIdDefinedInConfiguration() {
+    String processorId = "testProcessorId";
+    MapConfig configMap = new 
MapConfig(ImmutableMap.of(ApplicationConfig.PROCESSOR_ID, processorId));
+    String actualProcessorId = LocalApplicationRunner.createProcessorId(new 
ApplicationConfig(configMap));
+    assertEquals(processorId, actualProcessorId);
+  }
+
+  @Test
+  public void 
testCreateProcessorIdShouldInvokeProcessorIdGeneratorDefinedInConfiguration() {
+    String processorId = "testProcessorId";
+    MapConfig configMap = new 
MapConfig(ImmutableMap.of(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, 
MockProcessorIdGenerator.class.getCanonicalName()));
+    String actualProcessorId = LocalApplicationRunner.createProcessorId(new 
ApplicationConfig(configMap));
+    assertEquals(processorId, actualProcessorId);
+  }
+
+  @Test(expected = ConfigException.class)
+  public void 
testCreateProcessorIdShouldThrowExceptionWhenProcessorIdAndGeneratorAreNotDefined()
 {
+    ApplicationConfig mockConfig = Mockito.mock(ApplicationConfig.class);
+    Mockito.when(mockConfig.getProcessorId()).thenReturn(null);
+    LocalApplicationRunner.createProcessorId(mockConfig);
+  }
+
   private void prepareTest() {
     ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc =
         ApplicationDescriptorUtil.getAppDescriptor(mockApp, config);

http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index 083caad..e0a0941 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -53,7 +53,7 @@ public class TestZkJobCoordinator {
     when(zkUtils.getZkClient()).thenReturn(mockZkClient);
     when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new 
JobModel(new MapConfig(), new HashMap<>()));
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new 
MapConfig(), new NoOpMetricsRegistry(), zkUtils));
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new 
NoOpMetricsRegistry(), zkUtils));
     doAnswer(new Answer<Void>() {
       public Void answer(InvocationOnMock invocation) {
         jcShutdownLatch.countDown();
@@ -80,7 +80,7 @@ public class TestZkJobCoordinator {
 
     ScheduleAfterDebounceTime mockDebounceTimer = 
Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new 
MapConfig(), new NoOpMetricsRegistry(), zkUtils));
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new 
NoOpMetricsRegistry(), zkUtils));
     zkJobCoordinator.debounceTimer = mockDebounceTimer;
     final ZkSessionStateChangedListener zkSessionStateChangedListener = 
zkJobCoordinator.new ZkSessionStateChangedListener();
 
@@ -104,7 +104,7 @@ public class TestZkJobCoordinator {
 
     ScheduleAfterDebounceTime mockDebounceTimer = 
Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new 
MapConfig(), new NoOpMetricsRegistry(), zkUtils));
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new 
NoOpMetricsRegistry(), zkUtils));
     StreamPartitionCountMonitor monitor = 
Mockito.mock(StreamPartitionCountMonitor.class);
     zkJobCoordinator.debounceTimer = mockDebounceTimer;
     zkJobCoordinator.streamPartitionCountMonitor = monitor;
@@ -127,7 +127,7 @@ public class TestZkJobCoordinator {
 
     ScheduleAfterDebounceTime mockDebounceTimer = 
Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new 
MapConfig(), new NoOpMetricsRegistry(), zkUtils));
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new 
NoOpMetricsRegistry(), zkUtils));
 
     StreamPartitionCountMonitor monitor = 
Mockito.mock(StreamPartitionCountMonitor.class);
     zkJobCoordinator.debounceTimer = mockDebounceTimer;
@@ -154,7 +154,7 @@ public class TestZkJobCoordinator {
 
     ScheduleAfterDebounceTime mockDebounceTimer = 
Mockito.mock(ScheduleAfterDebounceTime.class);
 
-    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new 
MapConfig(), new NoOpMetricsRegistry(), zkUtils));
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new 
ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new 
NoOpMetricsRegistry(), zkUtils));
 
     StreamPartitionCountMonitor monitor = 
Mockito.mock(StreamPartitionCountMonitor.class);
     zkJobCoordinator.debounceTimer = mockDebounceTimer;

http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
 
b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
index 7bd99bb..3760edb 100644
--- 
a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
+++ 
b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
@@ -47,6 +47,7 @@ import org.apache.samza.config.ZkConfig;
 import org.apache.samza.context.Context;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.runtime.ProcessorLifecycleListener;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -132,7 +133,7 @@ public class TestZkStreamProcessorBase extends 
StandaloneIntegrationTestHarness
     map.put(ApplicationConfig.PROCESSOR_ID, pId);
     Config config = new MapConfig(map);
     String jobCoordinatorFactoryClassName = new 
JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName();
-    JobCoordinator jobCoordinator = 
Util.getObj(jobCoordinatorFactoryClassName, 
JobCoordinatorFactory.class).getJobCoordinator(config);
+    JobCoordinator jobCoordinator = 
Util.getObj(jobCoordinatorFactoryClassName, 
JobCoordinatorFactory.class).getJobCoordinator(pId, config, new 
MetricsRegistryMap());
 
     ProcessorLifecycleListener listener = new ProcessorLifecycleListener() {
       @Override
@@ -165,7 +166,7 @@ public class TestZkStreamProcessorBase extends 
StandaloneIntegrationTestHarness
     };
 
     StreamProcessor processor =
-        new StreamProcessor(config, new HashMap<>(), (StreamTaskFactory) 
TestStreamTask::new, listener, jobCoordinator);
+        new StreamProcessor(pId, config, new HashMap<>(), (StreamTaskFactory) 
TestStreamTask::new, listener, jobCoordinator);
 
     return processor;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/df56f2dc/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
index e7040ca..cf41148 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
@@ -59,6 +59,9 @@ import static org.mockito.Mockito.mock;
 
 
 public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
+
+  public static final String PROCESSOR_ID = "1";
+
   /**
    * Testing a basic identity stream task - reads data from a topic and writes 
it to another topic
    * (without any modifications)
@@ -136,7 +139,7 @@ public class TestStreamProcessor extends 
StandaloneIntegrationTestHarness {
     final String outputTopic = "output4";
     final int messageCount = 20;
 
-    final Map<String, String> configMap = createConfigs("1", testSystem, 
inputTopic, outputTopic, messageCount);
+    final Map<String, String> configMap = createConfigs(PROCESSOR_ID, 
testSystem, inputTopic, outputTopic, messageCount);
     configMap.remove("task.class");
     final Config configs = new MapConfig(configMap);
     final TestStubs stubs = new TestStubs(configs, (StreamTaskFactory) null, 
bootstrapServers());
@@ -243,12 +246,12 @@ public class TestStreamProcessor extends 
StandaloneIntegrationTestHarness {
 
     TestStubs(Config config, StreamTaskFactory taskFactory, String 
bootstrapServer) {
       this(bootstrapServer);
-      processor = new StreamProcessor(config, new HashMap<>(), taskFactory, 
listener);
+      processor = new StreamProcessor("1", config, new HashMap<>(), 
taskFactory, listener);
     }
 
     TestStubs(Config config, AsyncStreamTaskFactory taskFactory, String 
bootstrapServer) {
       this(bootstrapServer);
-      processor = new StreamProcessor(config, new HashMap<>(), taskFactory, 
listener);
+      processor = new StreamProcessor("1", config, new HashMap<>(), 
taskFactory, listener);
     }
 
     private void initConsumer(String bootstrapServer) {

Reply via email to