AMBARI-22607 : Design and implement an AD job scheduler. (avijayan)

Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2129b396
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2129b396
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2129b396

Branch: refs/heads/branch-feature-AMBARI-21105
Commit: 2129b3963b4e0690d9231f3c59ca034d6de9a9f3
Parents: b1f4e87
Author: Aravindan Vijayan <avija...@hortonworks.com>
Authored: Fri Dec 8 13:29:03 2017 -0800
Committer: Aravindan Vijayan <avija...@hortonworks.com>
Committed: Fri Dec 8 13:29:03 2017 -0800

----------------------------------------------------------------------
 .../conf/unix/config.yaml                       |  14 +-
 .../pom.xml                                     |  18 +-
 .../prototype/core/MetricSparkConsumer.java     | 169 +++++++------
 .../adservice/app/AnomalyDetectionApp.scala     |   6 +-
 .../app/AnomalyDetectionAppConfig.scala         |  11 +-
 .../app/AnomalyDetectionAppModule.scala         |   7 +-
 .../DetectionServiceConfiguration.scala         |  59 +++++
 .../configuration/SparkConfiguration.scala      |  25 +-
 .../db/PhoenixAnomalyStoreAccessor.scala        |   3 +-
 .../adservice/detection/AdJobManager.scala      |  88 +++++++
 .../detection/SparkApplicationRunner.scala      |  77 ++++++
 .../metrics/adservice/detection/Subsystem.scala |  27 ++
 .../detection/pointintime/EmaSparkDriver.scala  | 121 +++++++++
 .../pointintime/PointInTimeSubsystem.scala      | 116 +++++++++
 .../metadata/MetricDefinitionService.scala      |  78 ------
 .../metadata/MetricDefinitionServiceImpl.scala  | 242 ------------------
 .../metadata/MetricSourceDefinition.scala       |  20 --
 .../adservice/resource/DetectionResource.scala  |  40 +++
 .../resource/MetricDefinitionResource.scala     |   3 +-
 .../adservice/service/DetectionService.scala    |  26 ++
 .../service/DetectionServiceImpl.scala          |  56 +++++
 .../service/MetricDefinitionService.scala       |  84 +++++++
 .../service/MetricDefinitionServiceImpl.scala   | 252 +++++++++++++++++++
 .../src/test/resources/config.yaml              |  12 +-
 .../app/AnomalyDetectionAppConfigTest.scala     |   5 +-
 .../metadata/MetricDefinitionServiceTest.scala  |   1 +
 .../configuration/ams-admanager-config.xml      |  12 +-
 .../0.1.0/package/scripts/params.py             |   4 +-
 28 files changed, 1124 insertions(+), 452 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/conf/unix/config.yaml
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/conf/unix/config.yaml 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/conf/unix/config.yaml
index 85e4004..c9cbc04 100644
--- 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/conf/unix/config.yaml
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/conf/unix/config.yaml
@@ -38,8 +38,18 @@ metricDefinitionDB:
   # raise an error as soon as it detects an internal corruption
   performParanoidChecks: false
   # Path to Level DB directory
-  dbDirPath: /tmp/ambari-metrics-anomaly-detection/db
+  dbDirPath: /var/lib/ambari-metrics-anomaly-detection/
 
 spark:
   mode: standalone
-  masterHostPort: localhost:7077
\ No newline at end of file
+  master: spark://localhost:7077
+  sparkHome: /usr/lib/ambari-metrics-anomaly-detection/spark
+  jarfile: 
/usr/lib/ambari-metrics-anomaly-detection/ambari-metrics-anomaly-detection-service.jar
+
+detection:
+  trend: false
+  kafkaServers: localhost:6667
+  kafkaTopic: ambari-metrics-ad
+  kafkaConsumerGroup: ambari-metrics-ad-group
+  ema-w: 0.9
+  ema-n: 3

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
index 50d7ef6..148921e 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
@@ -240,6 +240,11 @@
       <version>${spark.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-launcher_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
       <version>0.10.1.0</version>
@@ -289,8 +294,8 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-kafka_2.10</artifactId>
-      <version>1.6.3</version>
+      
<artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.phoenix</groupId>
@@ -338,7 +343,7 @@
       <exclusions>
         <exclusion>
           <groupId>com.fasterxml.jackson.module</groupId>
-          <artifactId>jackson-module-scala_2.11</artifactId>
+          <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
@@ -524,5 +529,12 @@
       <version>1.8.4</version>
       <scope>test</scope>
     </dependency>
+    <!-- https://mvnrepository.com/artifact/org.mockito/mockito-all -->
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.9.5</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java
index addeda7..3644379 100644
--- 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java
@@ -32,7 +32,6 @@ import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka.KafkaUtils;
 import scala.Tuple2;
 
 import java.io.FileInputStream;
@@ -152,90 +151,90 @@ public class MetricSparkConsumer {
     Broadcast<Set<Pattern>> includePatternBroadcast = 
jssc.sparkContext().broadcast(includeMetricPatterns);
     Broadcast<Set<String>> includedHostBroadcast = 
jssc.sparkContext().broadcast(includedHosts);
 
-    JavaPairReceiverInputDStream<String, String> messages =
-      KafkaUtils.createStream(jssc, zkQuorum, groupId, 
Collections.singletonMap(topicName, numThreads));
-
-    //Convert JSON string to TimelineMetrics.
-    JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new 
Function<Tuple2<String, String>, TimelineMetrics>() {
-      @Override
-      public TimelineMetrics call(Tuple2<String, String> message) throws 
Exception {
-        ObjectMapper mapper = new ObjectMapper();
-        TimelineMetrics metrics = mapper.readValue(message._2, 
TimelineMetrics.class);
-        return metrics;
-      }
-    });
-
-    timelineMetricsStream.print();
-
-    //Group TimelineMetric by AppId.
-    JavaPairDStream<String, TimelineMetrics> appMetricStream = 
timelineMetricsStream.mapToPair(
-      timelineMetrics -> timelineMetrics.getMetrics().isEmpty()  ?  new 
Tuple2<>("TEST", new TimelineMetrics()) : new Tuple2<String, 
TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(), 
timelineMetrics)
-    );
-
-    appMetricStream.print();
-
-    //Filter AppIds that are not needed.
-    JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = 
appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() 
{
-      @Override
-      public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) 
throws Exception {
-        return appIds.contains(appMetricTuple._1);
-      }
-    });
-
-    filteredAppMetricStream.print();
-
-    filteredAppMetricStream.foreachRDD(rdd -> {
-      rdd.foreach(
-        tuple2 -> {
-          long currentTime = System.currentTimeMillis();
-          EmaTechnique ema = emaTechniqueBroadcast.getValue();
-          if (currentTime > pitStartTime + pitTestInterval) {
-            LOG.info("Running Tukeys....");
-            pointInTimeADSystemBroadcast.getValue().runTukeysAndRefineEma(ema, 
currentTime);
-            pitStartTime = pitStartTime + pitTestInterval;
-          }
-
-          if (currentTime > ksStartTime + ksTestInterval) {
-            LOG.info("Running KS Test....");
-            trendADSystemBroadcast.getValue().runKSTest(currentTime, 
trendMetrics);
-            ksStartTime = ksStartTime + ksTestInterval;
-          }
-
-          if (currentTime > hdevStartTime + hsdevInterval) {
-            LOG.info("Running HSdev Test....");
-            trendADSystemBroadcast.getValue().runHsdevMethod();
-            hdevStartTime = hdevStartTime + hsdevInterval;
-          }
-
-          TimelineMetrics metrics = tuple2._2();
-          for (TimelineMetric timelineMetric : metrics.getMetrics()) {
-
-            boolean includeHost = 
includedHostBroadcast.getValue().contains(timelineMetric.getHostName());
-            boolean includeMetric = false;
-            if (includeHost) {
-              if (includePatternBroadcast.getValue().isEmpty()) {
-                includeMetric = true;
-              }
-              for (Pattern p : includePatternBroadcast.getValue()) {
-                Matcher m = p.matcher(timelineMetric.getMetricName());
-                if (m.find()) {
-                  includeMetric = true;
-                }
-              }
-            }
-
-            if (includeMetric) {
-              trendMetrics.add(new TrendMetric(timelineMetric.getMetricName(), 
timelineMetric.getAppId(),
-                timelineMetric.getHostName()));
-              List<MetricAnomaly> anomalies = ema.test(timelineMetric);
-              metricsCollectorInterfaceBroadcast.getValue().publish(anomalies);
-            }
-          }
-        });
-    });
-
-    jssc.start();
-    jssc.awaitTermination();
+//    JavaPairReceiverInputDStream<String, String> messages =
+//      KafkaUtils.createStream(jssc, zkQuorum, groupId, 
Collections.singletonMap(topicName, numThreads));
+//
+//    //Convert JSON string to TimelineMetrics.
+//    JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new 
Function<Tuple2<String, String>, TimelineMetrics>() {
+//      @Override
+//      public TimelineMetrics call(Tuple2<String, String> message) throws 
Exception {
+//        ObjectMapper mapper = new ObjectMapper();
+//        TimelineMetrics metrics = mapper.readValue(message._2, 
TimelineMetrics.class);
+//        return metrics;
+//      }
+//    });
+//
+//    timelineMetricsStream.print();
+//
+//    //Group TimelineMetric by AppId.
+//    JavaPairDStream<String, TimelineMetrics> appMetricStream = 
timelineMetricsStream.mapToPair(
+//      timelineMetrics -> timelineMetrics.getMetrics().isEmpty()  ?  new 
Tuple2<>("TEST", new TimelineMetrics()) : new Tuple2<String, 
TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(), 
timelineMetrics)
+//    );
+//
+//    appMetricStream.print();
+//
+//    //Filter AppIds that are not needed.
+//    JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = 
appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() 
{
+//      @Override
+//      public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) 
throws Exception {
+//        return appIds.contains(appMetricTuple._1);
+//      }
+//    });
+//
+//    filteredAppMetricStream.print();
+//
+//    filteredAppMetricStream.foreachRDD(rdd -> {
+//      rdd.foreach(
+//        tuple2 -> {
+//          long currentTime = System.currentTimeMillis();
+//          EmaTechnique ema = emaTechniqueBroadcast.getValue();
+//          if (currentTime > pitStartTime + pitTestInterval) {
+//            LOG.info("Running Tukeys....");
+//            
pointInTimeADSystemBroadcast.getValue().runTukeysAndRefineEma(ema, currentTime);
+//            pitStartTime = pitStartTime + pitTestInterval;
+//          }
+//
+//          if (currentTime > ksStartTime + ksTestInterval) {
+//            LOG.info("Running KS Test....");
+//            trendADSystemBroadcast.getValue().runKSTest(currentTime, 
trendMetrics);
+//            ksStartTime = ksStartTime + ksTestInterval;
+//          }
+//
+//          if (currentTime > hdevStartTime + hsdevInterval) {
+//            LOG.info("Running HSdev Test....");
+//            trendADSystemBroadcast.getValue().runHsdevMethod();
+//            hdevStartTime = hdevStartTime + hsdevInterval;
+//          }
+//
+//          TimelineMetrics metrics = tuple2._2();
+//          for (TimelineMetric timelineMetric : metrics.getMetrics()) {
+//
+//            boolean includeHost = 
includedHostBroadcast.getValue().contains(timelineMetric.getHostName());
+//            boolean includeMetric = false;
+//            if (includeHost) {
+//              if (includePatternBroadcast.getValue().isEmpty()) {
+//                includeMetric = true;
+//              }
+//              for (Pattern p : includePatternBroadcast.getValue()) {
+//                Matcher m = p.matcher(timelineMetric.getMetricName());
+//                if (m.find()) {
+//                  includeMetric = true;
+//                }
+//              }
+//            }
+//
+//            if (includeMetric) {
+//              trendMetrics.add(new 
TrendMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(),
+//                timelineMetric.getHostName()));
+//              List<MetricAnomaly> anomalies = ema.test(timelineMetric);
+//              
metricsCollectorInterfaceBroadcast.getValue().publish(anomalies);
+//            }
+//          }
+//        });
+//    });
+//
+//    jssc.start();
+//    jssc.awaitTermination();
   }
 }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala
index 2d0dbdf..aae216a 100644
--- 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala
@@ -22,8 +22,7 @@ import javax.ws.rs.container.{ContainerRequestFilter, 
ContainerResponseFilter}
 
 import org.apache.ambari.metrics.adservice.app.GuiceInjector.{withInjector, 
wrap}
 import org.apache.ambari.metrics.adservice.db.{AdAnomalyStoreAccessor, 
MetadataDatasource}
-import org.apache.ambari.metrics.adservice.metadata.MetricDefinitionService
-import org.apache.ambari.metrics.adservice.service.ADQueryService
+import org.apache.ambari.metrics.adservice.service.{ADQueryService, 
MetricDefinitionService, DetectionService}
 import org.glassfish.jersey.filter.LoggingFilter
 
 import com.codahale.metrics.health.HealthCheck
@@ -53,6 +52,9 @@ class AnomalyDetectionApp extends 
Application[AnomalyDetectionAppConfig] {
       injector.getInstance(classOf[MetadataDatasource]).initialize
       injector.getInstance(classOf[MetricDefinitionService]).initialize
       injector.getInstance(classOf[ADQueryService]).initialize
+      injector.getInstance(classOf[DetectionService]).initialize
+
+      env.lifecycle().manage(injector.getInstance(classOf[DetectionService]))
     }
     env.jersey.register(jacksonJaxbJsonProvider)
     env.jersey.register(new LoggingFilter)

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
index 58efa97..fc6293d 100644
--- 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
@@ -20,7 +20,7 @@ package org.apache.ambari.metrics.adservice.app
 
 import javax.validation.Valid
 
-import org.apache.ambari.metrics.adservice.configuration.{HBaseConfiguration, 
_}
+import 
org.apache.ambari.metrics.adservice.configuration.{DetectionServiceConfiguration,
 HBaseConfiguration, _}
 
 import com.fasterxml.jackson.annotation.{JsonIgnore, JsonIgnoreProperties, 
JsonProperty}
 
@@ -59,6 +59,12 @@ class AnomalyDetectionAppConfig extends Configuration {
   @Valid
   private val sparkConfiguration = new SparkConfiguration
 
+  /**
+    * Detection Service configurations
+    */
+  @Valid
+  private val detectionServiceConfiguration = new DetectionServiceConfiguration
+
   /*
    AMS HBase Conf
     */
@@ -86,4 +92,7 @@ class AnomalyDetectionAppConfig extends Configuration {
   @JsonProperty("spark")
   def getSparkConfiguration: SparkConfiguration = sparkConfiguration
 
+  @JsonProperty("detection")
+  def getDetectionServiceConfiguration: DetectionServiceConfiguration = 
detectionServiceConfiguration
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
index 68e9df9..0dbc557 100644
--- 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
@@ -19,9 +19,8 @@ package org.apache.ambari.metrics.adservice.app
 
 import org.apache.ambari.metrics.adservice.db._
 import org.apache.ambari.metrics.adservice.leveldb.LevelDBDataSource
-import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, 
MetricDefinitionServiceImpl}
-import org.apache.ambari.metrics.adservice.resource.{AnomalyResource, 
MetricDefinitionResource, RootResource}
-import org.apache.ambari.metrics.adservice.service.{ADQueryService, 
ADQueryServiceImpl}
+import org.apache.ambari.metrics.adservice.resource.{AnomalyResource, 
DetectionResource, MetricDefinitionResource, RootResource}
+import org.apache.ambari.metrics.adservice.service._
 
 import com.codahale.metrics.health.HealthCheck
 import com.google.inject.AbstractModule
@@ -37,11 +36,13 @@ class AnomalyDetectionAppModule(config: 
AnomalyDetectionAppConfig, env: Environm
     healthCheckBinder.addBinding().to(classOf[DefaultHealthCheck])
     bind(classOf[AnomalyResource])
     bind(classOf[MetricDefinitionResource])
+    bind(classOf[DetectionResource])
     bind(classOf[RootResource])
     
bind(classOf[AdMetadataStoreAccessor]).to(classOf[AdMetadataStoreAccessorImpl])
     bind(classOf[ADQueryService]).to(classOf[ADQueryServiceImpl])
     
bind(classOf[MetricDefinitionService]).to(classOf[MetricDefinitionServiceImpl])
     bind(classOf[MetadataDatasource]).to(classOf[LevelDBDataSource])
     
bind(classOf[AdAnomalyStoreAccessor]).to(classOf[PhoenixAnomalyStoreAccessor])
+    bind(classOf[DetectionService]).to(classOf[DetectionServiceImpl])
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/DetectionServiceConfiguration.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/DetectionServiceConfiguration.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/DetectionServiceConfiguration.scala
new file mode 100644
index 0000000..d63affd
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/DetectionServiceConfiguration.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.configuration
+
+import com.fasterxml.jackson.annotation.JsonProperty
+
+class DetectionServiceConfiguration {
+
+  @JsonProperty("pointInTime")
+  private val pointInTime: Boolean = true
+
+  @JsonProperty("trend")
+  private val trend: Boolean = true
+
+  @JsonProperty("kafkaServers")
+  private val kafkaServers: String = null
+
+  @JsonProperty("kafkaTopic")
+  private val kafkaTopic: String = "ambari-metrics-ad"
+
+  @JsonProperty("kafkaConsumerGroup")
+  private val kafkaConsumerGroup: String = "ambari-metrics-ad-group"
+
+  @JsonProperty("ema-w")
+  private val emaW: String = "0.9"
+
+  @JsonProperty("ema-n")
+  private val emaN: String = "3"
+
+  def isPointInTimeSubsystemEnabled: Boolean = pointInTime
+
+  def isTrendSubsystemEnabled: Boolean = trend
+
+  def getKafkaServers : String = kafkaServers
+
+  def getKafkaTopic : String = kafkaTopic
+
+  def getKafkaConsumerGroup : String = kafkaConsumerGroup
+
+  def getEmaW : String = emaW
+
+  def getEmaN : String = emaN
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/SparkConfiguration.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/SparkConfiguration.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/SparkConfiguration.scala
index 30efdc7..7dbf128 100644
--- 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/SparkConfiguration.scala
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/SparkConfiguration.scala
@@ -18,22 +18,29 @@
 
 package org.apache.ambari.metrics.adservice.configuration
 
-import javax.validation.constraints.NotNull
-
 import com.fasterxml.jackson.annotation.JsonProperty
 
 class SparkConfiguration {
 
-  @NotNull
-  private var mode: String = _
+  @JsonProperty("mode")
+  private val mode: String = "standalone"
+
+  @JsonProperty("master")
+  private val master: String = "spark://localhost:7077"
+
+  @JsonProperty("sparkHome")
+  private val sparkHome: String = 
"/usr/lib/ambari-metrics-anomaly-detection/spark"
+
+  @JsonProperty("jarfile")
+  private val jarfile: String = 
"/usr/lib/ambari-metrics-anomaly-detection/ambari-metrics-anomaly-detection-service.jar"
 
-  @NotNull
-  private var masterHostPort: String = _
 
-  @JsonProperty
   def getMode: String = mode
 
-  @JsonProperty
-  def getMasterHostPort: String = masterHostPort
+  def getMaster: String = master
+
+  def getSparkHome: String = sparkHome
+
+  def getJarFile: String = jarfile
 
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
index 53e6dee..ef456ae 100644
--- 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
@@ -22,10 +22,11 @@ import java.util.concurrent.TimeUnit.SECONDS
 
 import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
 import org.apache.ambari.metrics.adservice.configuration.HBaseConfiguration
-import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, 
MetricKey}
+import org.apache.ambari.metrics.adservice.metadata.MetricKey
 import 
org.apache.ambari.metrics.adservice.model.AnomalyDetectionMethod.AnomalyDetectionMethod
 import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
 import org.apache.ambari.metrics.adservice.model._
+import org.apache.ambari.metrics.adservice.service.MetricDefinitionService
 import org.apache.hadoop.hbase.util.RetryCounterFactory
 import org.slf4j.{Logger, LoggerFactory}
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/AdJobManager.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/AdJobManager.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/AdJobManager.scala
new file mode 100644
index 0000000..cdf6b97
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/AdJobManager.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.detection
+
+import org.slf4j.{Logger, LoggerFactory}
+import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
+import 
org.apache.ambari.metrics.adservice.detection.pointintime.PointInTimeSubsystem
+
+/**
+  * Class to start Anomaly detection jobs on spark.
+   */
+class AdJobManager{
+
+  val LOG : Logger = LoggerFactory.getLogger(classOf[AdJobManager])
+
+  var config: AnomalyDetectionAppConfig = _
+  var sparkApplicationRunner: SparkApplicationRunner = _
+
+  val configuredSubsystems: scala.collection.mutable.Map[String, Subsystem] = 
scala.collection.mutable.Map()
+  var isInitialized : Boolean = false
+
+  def this (config: AnomalyDetectionAppConfig) = {
+    this ()
+    this.config = config
+    this.sparkApplicationRunner = new 
SparkApplicationRunner(config.getSparkConfiguration)
+  }
+
+  /**
+    * Initialize subsystems
+    */
+  def initializeSubsystems() : Unit = {
+    if (config.getDetectionServiceConfiguration.isPointInTimeSubsystemEnabled) 
{
+      configuredSubsystems("pointintime") = new 
PointInTimeSubsystem(config.getDetectionServiceConfiguration, 
sparkApplicationRunner)
+    }
+  }
+
+
+  /**
+    * Start AD jobs.
+    */
+  def startAdJobs() : Unit = {
+    if (!isInitialized) {
+      initializeSubsystems()
+      isInitialized = true
+    }
+
+    for (subsystem <- configuredSubsystems.values) {
+      subsystem.start()
+    }
+  }
+
+  /**
+    * Stop AD jobs.
+    */
+  def stopAdJobs() : Unit = {
+    for (subsystem <- configuredSubsystems.values) {
+      subsystem.stop()
+    }
+  }
+
+  /**
+    * Get State of all the AD jobs.
+    * @return
+    */
+  def state() : Map[String, Map[String, String]] = {
+    val stateMap: scala.collection.mutable.Map[String, Map[String, String]] = 
scala.collection.mutable.Map()
+
+    for ((subsystemName, subsystem) <- configuredSubsystems) {
+      stateMap(subsystemName) = subsystem.state
+    }
+    stateMap.toMap
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/SparkApplicationRunner.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/SparkApplicationRunner.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/SparkApplicationRunner.scala
new file mode 100644
index 0000000..f025409
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/SparkApplicationRunner.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.detection
+
+import org.apache.ambari.metrics.adservice.configuration.SparkConfiguration
+import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
+import org.apache.spark.launcher.SparkAppHandle.State
+
+/**
+  * Class to run Spark jobs given a set of arguments
+  */
/**
  * Launches Spark applications for anomaly detection jobs and exposes a small
  * helper to query whether a launched job is still alive.
  */
class SparkApplicationRunner {

  var config: SparkConfiguration = _

  // Environment handed to spark-submit; SPARK_PRINT_LAUNCH_COMMAND=1 makes the
  // launcher echo the full submit command, which helps debugging.
  val env: java.util.HashMap[String, String] = new java.util.HashMap()
  env.put("SPARK_PRINT_LAUNCH_COMMAND", "1")

  // Extra spark arguments to pass on submission; currently none are configured.
  val sparkArgs: Map[String, String] = Map.empty

  def this(config: SparkConfiguration) = {
    this()
    this.config = config
  }

  /**
    * Submit a Spark job through SparkLauncher.
    * @param appName  name of the application
    * @param appClass fully-qualified main class of the application
    * @param appArgs  command-line arguments for the application
    * @return handle to the launched Spark job
    */
  def runSparkJob(appName: String,
                  appClass: String,
                  appArgs: Iterable[String]): SparkAppHandle = {

    val launcher: SparkLauncher = new SparkLauncher(env)
      .setAppName(appName)
      .setSparkHome(config.getSparkHome)
      .setAppResource(config.getJarFile)
      .setMainClass(appClass)
      .setMaster(config.getMaster)

    appArgs.foreach(arg => launcher.addAppArgs(arg))
    sparkArgs.foreach { case (name, value) => launcher.addSparkArg(name, value) }

    launcher.startApplication()
  }

  /**
    * Whether the job behind the given handle is in a live state
    * (CONNECTED, SUBMITTED or RUNNING).
    * @param handle handle returned by runSparkJob
    * @return true when the job is still connecting/submitted/running
    */
  def isRunning(handle: SparkAppHandle): Boolean = {
    val currentState = handle.getState
    currentState == State.CONNECTED || currentState == State.SUBMITTED || currentState == State.RUNNING
  }
}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/Subsystem.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/Subsystem.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/Subsystem.scala
new file mode 100644
index 0000000..056fc61
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/Subsystem.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.detection
+
/**
  * Contract for an anomaly-detection subsystem (e.g. point-in-time).
  * Implementations manage the lifecycle of the detection jobs they own.
  */
trait Subsystem {

  /** Start the subsystem's detection job(s). */
  def start() : Unit

  /** Stop the subsystem's detection job(s). */
  def stop(): Unit

  /** Current state of the subsystem's job(s), keyed by application name. */
  def state: Map[String, String]
}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/EmaSparkDriver.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/EmaSparkDriver.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/EmaSparkDriver.scala
new file mode 100644
index 0000000..261f839
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/EmaSparkDriver.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.detection.pointintime
+
+import java.util
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, 
LocationStrategies}
+import org.apache.spark.streaming.{Duration, StreamingContext}
+import org.slf4j.{Logger, LoggerFactory}
+
+import com.fasterxml.jackson.databind.ObjectMapper
+
//TODO Work in Progress. Will be updated in the next patch.
/**
  * EMA Spark streaming driver application.
  *
  * Subscribes to a Kafka topic carrying serialized TimelineMetrics, deserializes
  * each message and (for now) only logs the first metric name of every batch;
  * the metric filtering and EMA anomaly testing/publishing steps are still
  * commented out (work in progress).
  *
  * Positional command-line arguments:
  *   args(0) - EMA algorithm input 'w'
  *   args(1) - EMA algorithm input 'n'
  *   args(2) - Kafka bootstrap servers
  *   args(3) - Kafka topic name
  *   args(4) - Kafka consumer group name
  */
object EmaSparkDriver {

//  @Inject
//  var metricDefinitionService : MetricDefinitionService = _

  val LOG : Logger = LoggerFactory.getLogger("EmaSparkDriver")

  def main(args: Array[String]): Unit = {

    // Positional CLI arguments; see object-level doc for their meaning.
    // NOTE(review): emaW/emaN are read but not yet used — EMA model wiring is WIP.
    val emaW = args(0)
    val emaN = args(1)
    val kafkaServers = args(2)
    val kafkaTopicName = args(3)
    val kafkaGroupName = args(4)

    val sparkConf = new SparkConf().setAppName("EmaSparkDriver")


    //Instantiate Kafka stream reader with a 10-second micro-batch interval.
    val streamingContext = new StreamingContext(sparkConf, Duration(10000))

    // Consumer configuration: start at the latest offsets, no auto-commit.
    val kafkaParams: java.util.HashMap[String, Object] = new java.util.HashMap[String, Object]
    kafkaParams.put("bootstrap.servers", kafkaServers)
    kafkaParams.put("key.deserializer", classOf[StringDeserializer])
    kafkaParams.put("value.deserializer", classOf[StringDeserializer])
    kafkaParams.put("group.id", kafkaGroupName)
    kafkaParams.put("auto.offset.reset", "latest")
    kafkaParams.put("enable.auto.commit", false: java.lang.Boolean)

    val kafkaStream =
      KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](
          util.Arrays.asList(kafkaTopicName),
          kafkaParams.asInstanceOf[java.util.Map[String, Object]]
        )
      )

    kafkaStream.print()

    // Deserialize each Kafka message payload into TimelineMetrics.
    // NOTE(review): a new ObjectMapper is built per record — presumably because
    // ObjectMapper is not serializable across the Spark closure; confirm and
    // consider mapPartitions if throughput matters.
    var timelineMetricsStream = kafkaStream.map(message => {
      val mapper = new ObjectMapper
      val metrics = mapper.readValue(message.value, classOf[TimelineMetrics])
      metrics
    })
    timelineMetricsStream.print()

    // Placeholder filtering stage: currently a pass-through; the commented-out
    // code sketches filtering by tracked MetricKeys (WIP).
    var filteredAppMetricStream = timelineMetricsStream.map(timelineMetrics => {
      val filteredMetrics : TimelineMetrics = timelineMetrics
//      for (metric : TimelineMetric <- timelineMetrics.getMetrics.asScala) {
//        val metricKey : MetricKey = MetricKey(
//          metric.getMetricName,
//          metric.getAppId,
//          metric.getInstanceId,
//          metric.getHostName,
//          null)
//
//        if (metricKeys.value.apply(metricKey)) {
//          filteredMetrics.addOrMergeTimelineMetric(metric)
//        }
//      }
      filteredMetrics
    })
    filteredAppMetricStream.print()

    // Terminal action: log received metrics; EMA testing/publishing is WIP.
    filteredAppMetricStream.foreachRDD(rdd => {
      rdd.foreach(item => {
        val timelineMetrics : TimelineMetrics = item
        LOG.info("Received Metric : " + timelineMetrics.getMetrics.get(0).getMetricName)
//        for (timelineMetric <- timelineMetrics.getMetrics) {
//          var anomalies = emaModel.test(timelineMetric)
//          anomalyMetricPublisher.publish(anomalies)
//        }
      })
    })

    streamingContext.start()
    streamingContext.awaitTermination()

  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/PointInTimeSubsystem.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/PointInTimeSubsystem.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/PointInTimeSubsystem.scala
new file mode 100644
index 0000000..fa06ab2
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/PointInTimeSubsystem.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.detection.pointintime
+
+import 
org.apache.ambari.metrics.adservice.configuration.DetectionServiceConfiguration
+import org.apache.ambari.metrics.adservice.detection.{SparkApplicationRunner, 
Subsystem}
+import org.apache.spark.launcher.SparkAppHandle
+import org.slf4j.{Logger, LoggerFactory}
+
+import com.google.inject.Singleton
+
@Singleton
/**
  * Point-in-time anomaly detection subsystem.
  * Manages the lifecycle of the EMA Spark streaming application through a
  * SparkApplicationRunner.
  */
class PointInTimeSubsystem extends Subsystem {

  val LOG : Logger = LoggerFactory.getLogger(classOf[PointInTimeSubsystem])

  // Handles of launched Spark applications, keyed by application name.
  val appHandleMap: scala.collection.mutable.Map[String, SparkAppHandle] = scala.collection.mutable.Map()
  var applicationRunner: SparkApplicationRunner = _
  var config: DetectionServiceConfiguration = _

  //EMA Stuff
  val emaDriverClass = "org.apache.ambari.metrics.adservice.detection.pointintime.EmaSparkDriver"
  val emaAppName = "Ema_Spark_Application"
  val emaConfig: scala.collection.mutable.MutableList[String] = scala.collection.mutable.MutableList()

  def this(config: DetectionServiceConfiguration, applicationRunner: SparkApplicationRunner) = {
    // BUG FIX: was a bare 'this' — an auxiliary constructor must begin with a
    // call to another constructor, 'this()', or it does not compile.
    this()
    this.applicationRunner = applicationRunner
    this.config = config

    //Initialize
    initializeConfigs()
  }

  /**
    * Launch the EMA Spark job if it is not already running.
    * Requires the Kafka servers config to be present.
    */
  override def start(): Unit = {

    LOG.info("Starting Point in Time AD jobs...")

    if (!appHandleMap.contains(emaAppName) || !applicationRunner.isRunning(appHandleMap.apply(emaAppName))) {
      LOG.info("Starting " + emaAppName)

      if (config.getKafkaServers == null || config.getKafkaServers.isEmpty) {
        // BUG FIX: message previously read "Cannot runEma_Spark_Application" (missing space).
        LOG.error("Cannot run " + emaAppName + " without Kafka Servers config")
      } else {
        // (Removed an unused local 'args' list that was never passed anywhere.)
        val emaHandle: SparkAppHandle = applicationRunner.runSparkJob(emaAppName, emaDriverClass, emaConfig)
        appHandleMap(emaAppName) = emaHandle

        // Give the launcher a moment to connect/submit before checking state.
        Thread.sleep(3000)

        if (applicationRunner.isRunning(emaHandle)) {
          LOG.info(emaAppName + " successfully started.")
        } else {
          LOG.error(emaAppName + " start failed.")
        }
      }
    } else {
      LOG.info(emaAppName + " already running. Moving ahead.")
    }

  }

  /**
    * Request all launched jobs to stop, then verify (after a short pause)
    * which ones actually terminated.
    */
  override def stop(): Unit = {

    LOG.info("Stopping Point in Time AD jobs...")
    for ((app, handle) <- appHandleMap) {
      handle.stop()
    }
    //Sleep for 3 seconds to give jobs time to wind down before verifying.
    Thread.sleep(3000)

    for ((app, handle) <- appHandleMap) {
      if (!applicationRunner.isRunning(handle)) {
        LOG.info(app + " successfully stopped.")
      }
    }
  }

  /**
    * State of each launched application, keyed by application name.
    * A null value indicates an application with no handle.
    */
  override def state: Map[String, String] = {
    val stateMap: scala.collection.mutable.Map[String, String] = scala.collection.mutable.Map()
    for ((app, handle) <- appHandleMap) {
      if (handle == null) {
        stateMap(app) = null
      } else {
        stateMap(app) = handle.getState.toString
      }
    }
    stateMap.toMap
  }


  /**
    * Build the positional argument list for the EMA driver:
    * w, n, kafka servers, kafka topic, kafka consumer group
    * (must match EmaSparkDriver.main's expected ordering).
    */
  private def initializeConfigs(): Unit = {
    //EMA Configs
    emaConfig.+=(config.getEmaW)
    emaConfig.+=(config.getEmaN)
    emaConfig.+=(config.getKafkaServers)
    emaConfig.+=(config.getKafkaTopic)
    emaConfig.+=(config.getKafkaConsumerGroup)
  }

}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala
deleted file mode 100644
index 52ce39e..0000000
--- 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.metrics.adservice.metadata
-
-import org.apache.ambari.metrics.adservice.service.AbstractADService
-
-trait MetricDefinitionService extends AbstractADService{
-
-  /**
-    * Given a 'UUID', return the metric key associated with it.
-    * @param uuid UUID
-    * @return
-    */
-  def getMetricKeyFromUuid(uuid: Array[Byte]) : MetricKey
-
-  /**
-    * Return all the definitions being tracked.
-    * @return Map of Metric Source Definition name to Metric Source Definition.
-    */
-  def getDefinitions: List[MetricSourceDefinition]
-
-  /**
-    * Given a component definition name, return the definition associated with 
it.
-    * @param name component definition name
-    * @return
-    */
-  def getDefinitionByName(name: String) : MetricSourceDefinition
-
-  /**
-    * Add a new definition.
-    * @param definition component definition JSON
-    * @return
-    */
-  def addDefinition(definition: MetricSourceDefinition) : Boolean
-
-  /**
-    * Update a component definition by name. Only definitions which were added 
by API can be modified through API.
-    * @param definition component definition name
-    * @return
-    */
-  def updateDefinition(definition: MetricSourceDefinition) : Boolean
-
-  /**
-    * Delete a component definition by name. Only definitions which were added 
by API can be deleted through API.
-    * @param name component definition name
-    * @return
-    */
-  def deleteDefinitionByName(name: String) : Boolean
-
-  /**
-    * Given an appId, return set of definitions that are tracked for that 
appId.
-    * @param appId component definition appId
-    * @return
-    */
-  def getDefinitionByAppId(appId: String) : List[MetricSourceDefinition]
-
-  /**
-    * Return the mapping between definition name to set of metric keys.
-    * @return Map of Metric Source Definition to set of metric keys associated 
with it.
-    */
-  def getMetricKeys:  Map[String, Set[MetricKey]]
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala
deleted file mode 100644
index b9b4a7c..0000000
--- 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.metrics.adservice.metadata
-
-import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
-import org.apache.ambari.metrics.adservice.db.AdMetadataStoreAccessor
-import org.slf4j.{Logger, LoggerFactory}
-
-import com.google.inject.{Inject, Singleton}
-
-@Singleton
-class MetricDefinitionServiceImpl extends MetricDefinitionService {
-
-  val LOG : Logger = 
LoggerFactory.getLogger(classOf[MetricDefinitionServiceImpl])
-
-  var adMetadataStoreAccessor: AdMetadataStoreAccessor = _
-  var configuration: AnomalyDetectionAppConfig = _
-  var metricMetadataProvider: MetricMetadataProvider = _
-
-  val metricSourceDefinitionMap: scala.collection.mutable.Map[String, 
MetricSourceDefinition] = scala.collection.mutable.Map()
-  val metricDefinitionMetricKeyMap: 
scala.collection.mutable.Map[MetricSourceDefinition, Set[MetricKey]] = 
scala.collection.mutable.Map()
-  val metricKeys: scala.collection.mutable.Set[MetricKey] = 
scala.collection.mutable.Set.empty[MetricKey]
-
-  @Inject
-  def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig, 
metadataStoreAccessor: AdMetadataStoreAccessor) = {
-    this ()
-    adMetadataStoreAccessor = metadataStoreAccessor
-    configuration = anomalyDetectionAppConfig
-  }
-
-  @Override
-  def initialize() : Unit = {
-    LOG.info("Initializing Metric Definition Service...")
-
-    //Initialize Metric Metadata Provider
-    metricMetadataProvider = new 
ADMetadataProvider(configuration.getMetricCollectorConfiguration)
-
-    //Load definitions from metadata store
-    val definitionsFromStore: List[MetricSourceDefinition] = 
adMetadataStoreAccessor.getSavedInputDefinitions
-    for (definition <- definitionsFromStore) {
-      sanitizeMetricSourceDefinition(definition)
-    }
-
-    //Load definitions from configs
-    val definitionsFromConfig: List[MetricSourceDefinition] = 
getInputDefinitionsFromConfig
-    for (definition <- definitionsFromConfig) {
-      sanitizeMetricSourceDefinition(definition)
-    }
-
-    //Union the 2 sources, with DB taking precedence.
-    //Save new definition list to DB.
-    
metricSourceDefinitionMap.++=(combineDefinitionSources(definitionsFromConfig, 
definitionsFromStore))
-
-    //Reach out to AMS Metadata and get Metric Keys. Pass in MSD and get back 
Set<MK>
-    for (definition <- metricSourceDefinitionMap.values) {
-      val keys: Set[MetricKey] = 
metricMetadataProvider.getMetricKeysForDefinitions(definition)
-      metricDefinitionMetricKeyMap(definition) = keys
-      metricKeys.++=(keys)
-    }
-
-    LOG.info("Successfully initialized Metric Definition Service.")
-  }
-
-  def getMetricKeyFromUuid(uuid: Array[Byte]): MetricKey = {
-    var key: MetricKey = null
-    for (metricKey <- metricKeys) {
-      if (metricKey.uuid.sameElements(uuid)) {
-        key = metricKey
-      }
-    }
-    key
-  }
-
-  @Override
-  def getDefinitions: List[MetricSourceDefinition] = {
-    metricSourceDefinitionMap.values.toList
-  }
-
-  @Override
-  def getDefinitionByName(name: String): MetricSourceDefinition = {
-    if (!metricSourceDefinitionMap.contains(name)) {
-      LOG.warn("Metric Source Definition with name " + name + " not found")
-      null
-    } else {
-      metricSourceDefinitionMap.apply(name)
-    }
-  }
-
-  @Override
-  def addDefinition(definition: MetricSourceDefinition): Boolean = {
-    if (metricSourceDefinitionMap.contains(definition.definitionName)) {
-      LOG.info("Definition with name " + definition.definitionName + " already 
present.")
-      return false
-    }
-    definition.definitionSource = MetricSourceDefinitionType.API
-
-    val success: Boolean = 
adMetadataStoreAccessor.saveInputDefinition(definition)
-    if (success) {
-      metricSourceDefinitionMap += definition.definitionName -> definition
-      val keys: Set[MetricKey] = 
metricMetadataProvider.getMetricKeysForDefinitions(definition)
-      metricDefinitionMetricKeyMap(definition) = keys
-      metricKeys.++=(keys)
-      LOG.info("Successfully created metric source definition : " + 
definition.definitionName)
-    }
-    success
-  }
-
-  @Override
-  def updateDefinition(definition: MetricSourceDefinition): Boolean = {
-    if (!metricSourceDefinitionMap.contains(definition.definitionName)) {
-      LOG.warn("Metric Source Definition with name " + 
definition.definitionName + " not found")
-      return false
-    }
-
-    if 
(metricSourceDefinitionMap.apply(definition.definitionName).definitionSource != 
MetricSourceDefinitionType.API) {
-      return false
-    }
-    definition.definitionSource = MetricSourceDefinitionType.API
-
-    val success: Boolean = 
adMetadataStoreAccessor.saveInputDefinition(definition)
-    if (success) {
-      metricSourceDefinitionMap += definition.definitionName -> definition
-      val keys: Set[MetricKey] = 
metricMetadataProvider.getMetricKeysForDefinitions(definition)
-      metricDefinitionMetricKeyMap(definition) = keys
-      metricKeys.++=(keys)
-      LOG.info("Successfully updated metric source definition : " + 
definition.definitionName)
-    }
-    success
-  }
-
-  @Override
-  def deleteDefinitionByName(name: String): Boolean = {
-    if (!metricSourceDefinitionMap.contains(name)) {
-      LOG.warn("Metric Source Definition with name " + name + " not found")
-      return false
-    }
-
-    val definition : MetricSourceDefinition = 
metricSourceDefinitionMap.apply(name)
-    if (definition.definitionSource != MetricSourceDefinitionType.API) {
-      LOG.warn("Cannot delete metric source definition which was not created 
through API.")
-      return false
-    }
-
-    val success: Boolean = adMetadataStoreAccessor.removeInputDefinition(name)
-    if (success) {
-      metricSourceDefinitionMap -= definition.definitionName
-      metricKeys.--=(metricDefinitionMetricKeyMap.apply(definition))
-      metricDefinitionMetricKeyMap -= definition
-      LOG.info("Successfully deleted metric source definition : " + name)
-    }
-    success
-  }
-
-  @Override
-  def getDefinitionByAppId(appId: String): List[MetricSourceDefinition] = {
-
-    val defList : List[MetricSourceDefinition] = 
metricSourceDefinitionMap.values.toList
-    defList.filter(_.appId == appId)
-  }
-
-  def combineDefinitionSources(configDefinitions: 
List[MetricSourceDefinition], dbDefinitions: List[MetricSourceDefinition])
-  : Map[String, MetricSourceDefinition] = {
-
-    var combinedDefinitionMap: scala.collection.mutable.Map[String, 
MetricSourceDefinition] =
-      scala.collection.mutable.Map.empty[String, MetricSourceDefinition]
-
-    for (definitionFromDb <- dbDefinitions) {
-      combinedDefinitionMap(definitionFromDb.definitionName) = definitionFromDb
-    }
-
-    for (definition <- configDefinitions) {
-      if (!dbDefinitions.contains(definition)) {
-        adMetadataStoreAccessor.saveInputDefinition(definition)
-        combinedDefinitionMap(definition.definitionName) = definition
-      }
-    }
-    combinedDefinitionMap.toMap
-  }
-
-  def getInputDefinitionsFromConfig: List[MetricSourceDefinition] = {
-    val configDirectory = 
configuration.getMetricDefinitionServiceConfiguration.getInputDefinitionDirectory
-    
InputMetricDefinitionParser.parseInputDefinitionsFromDirectory(configDirectory)
-  }
-
-  def setAdMetadataStoreAccessor (adMetadataStoreAccessor: 
AdMetadataStoreAccessor) : Unit = {
-    this.adMetadataStoreAccessor = adMetadataStoreAccessor
-  }
-
-  /**
-    * Look into the Metric Definitions inside a Metric Source definition, and 
push down source level appId &
-    * hosts to Metric definition if they do not have an override.
-    * @param metricSourceDefinition Input Metric Source Definition
-    */
-  def sanitizeMetricSourceDefinition(metricSourceDefinition: 
MetricSourceDefinition): Unit = {
-    val sourceLevelAppId: String = metricSourceDefinition.appId
-    val sourceLevelHostList: List[String] = metricSourceDefinition.hosts
-
-    for (metricDef <- metricSourceDefinition.metricDefinitions.toList) {
-      if (metricDef.appId == null) {
-        if (sourceLevelAppId == null || sourceLevelAppId.isEmpty) {
-          metricDef.makeInvalid()
-        } else {
-          metricDef.appId = sourceLevelAppId
-        }
-      }
-
-      if (metricDef.isValid && (metricDef.hosts == null || 
metricDef.hosts.isEmpty)) {
-        if (sourceLevelHostList != null && sourceLevelHostList.nonEmpty) {
-          metricDef.hosts = sourceLevelHostList
-        }
-      }
-    }
-  }
-
-  /**
-    * Return the mapping between definition name to set of metric keys.
-    *
-    * @return Map of Metric Source Definition to set of metric keys associated 
with it.
-    */
-  override def getMetricKeys: Map[String, Set[MetricKey]] = {
-    val metricKeyMap: scala.collection.mutable.Map[String, Set[MetricKey]] = 
scala.collection.mutable.Map()
-    for (definition <- metricSourceDefinitionMap.values) {
-      metricKeyMap(definition.definitionName) = 
metricDefinitionMetricKeyMap.apply(definition)
-    }
-    metricKeyMap.toMap
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinition.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinition.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinition.scala
index 47b1499..d49ba34 100644
--- 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinition.scala
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinition.scala
@@ -22,26 +22,6 @@ import javax.xml.bind.annotation.XmlRootElement
 import 
org.apache.ambari.metrics.adservice.metadata.MetricSourceDefinitionType.MetricSourceDefinitionType
 import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
 
-/*
-{
- "definition-name": "host-memory",
- "app-id" : "HOST",
- "hosts" : [“c6401.ambari.apache.org”],
- "metric-definitions" : [
-   {
-       "metric-name": "mem_free",
-       "metric-description" : "Free memory on a Host.",
-       "troubleshooting-info" : "Sudden drop / hike in free memory on a host.",
-       "static-threshold" : 10,
-       “app-id” : “HOST”
-}   ],
-
- "related-definition-names" : ["host-cpu", “host-network”],
- “anomaly-detection-subsystems” : [“point-in-time”, “trend”]
-}
-*/
-
-
 @SerialVersionUID(10001L)
 @XmlRootElement
 class MetricSourceDefinition extends Serializable{

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/DetectionResource.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/DetectionResource.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/DetectionResource.scala
new file mode 100644
index 0000000..6a04190
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/DetectionResource.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.resource
+
+import javax.ws.rs.core.MediaType.APPLICATION_JSON
+import javax.ws.rs.core.Response
+import javax.ws.rs.{GET, Path, Produces}
+
+import org.apache.ambari.metrics.adservice.service.DetectionService
+
+import com.google.inject.Inject
+
/**
  * JAX-RS resource exposing the anomaly detection job state over HTTP.
  * Registered under the "/detection" path; the backing DetectionService
  * is field-injected by Guice.
  */
@Path("/detection")
class DetectionResource {

  // Guice field injection; var + default initializer (_) is required for this style.
  @Inject
  var detectionService: DetectionService = _

  /**
    * GET /detection/state
    *
    * @return 200 OK whose JSON entity is the map returned by
    *         DetectionService.state() (presumably subsystem name -> job state
    *         attributes — confirm against AdJobManager).
    */
  @GET
  @Produces(Array(APPLICATION_JSON))
  @Path("/state")
  def getState: Response = {
    Response.ok.entity(detectionService.state()).build()
  }
}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala
index 442bf46..7c26014 100644
--- 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala
@@ -21,7 +21,8 @@ import javax.ws.rs._
 import javax.ws.rs.core.MediaType.APPLICATION_JSON
 import javax.ws.rs.core.Response
 
-import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, 
MetricKey, MetricSourceDefinition}
+import org.apache.ambari.metrics.adservice.metadata.{MetricKey, 
MetricSourceDefinition}
+import org.apache.ambari.metrics.adservice.service.MetricDefinitionService
 import org.apache.commons.lang.StringUtils
 
 import com.google.inject.Inject

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionService.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionService.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionService.scala
new file mode 100644
index 0000000..2cdf20d
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionService.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.service
+
+import io.dropwizard.lifecycle.Managed
+
/**
  * Service contract for the anomaly detection job scheduler.
  *
  * Extends Dropwizard's Managed so the application lifecycle can invoke
  * start()/stop(), and AbstractADService for the common initialize() hook.
  */
trait DetectionService extends AbstractADService with Managed{

  /**
    * Report the current state of the detection jobs.
    *
    * @return nested map of state attributes (outer key presumably the
    *         subsystem/job name — confirm against the implementation).
    */
  def state() : Map[String, Map[String, String]]

}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionServiceImpl.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionServiceImpl.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionServiceImpl.scala
new file mode 100644
index 0000000..ba091cc
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionServiceImpl.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.service
+
+import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
+import org.apache.ambari.metrics.adservice.detection.AdJobManager
+import org.slf4j.{Logger, LoggerFactory}
+
+import com.google.inject.{Inject, Singleton}
+
/**
  * Default DetectionService implementation.
  *
  * Owns an AdJobManager that is created and started in initialize(); the
  * Dropwizard Managed stop() hook shuts the jobs down. state() and stop()
  * are guarded against being invoked before initialize() has run, since
  * adJobManager is only assigned there and would otherwise be null.
  */
@Singleton
class DetectionServiceImpl extends DetectionService{

  val LOG : Logger = LoggerFactory.getLogger(classOf[DetectionServiceImpl])

  var adJobManager: AdJobManager = _
  var config : AnomalyDetectionAppConfig = _

  // Guice constructor injection of the application configuration.
  @Inject
  def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig) = {
    this ()
    this.config = anomalyDetectionAppConfig
  }

  /** Create the job manager and kick off the AD jobs. */
  override def initialize(): Unit = {
    this.adJobManager = new AdJobManager(config)
    adJobManager.startAdJobs()
  }

  /**
    * @return the job manager's state map, or an empty map if the service
    *         has not been initialized yet (avoids an NPE on early requests).
    */
  override def state(): Map[String, Map[String, String]] = {
    if (adJobManager == null) {
      LOG.warn("Detection service state requested before initialization.")
      Map.empty
    } else {
      adJobManager.state()
    }
  }

  // Dropwizard Managed start hook; job startup happens in initialize() instead.
  override def start(): Unit = {
  }

  /** Dropwizard Managed stop hook — stop AD jobs if they were ever started. */
  override def stop(): Unit = {
    LOG.info("Stop Detection Service hook invoked. Stopping AD Jobs")
    if (adJobManager != null) {
      adJobManager.stopAdJobs()
    }
  }
}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/MetricDefinitionService.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/MetricDefinitionService.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/MetricDefinitionService.scala
new file mode 100644
index 0000000..11db8c7
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/MetricDefinitionService.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.service
+
+import org.apache.ambari.metrics.adservice.metadata.{MetricKey, 
MetricSourceDefinition}
+
/**
  * Service contract for managing metric source definitions and the metric
  * keys derived from them.
  */
trait MetricDefinitionService extends AbstractADService{

  /**
    * Given a 'UUID', return the metric key associated with it.
    * @param uuid UUID
    * @return the matching metric key, or null if none is tracked for this UUID
    */
  def getMetricKeyFromUuid(uuid: Array[Byte]) : MetricKey

  /**
    * Return all the definitions being tracked.
    * @return List of tracked Metric Source Definitions.
    */
  def getDefinitions: List[MetricSourceDefinition]

  /**
    * Given a component definition name, return the definition associated with it.
    * @param name component definition name
    * @return the definition, or null if no definition with that name is tracked
    */
  def getDefinitionByName(name: String) : MetricSourceDefinition

  /**
    * Add a new definition.
    * @param definition component definition JSON
    * @return true if the definition was persisted and cached, false otherwise
    */
  def addDefinition(definition: MetricSourceDefinition) : Boolean

  /**
    * Update a component definition by name. Only definitions which were added by API can be modified through API.
    * @param definition component definition name
    * @return true on successful update, false otherwise
    */
  def updateDefinition(definition: MetricSourceDefinition) : Boolean

  /**
    * Delete a component definition by name. Only definitions which were added by API can be deleted through API.
    * @param name component definition name
    * @return true on successful deletion, false otherwise
    */
  def deleteDefinitionByName(name: String) : Boolean

  /**
    * Given an appId, return set of definitions that are tracked for that appId.
    * @param appId component definition appId
    * @return List of definitions whose appId matches
    */
  def getDefinitionByAppId(appId: String) : List[MetricSourceDefinition]

  /**
    * Return the mapping between definition name to set of metric keys.
    * @return Map of Metric Source Definition name to set of metric keys associated with it.
    */
  def getMetricKeys:  Map[String, Set[MetricKey]]

  /**
    * Return the set of metric keys.
    * @return Set of metric keys.
    */
  def getMetricKeyList: Set[MetricKey]

}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/MetricDefinitionServiceImpl.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/MetricDefinitionServiceImpl.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/MetricDefinitionServiceImpl.scala
new file mode 100644
index 0000000..24bd652
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/MetricDefinitionServiceImpl.scala
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.service
+
+import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
+import org.apache.ambari.metrics.adservice.db.AdMetadataStoreAccessor
+import org.apache.ambari.metrics.adservice.metadata._
+import org.slf4j.{Logger, LoggerFactory}
+
+import com.google.inject.{Inject, Singleton}
+
/**
  * Default MetricDefinitionService implementation.
  *
  * Loads metric source definitions from the metadata store and from config
  * files at initialization (store entries take precedence), resolves the
  * metric keys for each definition via the metadata provider, and keeps
  * everything in in-memory caches for the accessor methods below.
  */
@Singleton
class MetricDefinitionServiceImpl extends MetricDefinitionService {

  val LOG : Logger = LoggerFactory.getLogger(classOf[MetricDefinitionServiceImpl])

  var adMetadataStoreAccessor: AdMetadataStoreAccessor = _
  var configuration: AnomalyDetectionAppConfig = _
  var metricMetadataProvider: MetricMetadataProvider = _

  // In-memory caches, populated in initialize() and kept in sync by the
  // add/update/delete methods.
  val metricSourceDefinitionMap: scala.collection.mutable.Map[String, MetricSourceDefinition] = scala.collection.mutable.Map()
  val metricDefinitionMetricKeyMap: scala.collection.mutable.Map[MetricSourceDefinition, Set[MetricKey]] = scala.collection.mutable.Map()
  val metricKeys: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey]

  // Guice constructor injection of configuration and the metadata store.
  @Inject
  def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig, metadataStoreAccessor: AdMetadataStoreAccessor) = {
    this ()
    adMetadataStoreAccessor = metadataStoreAccessor
    configuration = anomalyDetectionAppConfig
  }

  /**
    * Build the in-memory caches: sanitize definitions from both sources,
    * merge them (DB wins), and fetch the metric keys for each definition.
    * Note: the Scala `override` modifier is used instead of Java's @Override
    * annotation, which the Scala compiler does not check.
    */
  override def initialize() : Unit = {
    LOG.info("Initializing Metric Definition Service...")

    //Initialize Metric Metadata Provider
    metricMetadataProvider = new ADMetadataProvider(configuration.getMetricCollectorConfiguration)

    //Load definitions from metadata store
    val definitionsFromStore: List[MetricSourceDefinition] = adMetadataStoreAccessor.getSavedInputDefinitions
    for (definition <- definitionsFromStore) {
      sanitizeMetricSourceDefinition(definition)
    }

    //Load definitions from configs
    val definitionsFromConfig: List[MetricSourceDefinition] = getInputDefinitionsFromConfig
    for (definition <- definitionsFromConfig) {
      sanitizeMetricSourceDefinition(definition)
    }

    //Union the 2 sources, with DB taking precedence.
    //Save new definition list to DB.
    metricSourceDefinitionMap.++=(combineDefinitionSources(definitionsFromConfig, definitionsFromStore))

    //Reach out to AMS Metadata and get Metric Keys. Pass in MSD and get back Set<MK>
    for (definition <- metricSourceDefinitionMap.values) {
      val keys: Set[MetricKey] = metricMetadataProvider.getMetricKeysForDefinitions(definition)
      metricDefinitionMetricKeyMap(definition) = keys
      metricKeys.++=(keys)
    }

    LOG.info("Successfully initialized Metric Definition Service.")
  }

  /**
    * Given a 'UUID', return the metric key associated with it.
    * @return the first matching key, or null if none matches.
    */
  override def getMetricKeyFromUuid(uuid: Array[Byte]): MetricKey = {
    //find short-circuits at the first match instead of scanning the whole set.
    metricKeys.find(_.uuid.sameElements(uuid)).orNull
  }

  /** @return all tracked definitions. */
  override def getDefinitions: List[MetricSourceDefinition] = {
    metricSourceDefinitionMap.values.toList
  }

  /** @return the definition for 'name', or null (logged) if not tracked. */
  override def getDefinitionByName(name: String): MetricSourceDefinition = {
    if (!metricSourceDefinitionMap.contains(name)) {
      LOG.warn("Metric Source Definition with name " + name + " not found")
      null
    } else {
      metricSourceDefinitionMap.apply(name)
    }
  }

  /**
    * Add a new API-sourced definition. Fails if a definition with the same
    * name already exists.
    */
  override def addDefinition(definition: MetricSourceDefinition): Boolean = {
    if (metricSourceDefinitionMap.contains(definition.definitionName)) {
      LOG.info("Definition with name " + definition.definitionName + " already present.")
      return false
    }
    definition.definitionSource = MetricSourceDefinitionType.API

    val success: Boolean = persistAndCache(definition)
    if (success) {
      LOG.info("Successfully created metric source definition : " + definition.definitionName)
    }
    success
  }

  /**
    * Update an existing definition. Only definitions originally created
    * through the API may be modified through the API.
    */
  override def updateDefinition(definition: MetricSourceDefinition): Boolean = {
    if (!metricSourceDefinitionMap.contains(definition.definitionName)) {
      LOG.warn("Metric Source Definition with name " + definition.definitionName + " not found")
      return false
    }

    if (metricSourceDefinitionMap.apply(definition.definitionName).definitionSource != MetricSourceDefinitionType.API) {
      return false
    }
    definition.definitionSource = MetricSourceDefinitionType.API

    val success: Boolean = persistAndCache(definition)
    if (success) {
      LOG.info("Successfully updated metric source definition : " + definition.definitionName)
    }
    success
  }

  /**
    * Save a definition to the metadata store and, on success, refresh the
    * in-memory caches (definition map, key map, key set). Shared by
    * addDefinition and updateDefinition, which previously duplicated this.
    */
  private def persistAndCache(definition: MetricSourceDefinition): Boolean = {
    val success: Boolean = adMetadataStoreAccessor.saveInputDefinition(definition)
    if (success) {
      metricSourceDefinitionMap += definition.definitionName -> definition
      val keys: Set[MetricKey] = metricMetadataProvider.getMetricKeysForDefinitions(definition)
      metricDefinitionMetricKeyMap(definition) = keys
      metricKeys.++=(keys)
    }
    success
  }

  /**
    * Delete a definition by name. Only API-sourced definitions may be
    * deleted; caches are purged on successful store removal.
    */
  override def deleteDefinitionByName(name: String): Boolean = {
    if (!metricSourceDefinitionMap.contains(name)) {
      LOG.warn("Metric Source Definition with name " + name + " not found")
      return false
    }

    val definition : MetricSourceDefinition = metricSourceDefinitionMap.apply(name)
    if (definition.definitionSource != MetricSourceDefinitionType.API) {
      LOG.warn("Cannot delete metric source definition which was not created through API.")
      return false
    }

    val success: Boolean = adMetadataStoreAccessor.removeInputDefinition(name)
    if (success) {
      metricSourceDefinitionMap -= definition.definitionName
      metricKeys.--=(metricDefinitionMetricKeyMap.apply(definition))
      metricDefinitionMetricKeyMap -= definition
      LOG.info("Successfully deleted metric source definition : " + name)
    }
    success
  }

  /** @return the tracked definitions whose appId matches. */
  override def getDefinitionByAppId(appId: String): List[MetricSourceDefinition] = {
    val defList : List[MetricSourceDefinition] = metricSourceDefinitionMap.values.toList
    defList.filter(_.appId == appId)
  }

  /**
    * Merge config-file definitions with store definitions, the store taking
    * precedence on name collisions. Side effect: config definitions not yet
    * in the store are saved to it.
    */
  def combineDefinitionSources(configDefinitions: List[MetricSourceDefinition], dbDefinitions: List[MetricSourceDefinition])
  : Map[String, MetricSourceDefinition] = {

    //val (never reassigned) — the map itself is mutable and built up below.
    val combinedDefinitionMap: scala.collection.mutable.Map[String, MetricSourceDefinition] =
      scala.collection.mutable.Map.empty[String, MetricSourceDefinition]

    for (definitionFromDb <- dbDefinitions) {
      combinedDefinitionMap(definitionFromDb.definitionName) = definitionFromDb
    }

    for (definition <- configDefinitions) {
      if (!dbDefinitions.contains(definition)) {
        adMetadataStoreAccessor.saveInputDefinition(definition)
        combinedDefinitionMap(definition.definitionName) = definition
      }
    }
    combinedDefinitionMap.toMap
  }

  /** Parse definitions from the configured input-definition directory. */
  def getInputDefinitionsFromConfig: List[MetricSourceDefinition] = {
    val configDirectory = configuration.getMetricDefinitionServiceConfiguration.getInputDefinitionDirectory
    InputMetricDefinitionParser.parseInputDefinitionsFromDirectory(configDirectory)
  }

  // Setter retained for tests / manual wiring of the store accessor.
  def setAdMetadataStoreAccessor (adMetadataStoreAccessor: AdMetadataStoreAccessor) : Unit = {
    this.adMetadataStoreAccessor = adMetadataStoreAccessor
  }

  /**
    * Look into the Metric Definitions inside a Metric Source definition, and push down source level appId &
    * hosts to Metric definition if they do not have an override.
    * @param metricSourceDefinition Input Metric Source Definition
    */
  def sanitizeMetricSourceDefinition(metricSourceDefinition: MetricSourceDefinition): Unit = {
    val sourceLevelAppId: String = metricSourceDefinition.appId
    val sourceLevelHostList: List[String] = metricSourceDefinition.hosts

    for (metricDef <- metricSourceDefinition.metricDefinitions.toList) {
      if (metricDef.appId == null) {
        //No appId override and none inherited -> definition is unusable.
        if (sourceLevelAppId == null || sourceLevelAppId.isEmpty) {
          metricDef.makeInvalid()
        } else {
          metricDef.appId = sourceLevelAppId
        }
      }

      if (metricDef.isValid && (metricDef.hosts == null || metricDef.hosts.isEmpty)) {
        if (sourceLevelHostList != null && sourceLevelHostList.nonEmpty) {
          metricDef.hosts = sourceLevelHostList
        }
      }
    }
  }

  /**
    * Return the mapping between definition name to set of metric keys.
    *
    * @return Map of Metric Source Definition name to set of metric keys associated with it.
    */
  override def getMetricKeys: Map[String, Set[MetricKey]] = {
    val metricKeyMap: scala.collection.mutable.Map[String, Set[MetricKey]] = scala.collection.mutable.Map()
    for (definition <- metricSourceDefinitionMap.values) {
      metricKeyMap(definition.definitionName) = metricDefinitionMetricKeyMap.apply(definition)
    }
    metricKeyMap.toMap
  }

  /**
    * Return the set of metric keys.
    *
    * @return immutable snapshot of the tracked metric keys.
    */
  override def getMetricKeyList: Set[MetricKey] = {
    metricKeys.toSet
  }
}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/resources/config.yaml
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/resources/config.yaml
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/resources/config.yaml
index 6b09499..b28078b 100644
--- 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/resources/config.yaml
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/resources/config.yaml
@@ -32,4 +32,14 @@ metricDefinitionDB:
 
 spark:
   mode: standalone
-  masterHostPort: localhost:7077
\ No newline at end of file
+  master: spark://localhost:7077
+  sparkHome: /usr/lib/ambari-metrics-anomaly-detection/spark
+  jarfile: 
/usr/lib/ambari-metrics-anomaly-detection/ambari-metrics-anomaly-detection-service.jar
+
+detection:
+  trend: false
+  kafkaServers: localhost:6667
+  kafkaTopic: ambari-metrics-ad
+  kafkaConsumerGroup: ambari-metrics-ad-group
+  ema-w: 0.9
+  ema-n: 3

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala
index 76391a0..fb6998f 100644
--- 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala
@@ -60,7 +60,10 @@ class AnomalyDetectionAppConfigTest extends FunSuite {
     assert(!config.getMetricDefinitionDBConfiguration.getPerformParanoidChecks)
 
     assert(config.getSparkConfiguration.getMode.equals("standalone"))
-    
assert(config.getSparkConfiguration.getMasterHostPort.equals("localhost:7077"))
+    
assert(config.getSparkConfiguration.getMaster.equals("spark://localhost:7077"))
+
+    
assert(config.getDetectionServiceConfiguration.isPointInTimeSubsystemEnabled)
+    assert(!config.getDetectionServiceConfiguration.isTrendSubsystemEnabled)
 
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceTest.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceTest.scala
 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceTest.scala
index d3454f2..f1e128b 100644
--- 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceTest.scala
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceTest.scala
@@ -20,6 +20,7 @@ package org.apache.ambari.metrics.adservice.metadata
 
 import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
 import org.apache.ambari.metrics.adservice.db.AdMetadataStoreAccessor
+import org.apache.ambari.metrics.adservice.service.MetricDefinitionServiceImpl
 import org.easymock.EasyMock.{anyObject, expect, expectLastCall, replay}
 import org.scalatest.FunSuite
 import org.scalatest.easymock.EasyMockSugar

Reply via email to