Joal has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/392700 )
Change subject: Grow RestbaseMetrics spark job to count MW API ...................................................................... Grow RestbaseMetrics spark job to count MW API RESTBaseMetrics is renamed to APIsVarnishRequests, and counts webrequests for both '/api/rest_v1%' and '/w/api.php%'. The GraphiteClient has also been updated to facilitate sending multiple messages at once. Bug: T176785 Change-Id: I3040ab901d7fe28a3a5e3b924cb8c142f31e3a3e --- A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/APIsVarnishRequests.scala D refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/RESTBaseMetrics.scala M refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/connectors/GraphiteClient.scala 3 files changed, 151 insertions(+), 121 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/00/392700/1 diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/APIsVarnishRequests.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/APIsVarnishRequests.scala new file mode 100644 index 0000000..fd92ba6 --- /dev/null +++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/APIsVarnishRequests.scala @@ -0,0 +1,125 @@ +package org.wikimedia.analytics.refinery.job + +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.{SparkConf, SparkContext} +import org.joda.time.DateTime +import org.wikimedia.analytics.refinery.job.connectors.{GraphiteMessage, GraphiteClient} +import scopt.OptionParser + +/** + * Reports metrics for API (rest and action) to graphite + * + * Usage with spark-submit: + * spark-submit \ + * --class org.wikimedia.analytics.refinery.job.APIsVarnishRequests + * /path/to/refinery-job.jar + * -y <year> -m <month> -d <day> -h <hour> + * [-r <restbaseNamespace> -a <mediawikiApiNamespace> -w <webrequest-base-path> -g <graphite-host> -p <graphite-port>] + */ +object APIsVarnishRequests { + + + /** + * Config class 
for CLI argument parser using scopt + */ + case class Params(webrequestBasePath: String = "hdfs://analytics-hadoop/wmf/data/wmf/webrequest", + graphiteHost: String = "localhost", + graphitePort: Int = 2003, + restbaseNamespace: String = "restbase.requests", + mwAPINamespace: String = "MediaWiki.api", + year: Int = 0, month: Int = 0, day: Int = 0, hour: Int = 0) + + /** + * Define the command line options parser + */ + val argsParser = new OptionParser[Params]("APIs Varnish Requests") { + head("APIs Varnish Requests", "") + note("This job reports RESTBase and MW_API traffic to graphite hourly") + help("help") text ("Prints this usage text") + + opt[String]('w', "webrequest-base-path") optional() valueName ("<path>") action { (x, p) => + p.copy(webrequestBasePath = if (x.endsWith("/")) x.dropRight(1) else x) + } text ("Base path to webrequest data on hadoop. Defaults to hdfs://analytics-hadoop/wmf/data/wmf/webrequest") + + opt[String]('g', "graphite-host") optional() valueName ("<path>") action { (x, p) => + p.copy(graphiteHost = x) + } text ("Graphite host. Defaults to localhost") + + opt[Int]('p', "graphite-port") optional() valueName ("<path>") action { (x, p) => + p.copy(graphitePort = x) + } text ("Graphite port. Defaults to 2003") + + opt[String]('r', "restbaseNamespace") optional() valueName ("<path>") action { (x, p) => + p.copy(restbaseNamespace = x) + } text ("Restbase Namespace/prefix for graphite metric. Defaults to restbase.requests") + + opt[String]('a', "mwAPINamespace") optional() valueName ("<path>") action { (x, p) => + p.copy(mwAPINamespace = x) + } text ("Mediawiki Namespace/prefix for graphite metric. 
Defaults to restbase.requests") + + + opt[Int]('y', "year") required() action { (x, p) => + p.copy(year = x) + } text ("Year as an integer") + + opt[Int]('m', "month") required() action { (x, p) => + p.copy(month = x) + } validate { x => if (x > 0 & x <= 12) success else failure("Invalid month") + } text ("Month as an integer") + + opt[Int]('d', "day") required() action { (x, p) => + p.copy(day = x) + } validate { x => if (x > 0 & x <= 31) success else failure("Invalid day") + } text ("Day of month as an integer") + + opt[Int]('h', "hour") required() action { (x, p) => + p.copy(hour = x) + } validate { x => if (x >= 0 & x < 24) success else failure("Invalid hour") + } text ("Hour of day as an integer (0-23)") + + } + + def countAPIsURIs(parquetData: DataFrame, sqlContext: SQLContext): (Long, Long) = { + + parquetData.registerTempTable("tmp_apis") + val row = sqlContext.sql( + """ + |SELECT + | SUM(CASE WHEN uri_path like '/api/rest_v1%' THEN 1 ELSE 0 END) as restbase_requests, + | SUM(CASE WHEN uri_path like '/w/api.php%' THEN 1 ELSE 0 END) as mwapi_requests + |FROM tmp_apis + """.stripMargin).collect().head + (row.getLong(0), row.getLong(1)) + } + + def main(args: Array[String]): Unit = { + argsParser.parse(args, Params()) match { + case Some(params) => { + // Initial Spark setup + val conf = new SparkConf().setAppName("APIsVarnishRequests") + val sc = new SparkContext(conf) + val sqlContext = new SQLContext(sc) + sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy") + + // Define the path to load data in Parquet format + val parquetDataPath = "%s/webrequest_source=text/year=%d/month=%d/day=%d/hour=%d" + .format(params.webrequestBasePath, params.year, params.month, params.day, params.hour) + + // Define time, metric, Compute request count + val graphiteTimestamp = new DateTime(params.year, params.month, params.day, params.hour, 0).getMillis / 1000 + val restbaseMetric = "%s.varnish_requests".format(params.restbaseNamespace) + val mwAPIMetric = 
"%s.varnish_requests".format(params.mwAPINamespace) + val (restbaseRequests, mwAPIRequests) = countAPIsURIs(sqlContext.read.parquet(parquetDataPath), sqlContext) + + // Send to graphite + val graphite = new GraphiteClient(params.graphiteHost, params.graphitePort) + graphite.sendMany(Seq( + GraphiteMessage(restbaseMetric, restbaseRequests, graphiteTimestamp), + GraphiteMessage(mwAPIMetric, mwAPIRequests, graphiteTimestamp) + )) + } + case None => sys.exit(1) + } + } + +} diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/RESTBaseMetrics.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/RESTBaseMetrics.scala deleted file mode 100644 index 9cd743a..0000000 --- a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/RESTBaseMetrics.scala +++ /dev/null @@ -1,106 +0,0 @@ -package org.wikimedia.analytics.refinery.job - -import org.apache.spark.sql.{DataFrame, SQLContext} -import org.apache.spark.{SparkConf, SparkContext} -import org.joda.time.DateTime -import org.wikimedia.analytics.refinery.job.connectors.GraphiteClient -import scopt.OptionParser - -/** - * Reports metrics for Restbase to graphite - * - * Usage with spark-submit: - * spark-submit \ - * --class org.wikimedia.analytics.refinery.job.RESTBaseMetrics - * /path/to/refinery-job.jar - * -y <year> -m <month> -d <day> -h <hour> - * [-n <namespace> -w <webrequest-base-path> -g <graphite-host> -p <graphite-port>] - */ -object RESTBaseMetrics { - - - /** - * Config class for CLI argument parser using scopt - */ - case class Params(webrequestBasePath: String = "hdfs://analytics-hadoop/wmf/data/wmf/webrequest", - graphiteHost: String = "localhost", - graphitePort: Int = 2003, - namespace: String = "restbase.requests", - year: Int = 0, month: Int = 0, day: Int = 0, hour: Int = 0) - - /** - * Define the command line options parser - */ - val argsParser = new OptionParser[Params]("RESTBase Metrics") { - head("RESTBase Metrics", "") - note("This job reports 
RESTBase traffic to graphite hourly") - help("help") text ("Prints this usage text") - - opt[String]('w', "webrequest-base-path") optional() valueName ("<path>") action { (x, p) => - p.copy(webrequestBasePath = if (x.endsWith("/")) x.dropRight(1) else x) - } text ("Base path to webrequest data on hadoop. Defaults to hdfs://analytics-hadoop/wmf/data/wmf/webrequest") - - opt[String]('g', "graphite-host") optional() valueName ("<path>") action { (x, p) => - p.copy(graphiteHost = x) - } text ("Graphite host. Defaults to localhost") - - opt[Int]('p', "graphite-port") optional() valueName ("<path>") action { (x, p) => - p.copy(graphitePort = x) - } text ("Graphite port. Defaults to 2003") - - opt[String]('n', "namespace") optional() valueName ("<path>") action { (x, p) => - p.copy(namespace = x) - } text ("Namespace/prefix for graphite metric. Defaults to restbase.requests") - - opt[Int]('y', "year") required() action { (x, p) => - p.copy(year = x) - } text ("Year as an integer") - - opt[Int]('m', "month") required() action { (x, p) => - p.copy(month = x) - } validate { x => if (x > 0 & x <= 12) success else failure("Invalid month") - } text ("Month as an integer") - - opt[Int]('d', "day") required() action { (x, p) => - p.copy(day = x) - } validate { x => if (x > 0 & x <= 31) success else failure("Invalid day") - } text ("Day of month as an integer") - - opt[Int]('h', "hour") required() action { (x, p) => - p.copy(hour = x) - } validate { x => if (x >= 0 & x < 24) success else failure("Invalid hour") - } text ("Hour of day as an integer (0-23)") - - } - - def countRESTBaseURIs(parquetData: DataFrame): Long = { - parquetData.filter("uri_path like '%/api/rest_v1%'").count - } - - def main(args: Array[String]): Unit = { - argsParser.parse(args, Params()) match { - case Some(params) => { - // Initial Spark setup - val conf = new SparkConf().setAppName("RESTBaseMetrics") - val sc = new SparkContext(conf) - val sqlContext = new SQLContext(sc) - 
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy") - - // Define the path to load data in Parquet format - val parquetDataPath = "%s/webrequest_source=text/year=%d/month=%d/day=%d/hour=%d" - .format(params.webrequestBasePath, params.year, params.month, params.day, params.hour) - - // Define time, metric, Compute request count - val time = new DateTime(params.year, params.month, params.day, params.hour, 0) - val metric = "%s.varnish_requests".format(params.namespace) - val requestCount = countRESTBaseURIs(sqlContext.parquetFile(parquetDataPath)) - - // Send to graphite - val graphite = new GraphiteClient(params.graphiteHost, params.graphitePort) - graphite.sendOnce(metric, requestCount, time.getMillis / 1000) - } - case None => sys.exit(1) - } - } - -} diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/connectors/GraphiteClient.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/connectors/GraphiteClient.scala index b92ece1..c47ee98 100644 --- a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/connectors/GraphiteClient.scala +++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/connectors/GraphiteClient.scala @@ -6,6 +6,17 @@ import org.joda.time.DateTimeUtils /** + * Helper to create a message string following Carbon's plaintext protocol + * + * @param metric Name of the graphite metric, e.g foo.bar + * @param value Value of the metric + * @param timestamp Timestamp in seconds, defaults to current time + */ +case class GraphiteMessage(metric: String, value: Long, timestamp:Long = DateTimeUtils.currentTimeMillis() / 1000) { + override def toString = "%s %d %d\n".format(metric, value, timestamp) +} + +/** * Simple GraphiteClient in Scala * Creates a Socket and writes to it, * based on the plaintext protocol supported by Carbon @@ -44,10 +55,12 @@ } def close() = { - out.close - socket.close + out.close() + socket.close() } } + + /** * Create an instance of Connection @@ 
-60,18 +73,6 @@ } /** - * Helper to create a message string following Carbon's plaintext protocol - * - * @param metric Name of the graphite metric, e.g foo.bar - * @param value Value of the metric - * @param timestamp Timestamp in seconds, defaults to current time - * @return Formatted string for plaintext protocol - */ - def message(metric:String, value:Long, timestamp:Long = DateTimeUtils.currentTimeMillis() / 1000) = { - "%s %d %d\n".format(metric, value, timestamp) - } - - /** * Helper that opens a connection, sends a message, and closes connection * @param metric Name of the graphite metric, e.g foo.bar * @param value Value of the metric @@ -79,7 +80,17 @@ */ def sendOnce(metric:String, value:Long, timestamp:Long) = { val conn = connection() - conn.write(message(metric, value, timestamp)) + conn.write(new GraphiteMessage(metric, value, timestamp).toString) + conn.close() + } + + /** + * Helper that opens a connection, sends a list of messages, and closes connection + * @param messages List of messages to be sent + */ + def sendMany(messages: Seq[GraphiteMessage]) = { + val conn = connection() + messages.foreach(message => conn.write(message.toString)) conn.close() } -- To view, visit https://gerrit.wikimedia.org/r/392700 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I3040ab901d7fe28a3a5e3b924cb8c142f31e3a3e Gerrit-PatchSet: 1 Gerrit-Project: analytics/refinery/source Gerrit-Branch: master Gerrit-Owner: Joal <j...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits