Repository: ambari Updated Branches: refs/heads/branch-3.0-ams 4613b471e -> a9c6054fe
AMBARI-22163 : Anomaly Storage: Design Metric anomalies schema. (avijayan) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a9c6054f Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a9c6054f Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a9c6054f Branch: refs/heads/branch-3.0-ams Commit: a9c6054fe3c2512f8021f3bb4fb9150e40238c5b Parents: 4613b47 Author: Aravindan Vijayan <avija...@hortonworks.com> Authored: Fri Oct 6 10:53:28 2017 -0700 Committer: Aravindan Vijayan <avija...@hortonworks.com> Committed: Fri Oct 6 10:53:28 2017 -0700 ---------------------------------------------------------------------- ambari-logsearch/ambari-logsearch-it/pom.xml | 2 +- .../pom.xml | 33 +++++- .../common/ADServiceConfiguration.scala | 74 +++++++++++++ .../common/PhoenixQueryConstants.scala | 109 +++++++++++++++++++ .../db/PhoenixAnomalyStoreAccessor.scala | 67 ++++++++++++ .../spark/prototype/SparkPhoenixReader.scala | 92 ++++++++-------- .../common/ADManagerConfigurationTest.scala | 23 ++++ .../db/PhoenixAnomalyStoreAccessorTest.scala | 26 +++++ ambari-metrics/ambari-metrics-common/pom.xml | 46 ++++++++ .../sink/timeline/query/ConnectionProvider.java | 32 ++++++ .../query/DefaultPhoenixDataSource.java | 108 ++++++++++++++++++ .../query/PhoenixConnectionProvider.java | 31 ++++++ .../metrics/timeline/PhoenixHBaseAccessor.java | 23 +--- .../timeline/query/ConnectionProvider.java | 29 ----- .../query/DefaultPhoenixDataSource.java | 90 --------------- .../query/PhoenixConnectionProvider.java | 31 ------ .../TestApplicationHistoryServer.java | 2 +- .../timeline/AbstractMiniHBaseClusterTest.java | 6 +- .../timeline/PhoenixHBaseAccessorTest.java | 4 +- 19 files changed, 601 insertions(+), 227 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-logsearch/ambari-logsearch-it/pom.xml 
---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-it/pom.xml b/ambari-logsearch/ambari-logsearch-it/pom.xml index db3e09f..b3a1d45 100644 --- a/ambari-logsearch/ambari-logsearch-it/pom.xml +++ b/ambari-logsearch/ambari-logsearch-it/pom.xml @@ -122,7 +122,7 @@ </dependencies> <build> - <testOutputDirectory>target/classes</testOutputDirectory> + <testOutputDirectory>test/target/classes</testOutputDirectory> <testResources> <testResource> <directory>src/test/java/</directory> http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml index 1a10f86..6f8f8c1 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml @@ -29,8 +29,9 @@ <artifactId>ambari-metrics-anomaly-detection-service</artifactId> <version>2.0.0.0-SNAPSHOT</version> <properties> - <scala.version>2.10.4</scala.version> + <scala.version>2.11.1</scala.version> <scala.binary.version>2.11</scala.binary.version> + <hadoop.version>2.7.3.2.6.0.3-8</hadoop.version> </properties> <repositories> @@ -201,5 +202,35 @@ <version>2.1.1</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>commons-el</groupId> + <artifactId>commons-el</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + 
<groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1-jetty</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_2.11</artifactId> + <version>3.0.1</version> + <scope>test</scope> + </dependency> </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/ADServiceConfiguration.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/ADServiceConfiguration.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/ADServiceConfiguration.scala new file mode 100644 index 0000000..248c74e --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/ADServiceConfiguration.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.adservice.common + +import java.net.{MalformedURLException, URISyntaxException} + +import org.apache.hadoop.conf.Configuration + +object ADServiceConfiguration { + + private val AMS_AD_SITE_CONFIGURATION_FILE = "ams-ad-site.xml" + private val HBASE_SITE_CONFIGURATION_FILE = "hbase-site.xml" + + val ANOMALY_METRICS_TTL = "timeline.metrics.anomaly.data.ttl" + + private var hbaseConf: org.apache.hadoop.conf.Configuration = _ + private var adConf: org.apache.hadoop.conf.Configuration = _ + + def initConfigs(): Unit = { + + var classLoader: ClassLoader = Thread.currentThread.getContextClassLoader + if (classLoader == null) classLoader = getClass.getClassLoader + + try { + val hbaseResUrl = classLoader.getResource(HBASE_SITE_CONFIGURATION_FILE) + if (hbaseResUrl == null) throw new IllegalStateException("Unable to initialize the AD subsystem. No hbase-site present in the classpath.") + + hbaseConf = new Configuration(true) + hbaseConf.addResource(hbaseResUrl.toURI.toURL) + + val adSystemConfigUrl = classLoader.getResource(AMS_AD_SITE_CONFIGURATION_FILE) + if (adSystemConfigUrl == null) throw new IllegalStateException("Unable to initialize the AD subsystem. 
No ams-ad-site present in the classpath") + + adConf = new Configuration(true) + adConf.addResource(adSystemConfigUrl.toURI.toURL) + + } catch { + case me : MalformedURLException => throw new IllegalStateException("Unable to initialize the AD subsystem. Malformed configuration resource URL.", me) + case ue : URISyntaxException => throw new IllegalStateException("Unable to initialize the AD subsystem. Invalid configuration resource URI.", ue) + } + } + + def getHBaseConf: org.apache.hadoop.conf.Configuration = { + hbaseConf + } + + def getAdConf: org.apache.hadoop.conf.Configuration = { + adConf + } + + def getAnomalyDataTtl: Int = { + if (adConf != null) return adConf.get(ANOMALY_METRICS_TTL, "604800").toInt + 604800 + } + + /** + * Anomaly data TTL is read from timeline.metrics.anomaly.data.ttl and defaults to 604800 (7 days if the unit is seconds - TODO confirm). + * + */ +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/PhoenixQueryConstants.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/PhoenixQueryConstants.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/PhoenixQueryConstants.scala new file mode 100644 index 0000000..5e90d2b --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/PhoenixQueryConstants.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.common + +object PhoenixQueryConstants { + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + /* Table Name constants */ + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + val METRIC_PROFILE_TABLE_NAME = "METRIC_PROFILE" + val METHOD_PARAMETERS_TABLE_NAME = "METHOD_PARAMETERS" + val PIT_ANOMALY_METRICS_TABLE_NAME = "PIT_METRIC_ANOMALIES" + val TREND_ANOMALY_METRICS_TABLE_NAME = "TREND_METRIC_ANOMALIES" + val MODEL_SNAPSHOT = "MODEL_SNAPSHOT" + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + /* CREATE statement constants */ + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + val CREATE_METRIC_PROFILE_TABLE = "" + + val CREATE_METHOD_PARAMETERS_TABLE: String = "CREATE TABLE IF NOT EXISTS %s (" + + "METHOD_NAME VARCHAR, " + + "METHOD_TYPE VARCHAR, " + + "PARAMETERS VARCHAR " + + "CONSTRAINT pk PRIMARY KEY (METHOD_NAME)) " + + "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, COMPRESSION='SNAPPY'" + + val CREATE_PIT_ANOMALY_METRICS_TABLE_SQL: String = "CREATE TABLE IF NOT EXISTS %s (" + + "METRIC_UUID BINARY(20) NOT NULL, " + + "METHOD_NAME VARCHAR, " + + "ANOMALY_TIMESTAMP UNSIGNED_LONG NOT NULL, " + + "METRIC_VALUE DOUBLE, " + + "SEASONAL_INFO VARCHAR, " + + "ANOMALY_SCORE DOUBLE, " + + "MODEL_SNAPSHOT 
VARCHAR, " + + "DETECTION_TIME UNSIGNED_LONG " + + "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME, ANOMALY_TIMESTAMP)) " + + "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='SNAPPY'" + + val CREATE_TREND_ANOMALY_METRICS_TABLE_SQL: String = "CREATE TABLE IF NOT EXISTS %s (" + + "METRIC_UUID BINARY(20) NOT NULL, " + + "ANOMALY_PERIOD_START UNSIGNED_LONG NOT NULL, " + + "ANOMALY_PERIOD_END UNSIGNED_LONG NOT NULL, " + + "TEST_PERIOD_START UNSIGNED_LONG NOT NULL, " + + "TEST_PERIOD_END UNSIGNED_LONG NOT NULL, " + + "METHOD_NAME VARCHAR, " + + "ANOMALY_SCORE DOUBLE, " + + "MODEL_SNAPSHOT VARCHAR, " + + "DETECTION_TIME UNSIGNED_LONG " + + "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME, ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, TEST_PERIOD_START, TEST_PERIOD_END)) " + + "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='SNAPPY'" + + val CREATE_MODEL_SNAPSHOT_TABLE: String = "CREATE TABLE IF NOT EXISTS %s (" + + "METRIC_UUID BINARY(20), " + + "METHOD_NAME VARCHAR, " + + "METHOD_TYPE VARCHAR, " + + "PARAMETERS VARCHAR, " + + "SNAPSHOT_TIME UNSIGNED_LONG NOT NULL " + "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME)) " + + "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, COMPRESSION='SNAPPY'" + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + /* UPSERT statement constants */ + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + val UPSERT_METHOD_PARAMETERS_SQL: String = "UPSERT INTO %s (METHOD_NAME, METHOD_TYPE, PARAMETERS) VALUES (?,?,?)" + + val UPSERT_PIT_ANOMALY_METRICS_SQL: String = "UPSERT INTO %s (METRIC_UUID, ANOMALY_TIMESTAMP, METRIC_VALUE, METHOD_NAME, " + + "SEASONAL_INFO, ANOMALY_SCORE, MODEL_SNAPSHOT, DETECTION_TIME) VALUES (?, ?, ?, ?, ?, ?, ?, ?)" + + val UPSERT_TREND_ANOMALY_METRICS_SQL: String = "UPSERT INTO %s (METRIC_UUID, ANOMALY_PERIOD_START, 
ANOMALY_PERIOD_END, " + + "TEST_PERIOD_START, TEST_PERIOD_END, METHOD_NAME, ANOMALY_SCORE, MODEL_SNAPSHOT, DETECTION_TIME) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" + + val UPSERT_MODEL_SNAPSHOT_SQL: String = "UPSERT INTO %s (METRIC_UUID, METHOD_NAME, METHOD_TYPE, PARAMETERS) VALUES (?, ?, ?, ?)" + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + /* GET statement constants */ + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + val GET_METHOD_PAREMETERS_SQL: String = "SELECT METHOD_NAME, METHOD_TYPE, PARAMETERS FROM %s WHERE METHOD_NAME = %s" + + val GET_PIT_ANOMALY_METRIC_SQL: String = "SELECT METRIC_UUID, ANOMALY_TIMESTAMP, METRIC_VALUE, METHOD_NAME, SEASONAL_INFO, " + + "ANOMALY_SCORE, MODEL_SNAPSHOT, DETECTION_TIME FROM %s WHERE METRIC_UUID = ? AND ANOMALY_TIMESTAMP > ? AND ANOMALY_TIMESTAMP <= ? " + + "ORDER BY ANOMALY_SCORE DESC" + + val GET_TREND_ANOMALY_METRIC_SQL: String = "SELECT METRIC_UUID, ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, TEST_PERIOD_START, " + + "TEST_PERIOD_END, METHOD_NAME, ANOMALY_SCORE, MODEL_SNAPSHOT, DETECTION_TIME FROM %s WHERE METHOD_NAME = ? AND ANOMALY_PERIOD_END > ? " + + "AND TEST_PERIOD_END <= ? 
ORDER BY ANOMALY_SCORE DESC" + + val GET_MODEL_SNAPSHOT_SQL: String = "SELECT METRIC_UUID, METHOD_NAME, METHOD_TYPE, PARAMETERS FROM %s WHERE METRIC_UUID = %s AND METHOD_NAME = %s" + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala new file mode 100644 index 0000000..6f33e56 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.metrics.adservice.db + +import java.sql.{Connection, SQLException} + +import org.apache.ambari.metrics.adservice.common.{ADServiceConfiguration, PhoenixQueryConstants} +import org.apache.hadoop.hbase.util.RetryCounterFactory +import org.apache.hadoop.metrics2.sink.timeline.query.{DefaultPhoenixDataSource, PhoenixConnectionProvider} +import java.util.concurrent.TimeUnit.SECONDS + +object PhoenixAnomalyStoreAccessor { + + private var datasource: PhoenixConnectionProvider = _ + + def initAnomalyMetricSchema(): Unit = { + + datasource = new DefaultPhoenixDataSource(ADServiceConfiguration.getHBaseConf) + val retryCounterFactory = new RetryCounterFactory(10, SECONDS.toMillis(3).toInt) + + val ttl = ADServiceConfiguration.getAnomalyDataTtl + val conn = datasource.getConnectionRetryingOnException(retryCounterFactory) + try { + val stmt = conn.createStatement + + val methodParametersSql = String.format(PhoenixQueryConstants.CREATE_METHOD_PARAMETERS_TABLE, + PhoenixQueryConstants.METHOD_PARAMETERS_TABLE_NAME) + stmt.executeUpdate(methodParametersSql) + + val pointInTimeAnomalySql = String.format(PhoenixQueryConstants.CREATE_PIT_ANOMALY_METRICS_TABLE_SQL, + PhoenixQueryConstants.PIT_ANOMALY_METRICS_TABLE_NAME, + ttl.asInstanceOf[Object]) + stmt.executeUpdate(pointInTimeAnomalySql) + + val trendAnomalySql = String.format(PhoenixQueryConstants.CREATE_TREND_ANOMALY_METRICS_TABLE_SQL, + PhoenixQueryConstants.TREND_ANOMALY_METRICS_TABLE_NAME, + ttl.asInstanceOf[Object]) + stmt.executeUpdate(trendAnomalySql) + + val snapshotSql = String.format(PhoenixQueryConstants.CREATE_MODEL_SNAPSHOT_TABLE, + PhoenixQueryConstants.MODEL_SNAPSHOT) + stmt.executeUpdate(snapshotSql) + + conn.commit() + } finally { + conn.close() + } + } + + @throws[SQLException] + def getConnection: Connection = datasource.getConnection +} 
http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala index 6e1ae07..ac00764 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala @@ -26,52 +26,52 @@ object SparkPhoenixReader { def main(args: Array[String]) { - if (args.length < 6) { - System.err.println("Usage: SparkPhoenixReader <metric_name> <appId> <hostname> <weight> <timessdev> <phoenixConnectionString> <model_dir>") - System.exit(1) - } - - var metricName = args(0) - var appId = args(1) - var hostname = args(2) - var weight = args(3).toDouble - var timessdev = args(4).toInt - var phoenixConnectionString = args(5) //avijayan-ams-3.openstacklocal:61181:/ams-hbase-unsecure - var modelDir = args(6) - - val conf = new SparkConf() - conf.set("spark.app.name", "AMSAnomalyModelBuilder") - //conf.set("spark.master", "spark://avijayan-ams-2.openstacklocal:7077") - - var sc = new SparkContext(conf) - val sqlContext = new SQLContext(sc) - - val currentTime = System.currentTimeMillis() - val oneDayBack = currentTime - 24*60*60*1000 - - val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "METRIC_RECORD", "zkUrl" -> phoenixConnectionString)) - df.registerTempTable("METRIC_RECORD") - val result = sqlContext.sql("SELECT 
METRIC_NAME, HOSTNAME, APP_ID, SERVER_TIME, METRIC_SUM, METRIC_COUNT FROM METRIC_RECORD " + - "WHERE METRIC_NAME = '" + metricName + "' AND HOSTNAME = '" + hostname + "' AND APP_ID = '" + appId + "' AND SERVER_TIME < " + currentTime + " AND SERVER_TIME > " + oneDayBack) - - var metricValues = new java.util.TreeMap[java.lang.Long, java.lang.Double] - result.collect().foreach( - t => metricValues.put(t.getLong(3), t.getDouble(4) / t.getInt(5)) - ) - - //val seriesName = result.head().getString(0) - //val hostname = result.head().getString(1) - //val appId = result.head().getString(2) - - val timelineMetric = new TimelineMetric() - timelineMetric.setMetricName(metricName) - timelineMetric.setAppId(appId) - timelineMetric.setHostName(hostname) - timelineMetric.setMetricValues(metricValues) - - var emaModel = new EmaTechnique(weight, timessdev) - emaModel.test(timelineMetric) - emaModel.save(sc, modelDir) +// if (args.length < 6) { +// System.err.println("Usage: SparkPhoenixReader <metric_name> <appId> <hostname> <weight> <timessdev> <phoenixConnectionString> <model_dir>") +// System.exit(1) +// } +// +// var metricName = args(0) +// var appId = args(1) +// var hostname = args(2) +// var weight = args(3).toDouble +// var timessdev = args(4).toInt +// var phoenixConnectionString = args(5) //avijayan-ams-3.openstacklocal:61181:/ams-hbase-unsecure +// var modelDir = args(6) +// +// val conf = new SparkConf() +// conf.set("spark.app.name", "AMSAnomalyModelBuilder") +// //conf.set("spark.master", "spark://avijayan-ams-2.openstacklocal:7077") +// +// var sc = new SparkContext(conf) +// val sqlContext = new SQLContext(sc) +// +// val currentTime = System.currentTimeMillis() +// val oneDayBack = currentTime - 24*60*60*1000 +// +// val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "METRIC_RECORD", "zkUrl" -> phoenixConnectionString)) +// df.registerTempTable("METRIC_RECORD") +// val result = sqlContext.sql("SELECT METRIC_NAME, HOSTNAME, APP_ID, SERVER_TIME, 
METRIC_SUM, METRIC_COUNT FROM METRIC_RECORD " + +// "WHERE METRIC_NAME = '" + metricName + "' AND HOSTNAME = '" + hostname + "' AND APP_ID = '" + appId + "' AND SERVER_TIME < " + currentTime + " AND SERVER_TIME > " + oneDayBack) +// +// var metricValues = new java.util.TreeMap[java.lang.Long, java.lang.Double] +// result.collect().foreach( +// t => metricValues.put(t.getLong(3), t.getDouble(4) / t.getInt(5)) +// ) +// +// //val seriesName = result.head().getString(0) +// //val hostname = result.head().getString(1) +// //val appId = result.head().getString(2) +// +// val timelineMetric = new TimelineMetric() +// timelineMetric.setMetricName(metricName) +// timelineMetric.setAppId(appId) +// timelineMetric.setHostName(hostname) +// timelineMetric.setMetricValues(metricValues) +// +// var emaModel = new EmaTechnique(weight, timessdev) +// emaModel.test(timelineMetric) +// emaModel.save(sc, modelDir) } http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/ADManagerConfigurationTest.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/ADManagerConfigurationTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/ADManagerConfigurationTest.scala new file mode 100644 index 0000000..535dc9e --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/ADManagerConfigurationTest.scala @@ -0,0 +1,23 @@ +package org.apache.ambari.metrics.adservice.common + +import org.scalatest.FlatSpec + +import scala.collection.mutable + +class ADServiceConfigurationTest extends FlatSpec { + + "A Stack" should "pop values in last-in-first-out order" in { + val stack = new 
mutable.Stack[Int] + stack.push(1) + stack.push(2) + assert(stack.pop() === 2) + assert(stack.pop() === 1) + } + + it should "throw NoSuchElementException if an empty stack is popped" in { + val emptyStack = new mutable.Stack[String] + assertThrows[NoSuchElementException] { + emptyStack.pop() + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessorTest.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessorTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessorTest.scala new file mode 100644 index 0000000..142e98a --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessorTest.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.adservice.db + +import org.scalatest.FunSuite + +class PhoenixAnomalyStoreAccessorTest extends FunSuite { + + test("testInitAnomalyMetricSchema") { + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-common/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/pom.xml b/ambari-metrics/ambari-metrics-common/pom.xml index 1a7fef3..e868557 100644 --- a/ambari-metrics/ambari-metrics-common/pom.xml +++ b/ambari-metrics/ambari-metrics-common/pom.xml @@ -26,6 +26,13 @@ <modelVersion>4.0.0</modelVersion> <artifactId>ambari-metrics-common</artifactId> <name>Ambari Metrics Common</name> + + <properties> + <hadoop.version>2.7.3.2.6.0.3-8</hadoop.version> + <hbase.version>1.1.2.2.6.0.3-8</hbase.version> + <phoenix.version>4.7.0.2.6.0.3-8</phoenix.version> + </properties> + <build> <plugins> <plugin> @@ -126,6 +133,45 @@ <dependencies> <dependency> + <groupId>org.apache.phoenix</groupId> + <artifactId>phoenix-core</artifactId> + <version>${phoenix.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-annotations</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>commons-el</groupId> + <artifactId>commons-el</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1-jetty</artifactId> + </exclusion> + </exclusions> + 
</dependency> + <dependency> <groupId>net.sf.ehcache</groupId> <artifactId>ehcache</artifactId> <version>2.10.0</version> http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java new file mode 100644 index 0000000..72e5fb5 --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.metrics2.sink.timeline.query; + + +import org.apache.hadoop.hbase.util.RetryCounterFactory; + +import java.sql.Connection; +import java.sql.SQLException; + +/** + * + */ +public interface ConnectionProvider { + public Connection getConnection() throws SQLException; + public Connection getConnectionRetryingOnException(RetryCounterFactory retryCounterFactory) throws SQLException, InterruptedException; +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java new file mode 100644 index 0000000..a28a433 --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.metrics2.sink.timeline.query; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +public class DefaultPhoenixDataSource implements PhoenixConnectionProvider { + + static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class); + private static final String ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort"; + private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; + private static final String ZNODE_PARENT = "zookeeper.znode.parent"; + + private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s"; + private final String url; + + private Configuration hbaseConf; + + public DefaultPhoenixDataSource(Configuration hbaseConf) { + this.hbaseConf = hbaseConf; + String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT, "2181"); + String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM); + String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/ams-hbase-unsecure"); + if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) { + throw new IllegalStateException("Unable to find Zookeeper quorum to " + + "access HBase store using Phoenix."); + } + + url = String.format(connectionUrl, + zookeeperQuorum, + zookeeperClientPort, + znodeParent); + } + + /** + * Get HBaseAdmin for table ops. + * @return @HBaseAdmin + * @throws IOException + */ + public HBaseAdmin getHBaseAdmin() throws IOException { + return (HBaseAdmin) ConnectionFactory.createConnection(hbaseConf).getAdmin(); + } + + /** + * Get JDBC connection to HBase store. 
Assumption is that the hbase + * configuration is present on the classpath and loaded by the caller into + * the Configuration object. + * Phoenix already caches the HConnection between the client and HBase + * cluster. + * + * @return @java.sql.Connection + */ + public Connection getConnection() throws SQLException { + + LOG.debug("Metric store connection url: " + url); + try { + return DriverManager.getConnection(url); + } catch (SQLException e) { + LOG.warn("Unable to connect to HBase store using Phoenix.", e); + + throw e; + } + } + + public Connection getConnectionRetryingOnException(RetryCounterFactory retryCounterFactory) + throws SQLException, InterruptedException { + RetryCounter retryCounter = retryCounterFactory.create(); + while (true) { + try{ + return getConnection(); + } catch (SQLException e) { + if(!retryCounter.shouldRetry()){ + LOG.error("HBaseAccessor getConnection failed after " + + retryCounter.getMaxAttempts() + " attempts"); + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java new file mode 100644 index 0000000..194c769 --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java @@ -0,0 +1,31 @@ +package org.apache.hadoop.metrics2.sink.timeline.query; + +import org.apache.hadoop.hbase.client.HBaseAdmin; + +import java.io.IOException; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public interface PhoenixConnectionProvider extends ConnectionProvider { + /** + * Get HBaseAdmin for the Phoenix connection + * @return + * @throws IOException + */ + HBaseAdmin getHBaseAdmin() throws IOException; +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index 685e638..e218691 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -140,8 +140,8 @@ import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider; +import org.apache.hadoop.metrics2.sink.timeline.query.DefaultPhoenixDataSource; +import org.apache.hadoop.metrics2.sink.timeline.query.PhoenixConnectionProvider; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink; @@ -458,23 +458,6 @@ public class PhoenixHBaseAccessor { return mapper.readValue(json, metricValuesTypeRef); } - private Connection getConnectionRetryingOnException() - throws SQLException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); - while (true) { - try{ - return getConnection(); - } catch (SQLException e) { - if(!retryCounter.shouldRetry()){ - LOG.error("HBaseAccessor getConnection failed after " - + retryCounter.getMaxAttempts() + " attempts"); - throw e; - } - } - retryCounter.sleepUntilNextRetry(); - } - } - /** * Get JDBC connection to HBase store. 
Assumption is that the hbase * configuration is present on the classpath and loaded by the caller into @@ -507,7 +490,7 @@ public class PhoenixHBaseAccessor { try { LOG.info("Initializing metrics schema..."); - conn = getConnectionRetryingOnException(); + conn = dataSource.getConnectionRetryingOnException(retryCounterFactory); stmt = conn.createStatement(); // Metadata http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java deleted file mode 100644 index 24239a0..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; - - -import java.sql.Connection; -import java.sql.SQLException; - -/** - * - */ -public interface ConnectionProvider { - public Connection getConnection() throws SQLException; -} http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java deleted file mode 100644 index c5761f7..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; - - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.HBaseAdmin; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; - -public class DefaultPhoenixDataSource implements PhoenixConnectionProvider { - - static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class); - private static final String ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort"; - private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; - private static final String ZNODE_PARENT = "zookeeper.znode.parent"; - - private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s"; - private final String url; - - private Configuration hbaseConf; - - public DefaultPhoenixDataSource(Configuration hbaseConf) { - this.hbaseConf = hbaseConf; - String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT, "2181"); - String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM); - String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/ams-hbase-unsecure"); - if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) { - throw new IllegalStateException("Unable to find Zookeeper quorum to " + - "access HBase store using Phoenix."); - } - - url = String.format(connectionUrl, - zookeeperQuorum, - zookeeperClientPort, - znodeParent); - } - - /** - * Get HBaseAdmin for table ops. - * @return @HBaseAdmin - * @throws IOException - */ - public HBaseAdmin getHBaseAdmin() throws IOException { - return (HBaseAdmin) ConnectionFactory.createConnection(hbaseConf).getAdmin(); - } - - /** - * Get JDBC connection to HBase store. 
Assumption is that the hbase - * configuration is present on the classpath and loaded by the caller into - * the Configuration object. - * Phoenix already caches the HConnection between the client and HBase - * cluster. - * - * @return @java.sql.Connection - */ - public Connection getConnection() throws SQLException { - - LOG.debug("Metric store connection url: " + url); - try { - return DriverManager.getConnection(url); - } catch (SQLException e) { - LOG.warn("Unable to connect to HBase store using Phoenix.", e); - - throw e; - } - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java deleted file mode 100644 index cacbcfb..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java +++ /dev/null @@ -1,31 +0,0 @@ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; - -import org.apache.hadoop.hbase.client.HBaseAdmin; - -import java.io.IOException; - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -public interface PhoenixConnectionProvider extends ConnectionProvider { - /** - * Get HBaseAdmin for the Phoenix connection - * @return - * @throws IOException - */ - HBaseAdmin getHBaseAdmin() throws IOException; -} http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java index 03205e7..7b70a80 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. 
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource; +import org.apache.hadoop.metrics2.sink.timeline.query.DefaultPhoenixDataSource; import org.apache.zookeeper.ClientCnxn; import org.easymock.EasyMock; import org.junit.After; http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java index 3a42db9..40691d6 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java @@ -22,13 +22,9 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.assertj.core.api.Assertions.assertThat; 
-import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.replay; import static org.powermock.api.easymock.PowerMock.mockStatic; -import static org.powermock.api.easymock.PowerMock.replayAll; import java.io.IOException; -import java.lang.reflect.Field; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; @@ -48,7 +44,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider; +import org.apache.hadoop.metrics2.sink.timeline.query.PhoenixConnectionProvider; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java index 7be3c0d..97d2512 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java 
@@ -32,19 +32,17 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider; +import org.apache.hadoop.metrics2.sink.timeline.query.PhoenixConnectionProvider; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; import org.apache.phoenix.exception.PhoenixIOException; import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.powermock.api.easymock.PowerMock; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import java.io.IOException; -import java.lang.reflect.Field; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet;