AMBARI-22607 : Design and implement an AD job scheduler. (avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2129b396 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2129b396 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2129b396 Branch: refs/heads/branch-feature-AMBARI-21105 Commit: 2129b3963b4e0690d9231f3c59ca034d6de9a9f3 Parents: b1f4e87 Author: Aravindan Vijayan <avija...@hortonworks.com> Authored: Fri Dec 8 13:29:03 2017 -0800 Committer: Aravindan Vijayan <avija...@hortonworks.com> Committed: Fri Dec 8 13:29:03 2017 -0800 ---------------------------------------------------------------------- .../conf/unix/config.yaml | 14 +- .../pom.xml | 18 +- .../prototype/core/MetricSparkConsumer.java | 169 +++++++------ .../adservice/app/AnomalyDetectionApp.scala | 6 +- .../app/AnomalyDetectionAppConfig.scala | 11 +- .../app/AnomalyDetectionAppModule.scala | 7 +- .../DetectionServiceConfiguration.scala | 59 +++++ .../configuration/SparkConfiguration.scala | 25 +- .../db/PhoenixAnomalyStoreAccessor.scala | 3 +- .../adservice/detection/AdJobManager.scala | 88 +++++++ .../detection/SparkApplicationRunner.scala | 77 ++++++ .../metrics/adservice/detection/Subsystem.scala | 27 ++ .../detection/pointintime/EmaSparkDriver.scala | 121 +++++++++ .../pointintime/PointInTimeSubsystem.scala | 116 +++++++++ .../metadata/MetricDefinitionService.scala | 78 ------ .../metadata/MetricDefinitionServiceImpl.scala | 242 ------------------ .../metadata/MetricSourceDefinition.scala | 20 -- .../adservice/resource/DetectionResource.scala | 40 +++ .../resource/MetricDefinitionResource.scala | 3 +- .../adservice/service/DetectionService.scala | 26 ++ .../service/DetectionServiceImpl.scala | 56 +++++ .../service/MetricDefinitionService.scala | 84 +++++++ .../service/MetricDefinitionServiceImpl.scala | 252 +++++++++++++++++++ .../src/test/resources/config.yaml | 12 +- .../app/AnomalyDetectionAppConfigTest.scala | 5 +- .../metadata/MetricDefinitionServiceTest.scala | 1 + .../configuration/ams-admanager-config.xml | 12 +- .../0.1.0/package/scripts/params.py | 4 +- 28 files changed, 1124 insertions(+), 452 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/conf/unix/config.yaml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/conf/unix/config.yaml b/ambari-metrics/ambari-metrics-anomaly-detection-service/conf/unix/config.yaml index 85e4004..c9cbc04 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/conf/unix/config.yaml +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/conf/unix/config.yaml @@ -38,8 +38,18 @@ metricDefinitionDB: # raise an error as soon as it detects an internal corruption performParanoidChecks: false # Path to Level DB directory - dbDirPath: /tmp/ambari-metrics-anomaly-detection/db + dbDirPath: /var/lib/ambari-metrics-anomaly-detection/ spark: mode: standalone - masterHostPort: localhost:7077 \ No newline at end of file + master: spark://localhost:7077 + sparkHome: /usr/lib/ambari-metrics-anomaly-detection/spark + jarfile: /usr/lib/ambari-metrics-anomaly-detection/ambari-metrics-anomaly-detection-service.jar + +detection: + trend: false + kafkaServers: localhost:6667 + kafkaTopic: ambari-metrics-ad + kafkaConsumerGroup: ambari-metrics-ad-group + ema-w: 0.9 + ema-n: 3 http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml index 50d7ef6..148921e 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml @@ -240,6 +240,11 @@ <version>${spark.version}</version> </dependency> <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-launcher_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + </dependency> + <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.10.1.0</version> @@ -289,8 +294,8 @@ </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-kafka_2.10</artifactId> - <version>1.6.3</version> + <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId> + <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.phoenix</groupId> @@ -338,7 +343,7 @@ <exclusions> <exclusion> <groupId>com.fasterxml.jackson.module</groupId> - <artifactId>jackson-module-scala_2.11</artifactId> + <artifactId>jackson-module-scala_${scala.binary.version}</artifactId> </exclusion> </exclusions> </dependency> @@ -524,5 +529,12 @@ <version>1.8.4</version> <scope>test</scope> </dependency> + <!-- https://mvnrepository.com/artifact/org.mockito/mockito-all --> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>1.9.5</version> + <scope>test</scope> + </dependency> </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java index addeda7..3644379 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java @@ -32,7 +32,6 @@ import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.kafka.KafkaUtils; import scala.Tuple2; import java.io.FileInputStream; @@ -152,90 +151,90 @@ public class MetricSparkConsumer { Broadcast<Set<Pattern>> includePatternBroadcast = jssc.sparkContext().broadcast(includeMetricPatterns); Broadcast<Set<String>> includedHostBroadcast = jssc.sparkContext().broadcast(includedHosts); - JavaPairReceiverInputDStream<String, String> messages = - KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads)); - - //Convert JSON string to TimelineMetrics. - JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new Function<Tuple2<String, String>, TimelineMetrics>() { - @Override - public TimelineMetrics call(Tuple2<String, String> message) throws Exception { - ObjectMapper mapper = new ObjectMapper(); - TimelineMetrics metrics = mapper.readValue(message._2, TimelineMetrics.class); - return metrics; - } - }); - - timelineMetricsStream.print(); - - //Group TimelineMetric by AppId. - JavaPairDStream<String, TimelineMetrics> appMetricStream = timelineMetricsStream.mapToPair( - timelineMetrics -> timelineMetrics.getMetrics().isEmpty() ? new Tuple2<>("TEST", new TimelineMetrics()) : new Tuple2<String, TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(), timelineMetrics) - ); - - appMetricStream.print(); - - //Filter AppIds that are not needed. - JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() { - @Override - public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) throws Exception { - return appIds.contains(appMetricTuple._1); - } - }); - - filteredAppMetricStream.print(); - - filteredAppMetricStream.foreachRDD(rdd -> { - rdd.foreach( - tuple2 -> { - long currentTime = System.currentTimeMillis(); - EmaTechnique ema = emaTechniqueBroadcast.getValue(); - if (currentTime > pitStartTime + pitTestInterval) { - LOG.info("Running Tukeys...."); - pointInTimeADSystemBroadcast.getValue().runTukeysAndRefineEma(ema, currentTime); - pitStartTime = pitStartTime + pitTestInterval; - } - - if (currentTime > ksStartTime + ksTestInterval) { - LOG.info("Running KS Test...."); - trendADSystemBroadcast.getValue().runKSTest(currentTime, trendMetrics); - ksStartTime = ksStartTime + ksTestInterval; - } - - if (currentTime > hdevStartTime + hsdevInterval) { - LOG.info("Running HSdev Test...."); - trendADSystemBroadcast.getValue().runHsdevMethod(); - hdevStartTime = hdevStartTime + hsdevInterval; - } - - TimelineMetrics metrics = tuple2._2(); - for (TimelineMetric timelineMetric : metrics.getMetrics()) { - - boolean includeHost = includedHostBroadcast.getValue().contains(timelineMetric.getHostName()); - boolean includeMetric = false; - if (includeHost) { - if (includePatternBroadcast.getValue().isEmpty()) { - includeMetric = true; - } - for (Pattern p : includePatternBroadcast.getValue()) { - Matcher m = p.matcher(timelineMetric.getMetricName()); - if (m.find()) { - includeMetric = true; - } - } - } - - if (includeMetric) { - trendMetrics.add(new TrendMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(), - timelineMetric.getHostName())); - List<MetricAnomaly> anomalies = ema.test(timelineMetric); - metricsCollectorInterfaceBroadcast.getValue().publish(anomalies); - } - } - }); - }); - - jssc.start(); - jssc.awaitTermination(); +// JavaPairReceiverInputDStream<String, String> messages = +// KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads)); +// +// //Convert JSON string to TimelineMetrics. +// JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new Function<Tuple2<String, String>, TimelineMetrics>() { +// @Override +// public TimelineMetrics call(Tuple2<String, String> message) throws Exception { +// ObjectMapper mapper = new ObjectMapper(); +// TimelineMetrics metrics = mapper.readValue(message._2, TimelineMetrics.class); +// return metrics; +// } +// }); +// +// timelineMetricsStream.print(); +// +// //Group TimelineMetric by AppId. +// JavaPairDStream<String, TimelineMetrics> appMetricStream = timelineMetricsStream.mapToPair( +// timelineMetrics -> timelineMetrics.getMetrics().isEmpty() ? new Tuple2<>("TEST", new TimelineMetrics()) : new Tuple2<String, TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(), timelineMetrics) +// ); +// +// appMetricStream.print(); +// +// //Filter AppIds that are not needed. +// JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() { +// @Override +// public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) throws Exception { +// return appIds.contains(appMetricTuple._1); +// } +// }); +// +// filteredAppMetricStream.print(); +// +// filteredAppMetricStream.foreachRDD(rdd -> { +// rdd.foreach( +// tuple2 -> { +// long currentTime = System.currentTimeMillis(); +// EmaTechnique ema = emaTechniqueBroadcast.getValue(); +// if (currentTime > pitStartTime + pitTestInterval) { +// LOG.info("Running Tukeys...."); +// pointInTimeADSystemBroadcast.getValue().runTukeysAndRefineEma(ema, currentTime); +// pitStartTime = pitStartTime + pitTestInterval; +// } +// +// if (currentTime > ksStartTime + ksTestInterval) { +// LOG.info("Running KS Test...."); +// trendADSystemBroadcast.getValue().runKSTest(currentTime, trendMetrics); +// ksStartTime = ksStartTime + ksTestInterval; +// } +// +// if (currentTime > hdevStartTime + hsdevInterval) { +// LOG.info("Running HSdev Test...."); +// trendADSystemBroadcast.getValue().runHsdevMethod(); +// hdevStartTime = hdevStartTime + hsdevInterval; +// } +// +// TimelineMetrics metrics = tuple2._2(); +// for (TimelineMetric timelineMetric : metrics.getMetrics()) { +// +// boolean includeHost = includedHostBroadcast.getValue().contains(timelineMetric.getHostName()); +// boolean includeMetric = false; +// if (includeHost) { +// if (includePatternBroadcast.getValue().isEmpty()) { +// includeMetric = true; +// } +// for (Pattern p : includePatternBroadcast.getValue()) { +// Matcher m = p.matcher(timelineMetric.getMetricName()); +// if (m.find()) { +// includeMetric = true; +// } +// } +// } +// +// if (includeMetric) { +// trendMetrics.add(new TrendMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(), +// timelineMetric.getHostName())); +// List<MetricAnomaly> anomalies = ema.test(timelineMetric); +// metricsCollectorInterfaceBroadcast.getValue().publish(anomalies); +// } +// } +// }); +// }); +// +// jssc.start(); +// jssc.awaitTermination(); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala index 2d0dbdf..aae216a 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala @@ -22,8 +22,7 @@ import javax.ws.rs.container.{ContainerRequestFilter, ContainerResponseFilter} import org.apache.ambari.metrics.adservice.app.GuiceInjector.{withInjector, wrap} import org.apache.ambari.metrics.adservice.db.{AdAnomalyStoreAccessor, MetadataDatasource} -import org.apache.ambari.metrics.adservice.metadata.MetricDefinitionService -import org.apache.ambari.metrics.adservice.service.ADQueryService +import org.apache.ambari.metrics.adservice.service.{ADQueryService, MetricDefinitionService, DetectionService} import org.glassfish.jersey.filter.LoggingFilter import com.codahale.metrics.health.HealthCheck @@ -53,6 +52,9 @@ class AnomalyDetectionApp extends Application[AnomalyDetectionAppConfig] { injector.getInstance(classOf[MetadataDatasource]).initialize injector.getInstance(classOf[MetricDefinitionService]).initialize injector.getInstance(classOf[ADQueryService]).initialize + injector.getInstance(classOf[DetectionService]).initialize + + env.lifecycle().manage(injector.getInstance(classOf[DetectionService])) } env.jersey.register(jacksonJaxbJsonProvider) env.jersey.register(new LoggingFilter) http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala index 58efa97..fc6293d 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala @@ -20,7 +20,7 @@ package org.apache.ambari.metrics.adservice.app import javax.validation.Valid -import org.apache.ambari.metrics.adservice.configuration.{HBaseConfiguration, _} +import org.apache.ambari.metrics.adservice.configuration.{DetectionServiceConfiguration, HBaseConfiguration, _} import com.fasterxml.jackson.annotation.{JsonIgnore, JsonIgnoreProperties, JsonProperty} @@ -59,6 +59,12 @@ class AnomalyDetectionAppConfig extends Configuration { @Valid private val sparkConfiguration = new SparkConfiguration + /** + * Detection Service configurations + */ + @Valid + private val detectionServiceConfiguration = new DetectionServiceConfiguration + /* AMS HBase Conf */ @@ -86,4 +92,7 @@ class AnomalyDetectionAppConfig extends Configuration { @JsonProperty("spark") def getSparkConfiguration: SparkConfiguration = sparkConfiguration + @JsonProperty("detection") + def getDetectionServiceConfiguration: DetectionServiceConfiguration = detectionServiceConfiguration + } http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala index 68e9df9..0dbc557 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala @@ -19,9 +19,8 @@ package org.apache.ambari.metrics.adservice.app import org.apache.ambari.metrics.adservice.db._ import org.apache.ambari.metrics.adservice.leveldb.LevelDBDataSource -import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricDefinitionServiceImpl} -import org.apache.ambari.metrics.adservice.resource.{AnomalyResource, MetricDefinitionResource, RootResource} -import org.apache.ambari.metrics.adservice.service.{ADQueryService, ADQueryServiceImpl} +import org.apache.ambari.metrics.adservice.resource.{AnomalyResource, DetectionResource, MetricDefinitionResource, RootResource} +import org.apache.ambari.metrics.adservice.service._ import com.codahale.metrics.health.HealthCheck import com.google.inject.AbstractModule @@ -37,11 +36,13 @@ class AnomalyDetectionAppModule(config: AnomalyDetectionAppConfig, env: Environm healthCheckBinder.addBinding().to(classOf[DefaultHealthCheck]) bind(classOf[AnomalyResource]) bind(classOf[MetricDefinitionResource]) + bind(classOf[DetectionResource]) bind(classOf[RootResource]) bind(classOf[AdMetadataStoreAccessor]).to(classOf[AdMetadataStoreAccessorImpl]) bind(classOf[ADQueryService]).to(classOf[ADQueryServiceImpl]) bind(classOf[MetricDefinitionService]).to(classOf[MetricDefinitionServiceImpl]) bind(classOf[MetadataDatasource]).to(classOf[LevelDBDataSource]) bind(classOf[AdAnomalyStoreAccessor]).to(classOf[PhoenixAnomalyStoreAccessor]) + bind(classOf[DetectionService]).to(classOf[DetectionServiceImpl]) } } http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/DetectionServiceConfiguration.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/DetectionServiceConfiguration.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/DetectionServiceConfiguration.scala new file mode 100644 index 0000000..d63affd --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/DetectionServiceConfiguration.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.configuration + +import com.fasterxml.jackson.annotation.JsonProperty + +class DetectionServiceConfiguration { + + @JsonProperty("pointInTime") + private val pointInTime: Boolean = true + + @JsonProperty("trend") + private val trend: Boolean = true + + @JsonProperty("kafkaServers") + private val kafkaServers: String = null + + @JsonProperty("kafkaTopic") + private val kafkaTopic: String = "ambari-metrics-ad" + + @JsonProperty("kafkaConsumerGroup") + private val kafkaConsumerGroup: String = "ambari-metrics-ad-group" + + @JsonProperty("ema-w") + private val emaW: String = "0.9" + + @JsonProperty("ema-n") + private val emaN: String = "3" + + def isPointInTimeSubsystemEnabled: Boolean = pointInTime + + def isTrendSubsystemEnabled: Boolean = trend + + def getKafkaServers : String = kafkaServers + + def getKafkaTopic : String = kafkaTopic + + def getKafkaConsumerGroup : String = kafkaConsumerGroup + + def getEmaW : String = emaW + + def getEmaN : String = emaN + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/SparkConfiguration.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/SparkConfiguration.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/SparkConfiguration.scala index 30efdc7..7dbf128 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/SparkConfiguration.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/SparkConfiguration.scala @@ -18,22 +18,29 @@ package org.apache.ambari.metrics.adservice.configuration -import javax.validation.constraints.NotNull - import com.fasterxml.jackson.annotation.JsonProperty class SparkConfiguration { - @NotNull - private var mode: String = _ + @JsonProperty("mode") + private val mode: String = "standalone" + + @JsonProperty("master") + private val master: String = "spark://localhost:7077" + + @JsonProperty("sparkHome") + private val sparkHome: String = "/usr/lib/ambari-metrics-anomaly-detection/spark" + + @JsonProperty("jarfile") + private val jarfile: String = "/usr/lib/ambari-metrics-anomaly-detection/ambari-metrics-anomaly-detection-service.jar" - @NotNull - private var masterHostPort: String = _ - @JsonProperty def getMode: String = mode - @JsonProperty - def getMasterHostPort: String = masterHostPort + def getMaster: String = master + + def getSparkHome: String = sparkHome + + def getJarFile: String = jarfile } http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala index 53e6dee..ef456ae 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala @@ -22,10 +22,11 @@ import java.util.concurrent.TimeUnit.SECONDS import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig import org.apache.ambari.metrics.adservice.configuration.HBaseConfiguration -import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricKey} +import org.apache.ambari.metrics.adservice.metadata.MetricKey import org.apache.ambari.metrics.adservice.model.AnomalyDetectionMethod.AnomalyDetectionMethod import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType import org.apache.ambari.metrics.adservice.model._ +import org.apache.ambari.metrics.adservice.service.MetricDefinitionService import org.apache.hadoop.hbase.util.RetryCounterFactory import org.slf4j.{Logger, LoggerFactory} http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/AdJobManager.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/AdJobManager.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/AdJobManager.scala new file mode 100644 index 0000000..cdf6b97 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/AdJobManager.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.detection + +import org.slf4j.{Logger, LoggerFactory} +import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig +import org.apache.ambari.metrics.adservice.detection.pointintime.PointInTimeSubsystem + +/** + * Class to start Anomaly detection jobs on spark. + */ +class AdJobManager{ + + val LOG : Logger = LoggerFactory.getLogger(classOf[AdJobManager]) + + var config: AnomalyDetectionAppConfig = _ + var sparkApplicationRunner: SparkApplicationRunner = _ + + val configuredSubsystems: scala.collection.mutable.Map[String, Subsystem] = scala.collection.mutable.Map() + var isInitialized : Boolean = false + + def this (config: AnomalyDetectionAppConfig) = { + this () + this.config = config + this.sparkApplicationRunner = new SparkApplicationRunner(config.getSparkConfiguration) + } + + /** + * Initialize subsystems + */ + def initializeSubsystems() : Unit = { + if (config.getDetectionServiceConfiguration.isPointInTimeSubsystemEnabled) { + configuredSubsystems("pointintime") = new PointInTimeSubsystem(config.getDetectionServiceConfiguration, sparkApplicationRunner) + } + } + + + /** + * Start AD jobs. + */ + def startAdJobs() : Unit = { + if (!isInitialized) { + initializeSubsystems() + isInitialized = true + } + + for (subsystem <- configuredSubsystems.values) { + subsystem.start() + } + } + + /** + * Stop AD jobs. + */ + def stopAdJobs() : Unit = { + for (subsystem <- configuredSubsystems.values) { + subsystem.stop() + } + } + + /** + * Get State of all the AD jobs. + * @return + */ + def state() : Map[String, Map[String, String]] = { + val stateMap: scala.collection.mutable.Map[String, Map[String, String]] = scala.collection.mutable.Map() + + for ((subsystemName, subsystem) <- configuredSubsystems) { + stateMap(subsystemName) = subsystem.state + } + stateMap.toMap + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/SparkApplicationRunner.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/SparkApplicationRunner.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/SparkApplicationRunner.scala new file mode 100644 index 0000000..f025409 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/SparkApplicationRunner.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.detection + +import org.apache.ambari.metrics.adservice.configuration.SparkConfiguration +import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher} +import org.apache.spark.launcher.SparkAppHandle.State + +/** + * Class to run Spark jobs given a set of arguments + */ +class SparkApplicationRunner{ + + var config: SparkConfiguration = _ + val env: java.util.HashMap[String, String] = new java.util.HashMap() + env.put("SPARK_PRINT_LAUNCH_COMMAND", "1") + val sparkArgs: Map[String, String] = Map.empty + + def this(config: SparkConfiguration) = { + this() + this.config = config + } + + /** + * Run Spark Job. + * @param appName Name of the application + * @param appClass Application Class + * @param appArgs Command Line args to be submitted to Spark. + * @return Handle to the Spark job. + */ + def runSparkJob(appName: String, + appClass: String, + appArgs: Iterable[String]): SparkAppHandle = { + + val launcher: SparkLauncher = new SparkLauncher(env) + .setAppName(appName) + .setSparkHome(config.getSparkHome) + .setAppResource(config.getJarFile) + .setMainClass(appClass) + .setMaster(config.getMaster) + + for (arg <- appArgs) { + launcher.addAppArgs(arg) + } + + for ((name,value) <- sparkArgs) { + launcher.addSparkArg(name, value) + } + + val handle: SparkAppHandle = launcher.startApplication() + handle + } + + /** + * Helper method to check if a Spark job is running. + * @param handle + * @return + */ + def isRunning(handle: SparkAppHandle): Boolean = { + handle.getState.equals(State.CONNECTED) || handle.getState.equals(State.SUBMITTED) || handle.getState.equals(State.RUNNING) + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/Subsystem.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/Subsystem.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/Subsystem.scala new file mode 100644 index 0000000..056fc61 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/Subsystem.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.detection + +trait Subsystem { + + def start() : Unit + + def stop(): Unit + + def state: Map[String, String] +} http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/EmaSparkDriver.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/EmaSparkDriver.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/EmaSparkDriver.scala new file mode 100644 index 0000000..261f839 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/EmaSparkDriver.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.detection.pointintime + +import java.util + +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.spark.SparkConf +import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} +import org.apache.spark.streaming.{Duration, StreamingContext} +import org.slf4j.{Logger, LoggerFactory} + +import com.fasterxml.jackson.databind.ObjectMapper + +//TODO Work in Progress. Will be updated in the next patch. +/** + * EMA Spark streaming driver application. + * Input : + * EMA algorithm input - w & n. + * Kafka brokers + * Kafka Topic and group name + * + */ +object EmaSparkDriver { + +// @Inject +// var metricDefinitionService : MetricDefinitionService = _ + + val LOG : Logger = LoggerFactory.getLogger("EmaSparkDriver") + + def main(args: Array[String]): Unit = { + + val emaW = args(0) + val emaN = args(1) + val kafkaServers = args(2) + val kafkaTopicName = args(3) + val kafkaGroupName = args(4) + + val sparkConf = new SparkConf().setAppName("EmaSparkDriver") + + + //Instantiate Kafka stream reader + val streamingContext = new StreamingContext(sparkConf, Duration(10000)) + + val kafkaParams: java.util.HashMap[String, Object] = new java.util.HashMap[String, Object] + kafkaParams.put("bootstrap.servers", kafkaServers) + kafkaParams.put("key.deserializer", classOf[StringDeserializer]) + kafkaParams.put("value.deserializer", classOf[StringDeserializer]) + kafkaParams.put("group.id", kafkaGroupName) + kafkaParams.put("auto.offset.reset", "latest") + kafkaParams.put("enable.auto.commit", false: java.lang.Boolean) + + val kafkaStream = + KafkaUtils.createDirectStream( + streamingContext, + LocationStrategies.PreferConsistent, + ConsumerStrategies.Subscribe[String, String]( + util.Arrays.asList(kafkaTopicName), + kafkaParams.asInstanceOf[java.util.Map[String, Object]] + ) + ) + + kafkaStream.print() + + var timelineMetricsStream = kafkaStream.map(message => { + val mapper = new ObjectMapper + val metrics = mapper.readValue(message.value, classOf[TimelineMetrics]) + metrics + }) + timelineMetricsStream.print() + + var filteredAppMetricStream = timelineMetricsStream.map(timelineMetrics => { + val filteredMetrics : TimelineMetrics = timelineMetrics +// for (metric : TimelineMetric <- timelineMetrics.getMetrics.asScala) { +// val metricKey : MetricKey = MetricKey( +// metric.getMetricName, +// metric.getAppId, +// metric.getInstanceId, +// metric.getHostName, +// null) +// +// if (metricKeys.value.apply(metricKey)) { +// filteredMetrics.addOrMergeTimelineMetric(metric) +// } +// } + filteredMetrics + }) + filteredAppMetricStream.print() + + filteredAppMetricStream.foreachRDD(rdd => { + rdd.foreach(item => { + val timelineMetrics : TimelineMetrics = item + LOG.info("Received Metric : " + timelineMetrics.getMetrics.get(0).getMetricName) +// for (timelineMetric <- timelineMetrics.getMetrics) { +// var anomalies = emaModel.test(timelineMetric) +// anomalyMetricPublisher.publish(anomalies) +// } + }) + }) + + streamingContext.start() + streamingContext.awaitTermination() + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/PointInTimeSubsystem.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/PointInTimeSubsystem.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/PointInTimeSubsystem.scala new file mode 100644 index 0000000..fa06ab2 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/detection/pointintime/PointInTimeSubsystem.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.detection.pointintime + +import org.apache.ambari.metrics.adservice.configuration.DetectionServiceConfiguration +import org.apache.ambari.metrics.adservice.detection.{SparkApplicationRunner, Subsystem} +import org.apache.spark.launcher.SparkAppHandle +import org.slf4j.{Logger, LoggerFactory} + +import com.google.inject.Singleton + +@Singleton +class PointInTimeSubsystem extends Subsystem{ + + val LOG : Logger = LoggerFactory.getLogger(classOf[PointInTimeSubsystem]) + + val appHandleMap: scala.collection.mutable.Map[String, SparkAppHandle] = scala.collection.mutable.Map() + var applicationRunner: SparkApplicationRunner = _ + var config: DetectionServiceConfiguration = _ + + //EMA Stuff + val emaDriverClass = "org.apache.ambari.metrics.adservice.detection.pointintime.EmaSparkDriver" + val emaAppName = "Ema_Spark_Application" + val emaConfig: scala.collection.mutable.MutableList[String] = scala.collection.mutable.MutableList() + + def this(config: DetectionServiceConfiguration, applicationRunner: SparkApplicationRunner) = { + this + this.applicationRunner = applicationRunner + this.config = config + + //Initialize + initializeConfigs() + } + + override def start(): Unit = { + + LOG.info("Starting Point in Time AD jobs...") + + if (!appHandleMap.contains(emaAppName) || !applicationRunner.isRunning(appHandleMap.apply(emaAppName))) { + LOG.info("Starting " + emaAppName) + + if (config.getKafkaServers == null || config.getKafkaServers.isEmpty) { + LOG.error("Cannot run" + emaAppName + " without Kafka Servers config") + } else { + val args : scala.collection.mutable.MutableList[String] = scala.collection.mutable.MutableList() + val emaHandle: SparkAppHandle = applicationRunner.runSparkJob(emaAppName, emaDriverClass, emaConfig) + appHandleMap(emaAppName) = emaHandle + + Thread.sleep(3000) + + if (applicationRunner.isRunning(emaHandle)) { + LOG.info(emaAppName + " successfully started.") + } else { + LOG.error(emaAppName + " start failed.") + } + } + } else { + LOG.info(emaAppName + " already running. Moving ahead.") + } + + } + + override def stop(): Unit = { + + LOG.info("Stopping Point in Time AD jobs...") + for ((app, handle) <- appHandleMap) { + handle.stop() + } + //Sleep for 3 seconds + Thread.sleep(3000) + + for ((app, handle) <- appHandleMap) { + if (!applicationRunner.isRunning(handle)) { + LOG.info(app + " successfully stopped.") + } + } + } + + override def state: Map[String, String] = { + val stateMap: scala.collection.mutable.Map[String, String] = scala.collection.mutable.Map() + for ((app, handle) <- appHandleMap) { + if (handle == null) { + stateMap(app) = null + } else { + stateMap(app) = handle.getState.toString + } + } + stateMap.toMap + } + + + private def initializeConfigs(): Unit = { + //EMA Configs + emaConfig.+=(config.getEmaW) + emaConfig.+=(config.getEmaN) + emaConfig.+=(config.getKafkaServers) + emaConfig.+=(config.getKafkaTopic) + emaConfig.+=(config.getKafkaConsumerGroup) + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala deleted file mode 100644 index 52ce39e..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.metrics.adservice.metadata - -import org.apache.ambari.metrics.adservice.service.AbstractADService - -trait MetricDefinitionService extends AbstractADService{ - - /** - * Given a 'UUID', return the metric key associated with it. - * @param uuid UUID - * @return - */ - def getMetricKeyFromUuid(uuid: Array[Byte]) : MetricKey - - /** - * Return all the definitions being tracked. - * @return Map of Metric Source Definition name to Metric Source Definition. - */ - def getDefinitions: List[MetricSourceDefinition] - - /** - * Given a component definition name, return the definition associated with it. - * @param name component definition name - * @return - */ - def getDefinitionByName(name: String) : MetricSourceDefinition - - /** - * Add a new definition. - * @param definition component definition JSON - * @return - */ - def addDefinition(definition: MetricSourceDefinition) : Boolean - - /** - * Update a component definition by name. Only definitions which were added by API can be modified through API. - * @param definition component definition name - * @return - */ - def updateDefinition(definition: MetricSourceDefinition) : Boolean - - /** - * Delete a component definition by name. Only definitions which were added by API can be deleted through API. - * @param name component definition name - * @return - */ - def deleteDefinitionByName(name: String) : Boolean - - /** - * Given an appId, return set of definitions that are tracked for that appId. - * @param appId component definition appId - * @return - */ - def getDefinitionByAppId(appId: String) : List[MetricSourceDefinition] - - /** - * Return the mapping between definition name to set of metric keys. - * @return Map of Metric Source Definition to set of metric keys associated with it. - */ - def getMetricKeys: Map[String, Set[MetricKey]] - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala deleted file mode 100644 index b9b4a7c..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.metrics.adservice.metadata - -import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig -import org.apache.ambari.metrics.adservice.db.AdMetadataStoreAccessor -import org.slf4j.{Logger, LoggerFactory} - -import com.google.inject.{Inject, Singleton} - -@Singleton -class MetricDefinitionServiceImpl extends MetricDefinitionService { - - val LOG : Logger = LoggerFactory.getLogger(classOf[MetricDefinitionServiceImpl]) - - var adMetadataStoreAccessor: AdMetadataStoreAccessor = _ - var configuration: AnomalyDetectionAppConfig = _ - var metricMetadataProvider: MetricMetadataProvider = _ - - val metricSourceDefinitionMap: scala.collection.mutable.Map[String, MetricSourceDefinition] = scala.collection.mutable.Map() - val metricDefinitionMetricKeyMap: scala.collection.mutable.Map[MetricSourceDefinition, Set[MetricKey]] = scala.collection.mutable.Map() - val metricKeys: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey] - - @Inject - def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig, metadataStoreAccessor: AdMetadataStoreAccessor) = { - this () - adMetadataStoreAccessor = metadataStoreAccessor - configuration = anomalyDetectionAppConfig - } - - @Override - def initialize() : Unit = { - LOG.info("Initializing Metric Definition Service...") - - //Initialize Metric Metadata Provider - metricMetadataProvider = new ADMetadataProvider(configuration.getMetricCollectorConfiguration) - - //Load definitions from metadata store - val definitionsFromStore: List[MetricSourceDefinition] = adMetadataStoreAccessor.getSavedInputDefinitions - for (definition <- definitionsFromStore) { - sanitizeMetricSourceDefinition(definition) - } - - //Load definitions from configs - val definitionsFromConfig: List[MetricSourceDefinition] = getInputDefinitionsFromConfig - for (definition <- definitionsFromConfig) { - sanitizeMetricSourceDefinition(definition) - } - - //Union the 2 sources, with DB taking precedence. - //Save new definition list to DB. - metricSourceDefinitionMap.++=(combineDefinitionSources(definitionsFromConfig, definitionsFromStore)) - - //Reach out to AMS Metadata and get Metric Keys. Pass in MSD and get back Set<MK> - for (definition <- metricSourceDefinitionMap.values) { - val keys: Set[MetricKey] = metricMetadataProvider.getMetricKeysForDefinitions(definition) - metricDefinitionMetricKeyMap(definition) = keys - metricKeys.++=(keys) - } - - LOG.info("Successfully initialized Metric Definition Service.") - } - - def getMetricKeyFromUuid(uuid: Array[Byte]): MetricKey = { - var key: MetricKey = null - for (metricKey <- metricKeys) { - if (metricKey.uuid.sameElements(uuid)) { - key = metricKey - } - } - key - } - - @Override - def getDefinitions: List[MetricSourceDefinition] = { - metricSourceDefinitionMap.values.toList - } - - @Override - def getDefinitionByName(name: String): MetricSourceDefinition = { - if (!metricSourceDefinitionMap.contains(name)) { - LOG.warn("Metric Source Definition with name " + name + " not found") - null - } else { - metricSourceDefinitionMap.apply(name) - } - } - - @Override - def addDefinition(definition: MetricSourceDefinition): Boolean = { - if (metricSourceDefinitionMap.contains(definition.definitionName)) { - LOG.info("Definition with name " + definition.definitionName + " already present.") - return false - } - definition.definitionSource = MetricSourceDefinitionType.API - - val success: Boolean = adMetadataStoreAccessor.saveInputDefinition(definition) - if (success) { - metricSourceDefinitionMap += definition.definitionName -> definition - val keys: Set[MetricKey] = metricMetadataProvider.getMetricKeysForDefinitions(definition) - metricDefinitionMetricKeyMap(definition) = keys - metricKeys.++=(keys) - LOG.info("Successfully created metric source definition : " + definition.definitionName) - } - success - } - - @Override - def updateDefinition(definition: MetricSourceDefinition): Boolean = { - if (!metricSourceDefinitionMap.contains(definition.definitionName)) { - LOG.warn("Metric Source Definition with name " + definition.definitionName + " not found") - return false - } - - if (metricSourceDefinitionMap.apply(definition.definitionName).definitionSource != MetricSourceDefinitionType.API) { - return false - } - definition.definitionSource = MetricSourceDefinitionType.API - - val success: Boolean = adMetadataStoreAccessor.saveInputDefinition(definition) - if (success) { - metricSourceDefinitionMap += definition.definitionName -> definition - val keys: Set[MetricKey] = metricMetadataProvider.getMetricKeysForDefinitions(definition) - metricDefinitionMetricKeyMap(definition) = keys - metricKeys.++=(keys) - LOG.info("Successfully updated metric source definition : " + definition.definitionName) - } - success - } - - @Override - def deleteDefinitionByName(name: String): Boolean = { - if (!metricSourceDefinitionMap.contains(name)) { - LOG.warn("Metric Source Definition with name " + name + " not found") - return false - } - - val definition : MetricSourceDefinition = metricSourceDefinitionMap.apply(name) - if (definition.definitionSource != MetricSourceDefinitionType.API) { - LOG.warn("Cannot delete metric source definition which was not created through API.") - return false - } - - val success: Boolean = adMetadataStoreAccessor.removeInputDefinition(name) - if (success) { - metricSourceDefinitionMap -= definition.definitionName - metricKeys.--=(metricDefinitionMetricKeyMap.apply(definition)) - metricDefinitionMetricKeyMap -= definition - LOG.info("Successfully deleted metric source definition : " + name) - } - success - } - - @Override - def getDefinitionByAppId(appId: String): List[MetricSourceDefinition] = { - - val defList : List[MetricSourceDefinition] = metricSourceDefinitionMap.values.toList - defList.filter(_.appId == appId) - } - - def combineDefinitionSources(configDefinitions: List[MetricSourceDefinition], dbDefinitions: List[MetricSourceDefinition]) - : Map[String, MetricSourceDefinition] = { - - var combinedDefinitionMap: scala.collection.mutable.Map[String, MetricSourceDefinition] = - scala.collection.mutable.Map.empty[String, MetricSourceDefinition] - - for (definitionFromDb <- dbDefinitions) { - combinedDefinitionMap(definitionFromDb.definitionName) = definitionFromDb - } - - for (definition <- configDefinitions) { - if (!dbDefinitions.contains(definition)) { - adMetadataStoreAccessor.saveInputDefinition(definition) - combinedDefinitionMap(definition.definitionName) = definition - } - } - combinedDefinitionMap.toMap - } - - def getInputDefinitionsFromConfig: List[MetricSourceDefinition] = { - val configDirectory = configuration.getMetricDefinitionServiceConfiguration.getInputDefinitionDirectory - InputMetricDefinitionParser.parseInputDefinitionsFromDirectory(configDirectory) - } - - def setAdMetadataStoreAccessor (adMetadataStoreAccessor: AdMetadataStoreAccessor) : Unit = { - this.adMetadataStoreAccessor = adMetadataStoreAccessor - } - - /** - * Look into the Metric Definitions inside a Metric Source definition, and push down source level appId & - * hosts to Metric definition if they do not have an override. - * @param metricSourceDefinition Input Metric Source Definition - */ - def sanitizeMetricSourceDefinition(metricSourceDefinition: MetricSourceDefinition): Unit = { - val sourceLevelAppId: String = metricSourceDefinition.appId - val sourceLevelHostList: List[String] = metricSourceDefinition.hosts - - for (metricDef <- metricSourceDefinition.metricDefinitions.toList) { - if (metricDef.appId == null) { - if (sourceLevelAppId == null || sourceLevelAppId.isEmpty) { - metricDef.makeInvalid() - } else { - metricDef.appId = sourceLevelAppId - } - } - - if (metricDef.isValid && (metricDef.hosts == null || metricDef.hosts.isEmpty)) { - if (sourceLevelHostList != null && sourceLevelHostList.nonEmpty) { - metricDef.hosts = sourceLevelHostList - } - } - } - } - - /** - * Return the mapping between definition name to set of metric keys. - * - * @return Map of Metric Source Definition to set of metric keys associated with it. - */ - override def getMetricKeys: Map[String, Set[MetricKey]] = { - val metricKeyMap: scala.collection.mutable.Map[String, Set[MetricKey]] = scala.collection.mutable.Map() - for (definition <- metricSourceDefinitionMap.values) { - metricKeyMap(definition.definitionName) = metricDefinitionMetricKeyMap.apply(definition) - } - metricKeyMap.toMap - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinition.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinition.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinition.scala index 47b1499..d49ba34 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinition.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinition.scala @@ -22,26 +22,6 @@ import javax.xml.bind.annotation.XmlRootElement import org.apache.ambari.metrics.adservice.metadata.MetricSourceDefinitionType.MetricSourceDefinitionType import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType -/* -{ - "definition-name": "host-memory", - "app-id" : "HOST", - "hosts" : [âc6401.ambari.apache.orgâ], - "metric-definitions" : [ - { - "metric-name": "mem_free", - "metric-description" : "Free memory on a Host.", - "troubleshooting-info" : "Sudden drop / hike in free memory on a host.", - "static-threshold" : 10, - âapp-idâ : âHOSTâ -} ], - - "related-definition-names" : ["host-cpu", âhost-networkâ], - âanomaly-detection-subsystemsâ : [âpoint-in-timeâ, âtrendâ] -} -*/ - - @SerialVersionUID(10001L) @XmlRootElement class MetricSourceDefinition extends Serializable{ http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/DetectionResource.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/DetectionResource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/DetectionResource.scala new file mode 100644 index 0000000..6a04190 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/DetectionResource.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.resource + +import javax.ws.rs.core.MediaType.APPLICATION_JSON +import javax.ws.rs.core.Response +import javax.ws.rs.{GET, Path, Produces} + +import org.apache.ambari.metrics.adservice.service.DetectionService + +import com.google.inject.Inject + +@Path("/detection") +class DetectionResource { + + @Inject + var detectionService: DetectionService = _ + + @GET + @Produces(Array(APPLICATION_JSON)) + @Path("/state") + def getState: Response = { + Response.ok.entity(detectionService.state()).build() + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala index 442bf46..7c26014 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala @@ -21,7 +21,8 @@ import javax.ws.rs._ import javax.ws.rs.core.MediaType.APPLICATION_JSON import javax.ws.rs.core.Response -import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricKey, MetricSourceDefinition} +import org.apache.ambari.metrics.adservice.metadata.{MetricKey, MetricSourceDefinition} +import org.apache.ambari.metrics.adservice.service.MetricDefinitionService import org.apache.commons.lang.StringUtils import com.google.inject.Inject http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionService.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionService.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionService.scala new file mode 100644 index 0000000..2cdf20d --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionService.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.service + +import io.dropwizard.lifecycle.Managed + +trait DetectionService extends AbstractADService with Managed{ + + def state() : Map[String, Map[String, String]] + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionServiceImpl.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionServiceImpl.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionServiceImpl.scala new file mode 100644 index 0000000..ba091cc --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/DetectionServiceImpl.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.service + +import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig +import org.apache.ambari.metrics.adservice.detection.AdJobManager +import org.slf4j.{Logger, LoggerFactory} + +import com.google.inject.{Inject, Singleton} + +@Singleton +class DetectionServiceImpl extends DetectionService{ + + val LOG : Logger = LoggerFactory.getLogger(classOf[DetectionServiceImpl]) + + var adJobManager: AdJobManager = _ + var config : AnomalyDetectionAppConfig = _ + + @Inject + def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig) = { + this () + this.config = anomalyDetectionAppConfig + } + + override def initialize(): Unit = { + this.adJobManager = new AdJobManager(config) + adJobManager.startAdJobs() + } + + override def state(): Map[String, Map[String, String]] = { + adJobManager.state() + } + + override def start(): Unit = { + } + + override def stop(): Unit = { + LOG.info("Stop Detection Service hook invoked. Stopping AD Jobs") + adJobManager.stopAdJobs() + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/MetricDefinitionService.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/MetricDefinitionService.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/MetricDefinitionService.scala new file mode 100644 index 0000000..11db8c7 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/MetricDefinitionService.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.service + +import org.apache.ambari.metrics.adservice.metadata.{MetricKey, MetricSourceDefinition} + +trait MetricDefinitionService extends AbstractADService{ + + /** + * Given a 'UUID', return the metric key associated with it. + * @param uuid UUID + * @return + */ + def getMetricKeyFromUuid(uuid: Array[Byte]) : MetricKey + + /** + * Return all the definitions being tracked. + * @return Map of Metric Source Definition name to Metric Source Definition. + */ + def getDefinitions: List[MetricSourceDefinition] + + /** + * Given a component definition name, return the definition associated with it. + * @param name component definition name + * @return + */ + def getDefinitionByName(name: String) : MetricSourceDefinition + + /** + * Add a new definition. + * @param definition component definition JSON + * @return + */ + def addDefinition(definition: MetricSourceDefinition) : Boolean + + /** + * Update a component definition by name. Only definitions which were added by API can be modified through API. + * @param definition component definition name + * @return + */ + def updateDefinition(definition: MetricSourceDefinition) : Boolean + + /** + * Delete a component definition by name. Only definitions which were added by API can be deleted through API. + * @param name component definition name + * @return + */ + def deleteDefinitionByName(name: String) : Boolean + + /** + * Given an appId, return set of definitions that are tracked for that appId. + * @param appId component definition appId + * @return + */ + def getDefinitionByAppId(appId: String) : List[MetricSourceDefinition] + + /** + * Return the mapping between definition name to set of metric keys. + * @return Map of Metric Source Definition to set of metric keys associated with it. + */ + def getMetricKeys: Map[String, Set[MetricKey]] + + /** + * Return the set of metric keys. + * @return Set of metric keys. + */ + def getMetricKeyList: Set[MetricKey] + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/MetricDefinitionServiceImpl.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/MetricDefinitionServiceImpl.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/MetricDefinitionServiceImpl.scala new file mode 100644 index 0000000..24bd652 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/MetricDefinitionServiceImpl.scala @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.service + +import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig +import org.apache.ambari.metrics.adservice.db.AdMetadataStoreAccessor +import org.apache.ambari.metrics.adservice.metadata._ +import org.slf4j.{Logger, LoggerFactory} + +import com.google.inject.{Inject, Singleton} + +@Singleton +class MetricDefinitionServiceImpl extends MetricDefinitionService { + + val LOG : Logger = LoggerFactory.getLogger(classOf[MetricDefinitionServiceImpl]) + + var adMetadataStoreAccessor: AdMetadataStoreAccessor = _ + var configuration: AnomalyDetectionAppConfig = _ + var metricMetadataProvider: MetricMetadataProvider = _ + + val metricSourceDefinitionMap: scala.collection.mutable.Map[String, MetricSourceDefinition] = scala.collection.mutable.Map() + val metricDefinitionMetricKeyMap: scala.collection.mutable.Map[MetricSourceDefinition, Set[MetricKey]] = scala.collection.mutable.Map() + val metricKeys: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey] + + @Inject + def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig, metadataStoreAccessor: AdMetadataStoreAccessor) = { + this () + adMetadataStoreAccessor = metadataStoreAccessor + configuration = anomalyDetectionAppConfig + } + + @Override + def initialize() : Unit = { + LOG.info("Initializing Metric Definition Service...") + + //Initialize Metric Metadata Provider + metricMetadataProvider = new ADMetadataProvider(configuration.getMetricCollectorConfiguration) + + //Load definitions from metadata store + val definitionsFromStore: List[MetricSourceDefinition] = adMetadataStoreAccessor.getSavedInputDefinitions + for (definition <- definitionsFromStore) { + sanitizeMetricSourceDefinition(definition) + } + + //Load definitions from configs + val definitionsFromConfig: List[MetricSourceDefinition] = getInputDefinitionsFromConfig + for (definition <- definitionsFromConfig) { + sanitizeMetricSourceDefinition(definition) + } + + //Union the 2 sources, with DB taking precedence. + //Save new definition list to DB. + metricSourceDefinitionMap.++=(combineDefinitionSources(definitionsFromConfig, definitionsFromStore)) + + //Reach out to AMS Metadata and get Metric Keys. Pass in MSD and get back Set<MK> + for (definition <- metricSourceDefinitionMap.values) { + val keys: Set[MetricKey] = metricMetadataProvider.getMetricKeysForDefinitions(definition) + metricDefinitionMetricKeyMap(definition) = keys + metricKeys.++=(keys) + } + + LOG.info("Successfully initialized Metric Definition Service.") + } + + def getMetricKeyFromUuid(uuid: Array[Byte]): MetricKey = { + var key: MetricKey = null + for (metricKey <- metricKeys) { + if (metricKey.uuid.sameElements(uuid)) { + key = metricKey + } + } + key + } + + @Override + def getDefinitions: List[MetricSourceDefinition] = { + metricSourceDefinitionMap.values.toList + } + + @Override + def getDefinitionByName(name: String): MetricSourceDefinition = { + if (!metricSourceDefinitionMap.contains(name)) { + LOG.warn("Metric Source Definition with name " + name + " not found") + null + } else { + metricSourceDefinitionMap.apply(name) + } + } + + @Override + def addDefinition(definition: MetricSourceDefinition): Boolean = { + if (metricSourceDefinitionMap.contains(definition.definitionName)) { + LOG.info("Definition with name " + definition.definitionName + " already present.") + return false + } + definition.definitionSource = MetricSourceDefinitionType.API + + val success: Boolean = adMetadataStoreAccessor.saveInputDefinition(definition) + if (success) { + metricSourceDefinitionMap += definition.definitionName -> definition + val keys: Set[MetricKey] = metricMetadataProvider.getMetricKeysForDefinitions(definition) + metricDefinitionMetricKeyMap(definition) = keys + metricKeys.++=(keys) + LOG.info("Successfully created metric source definition : " + definition.definitionName) + } + success + } + + @Override + def updateDefinition(definition: MetricSourceDefinition): Boolean = { + if (!metricSourceDefinitionMap.contains(definition.definitionName)) { + LOG.warn("Metric Source Definition with name " + definition.definitionName + " not found") + return false + } + + if (metricSourceDefinitionMap.apply(definition.definitionName).definitionSource != MetricSourceDefinitionType.API) { + return false + } + definition.definitionSource = MetricSourceDefinitionType.API + + val success: Boolean = adMetadataStoreAccessor.saveInputDefinition(definition) + if (success) { + metricSourceDefinitionMap += definition.definitionName -> definition + val keys: Set[MetricKey] = metricMetadataProvider.getMetricKeysForDefinitions(definition) + metricDefinitionMetricKeyMap(definition) = keys + metricKeys.++=(keys) + LOG.info("Successfully updated metric source definition : " + definition.definitionName) + } + success + } + + @Override + def deleteDefinitionByName(name: String): Boolean = { + if (!metricSourceDefinitionMap.contains(name)) { + LOG.warn("Metric Source Definition with name " + name + " not found") + return false + } + + val definition : MetricSourceDefinition = metricSourceDefinitionMap.apply(name) + if (definition.definitionSource != MetricSourceDefinitionType.API) { + LOG.warn("Cannot delete metric source definition which was not created through API.") + return false + } + + val success: Boolean = adMetadataStoreAccessor.removeInputDefinition(name) + if (success) { + metricSourceDefinitionMap -= definition.definitionName + metricKeys.--=(metricDefinitionMetricKeyMap.apply(definition)) + metricDefinitionMetricKeyMap -= definition + LOG.info("Successfully deleted metric source definition : " + name) + } + success + } + + @Override + def getDefinitionByAppId(appId: String): List[MetricSourceDefinition] = { + + val defList : List[MetricSourceDefinition] = metricSourceDefinitionMap.values.toList + defList.filter(_.appId == appId) + } + + def combineDefinitionSources(configDefinitions: List[MetricSourceDefinition], dbDefinitions: List[MetricSourceDefinition]) + : Map[String, MetricSourceDefinition] = { + + var combinedDefinitionMap: scala.collection.mutable.Map[String, MetricSourceDefinition] = + scala.collection.mutable.Map.empty[String, MetricSourceDefinition] + + for (definitionFromDb <- dbDefinitions) { + combinedDefinitionMap(definitionFromDb.definitionName) = definitionFromDb + } + + for (definition <- configDefinitions) { + if (!dbDefinitions.contains(definition)) { + adMetadataStoreAccessor.saveInputDefinition(definition) + combinedDefinitionMap(definition.definitionName) = definition + } + } + combinedDefinitionMap.toMap + } + + def getInputDefinitionsFromConfig: List[MetricSourceDefinition] = { + val configDirectory = configuration.getMetricDefinitionServiceConfiguration.getInputDefinitionDirectory + InputMetricDefinitionParser.parseInputDefinitionsFromDirectory(configDirectory) + } + + def setAdMetadataStoreAccessor (adMetadataStoreAccessor: AdMetadataStoreAccessor) : Unit = { + this.adMetadataStoreAccessor = adMetadataStoreAccessor + } + + /** + * Look into the Metric Definitions inside a Metric Source definition, and push down source level appId & + * hosts to Metric definition if they do not have an override. + * @param metricSourceDefinition Input Metric Source Definition + */ + def sanitizeMetricSourceDefinition(metricSourceDefinition: MetricSourceDefinition): Unit = { + val sourceLevelAppId: String = metricSourceDefinition.appId + val sourceLevelHostList: List[String] = metricSourceDefinition.hosts + + for (metricDef <- metricSourceDefinition.metricDefinitions.toList) { + if (metricDef.appId == null) { + if (sourceLevelAppId == null || sourceLevelAppId.isEmpty) { + metricDef.makeInvalid() + } else { + metricDef.appId = sourceLevelAppId + } + } + + if (metricDef.isValid && (metricDef.hosts == null || metricDef.hosts.isEmpty)) { + if (sourceLevelHostList != null && sourceLevelHostList.nonEmpty) { + metricDef.hosts = sourceLevelHostList + } + } + } + } + + /** + * Return the mapping between definition name to set of metric keys. + * + * @return Map of Metric Source Definition to set of metric keys associated with it. + */ + override def getMetricKeys: Map[String, Set[MetricKey]] = { + val metricKeyMap: scala.collection.mutable.Map[String, Set[MetricKey]] = scala.collection.mutable.Map() + for (definition <- metricSourceDefinitionMap.values) { + metricKeyMap(definition.definitionName) = metricDefinitionMetricKeyMap.apply(definition) + } + metricKeyMap.toMap + } + + /** + * Return the set of metric keys. + * + * @return Set of metric keys. + */ + override def getMetricKeyList: Set[MetricKey] = { + metricKeys.toSet + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/resources/config.yaml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/resources/config.yaml b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/resources/config.yaml index 6b09499..b28078b 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/resources/config.yaml +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/resources/config.yaml @@ -32,4 +32,14 @@ metricDefinitionDB: spark: mode: standalone - masterHostPort: localhost:7077 \ No newline at end of file + master: spark://localhost:7077 + sparkHome: /usr/lib/ambari-metrics-anomaly-detection/spark + jarfile: /usr/lib/ambari-metrics-anomaly-detection/ambari-metrics-anomaly-detection-service.jar + +detection: + trend: false + kafkaServers: localhost:6667 + kafkaTopic: ambari-metrics-ad + kafkaConsumerGroup: ambari-metrics-ad-group + ema-w: 0.9 + ema-n: 3 http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala index 76391a0..fb6998f 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala @@ -60,7 +60,10 @@ class AnomalyDetectionAppConfigTest extends FunSuite { assert(!config.getMetricDefinitionDBConfiguration.getPerformParanoidChecks) assert(config.getSparkConfiguration.getMode.equals("standalone")) - assert(config.getSparkConfiguration.getMasterHostPort.equals("localhost:7077")) + assert(config.getSparkConfiguration.getMaster.equals("spark://localhost:7077")) + + assert(config.getDetectionServiceConfiguration.isPointInTimeSubsystemEnabled) + assert(!config.getDetectionServiceConfiguration.isTrendSubsystemEnabled) } http://git-wip-us.apache.org/repos/asf/ambari/blob/2129b396/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceTest.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceTest.scala index d3454f2..f1e128b 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceTest.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceTest.scala @@ -20,6 +20,7 @@ package org.apache.ambari.metrics.adservice.metadata import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig import org.apache.ambari.metrics.adservice.db.AdMetadataStoreAccessor +import org.apache.ambari.metrics.adservice.service.MetricDefinitionServiceImpl import org.easymock.EasyMock.{anyObject, expect, expectLastCall, replay} import org.scalatest.FunSuite import org.scalatest.easymock.EasyMockSugar