Joal has uploaded a new change for review. (
https://gerrit.wikimedia.org/r/392700 )
Change subject: Grow RestbaseMetrics spark job to count MW API
......................................................................
Grow RestbaseMetrics spark job to count MW API
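RESTBaseMetrics is renamed to APIsVarnishRequests and now counts
webrequests whose uri_path matches either '/api/rest_v1%' (RESTBase) or
'/w/api.php%' (MediaWiki action API). Note that RESTBase requests are now
matched by path prefix rather than anywhere in the path ('%/api/rest_v1%').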
The GraphiteClient has also been updated to facilitate sending
multiple messages at once.
Bug: T176785
Change-Id: I3040ab901d7fe28a3a5e3b924cb8c142f31e3a3e
---
A
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/APIsVarnishRequests.scala
D
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/RESTBaseMetrics.scala
M
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/connectors/GraphiteClient.scala
3 files changed, 151 insertions(+), 121 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source
refs/changes/00/392700/1
diff --git
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/APIsVarnishRequests.scala
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/APIsVarnishRequests.scala
new file mode 100644
index 0000000..fd92ba6
--- /dev/null
+++
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/APIsVarnishRequests.scala
@@ -0,0 +1,125 @@
+package org.wikimedia.analytics.refinery.job
+
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.joda.time.DateTime
+import org.wikimedia.analytics.refinery.job.connectors.{GraphiteMessage,
GraphiteClient}
+import scopt.OptionParser
+
+/**
+ * Reports hourly varnish request counts for the RESTBase API (uri_path
+ * starting with /api/rest_v1) and the MediaWiki action API (uri_path
+ * starting with /w/api.php) to graphite.
+ *
+ * Usage with spark-submit:
+ * spark-submit \
+ *   --class org.wikimedia.analytics.refinery.job.APIsVarnishRequests
+ *   /path/to/refinery-job.jar
+ *   -y <year> -m <month> -d <day> -h <hour>
+ *   [-r <restbaseNamespace> -a <mwAPINamespace> -w <webrequest-base-path>
+ *    -g <graphite-host> -p <graphite-port>]
+ */
+object APIsVarnishRequests {
+
+  /**
+   * Config class for CLI argument parser using scopt
+   *
+   * @param webrequestBasePath Base path of the webrequest data (no trailing slash)
+   * @param graphiteHost       Graphite host to send metrics to
+   * @param graphitePort       Graphite port to send metrics to
+   * @param restbaseNamespace  Metric prefix for RESTBase requests
+   * @param mwAPINamespace     Metric prefix for MediaWiki action API requests
+   * @param year               Year of the hour to process
+   * @param month              Month of the hour to process (1-12)
+   * @param day                Day of month of the hour to process (1-31)
+   * @param hour               Hour of day to process (0-23)
+   */
+  case class Params(webrequestBasePath: String = "hdfs://analytics-hadoop/wmf/data/wmf/webrequest",
+                    graphiteHost: String = "localhost",
+                    graphitePort: Int = 2003,
+                    restbaseNamespace: String = "restbase.requests",
+                    mwAPINamespace: String = "MediaWiki.api",
+                    year: Int = 0, month: Int = 0, day: Int = 0, hour: Int = 0)
+
+  /**
+   * Define the command line options parser
+   */
+  val argsParser = new OptionParser[Params]("APIs Varnish Requests") {
+    head("APIs Varnish Requests", "")
+    note("This job reports RESTBase and MW_API traffic to graphite hourly")
+    help("help") text ("Prints this usage text")
+
+    opt[String]('w', "webrequest-base-path") optional() valueName ("<path>") action { (x, p) =>
+      // Drop a trailing slash so the partition path built in main has no "//"
+      p.copy(webrequestBasePath = if (x.endsWith("/")) x.dropRight(1) else x)
+    } text ("Base path to webrequest data on hadoop. Defaults to hdfs://analytics-hadoop/wmf/data/wmf/webrequest")
+
+    opt[String]('g', "graphite-host") optional() valueName ("<host>") action { (x, p) =>
+      p.copy(graphiteHost = x)
+    } text ("Graphite host. Defaults to localhost")
+
+    opt[Int]('p', "graphite-port") optional() valueName ("<port>") action { (x, p) =>
+      p.copy(graphitePort = x)
+    } text ("Graphite port. Defaults to 2003")
+
+    opt[String]('r', "restbaseNamespace") optional() valueName ("<namespace>") action { (x, p) =>
+      p.copy(restbaseNamespace = x)
+    } text ("Restbase Namespace/prefix for graphite metric. Defaults to restbase.requests")
+
+    opt[String]('a', "mwAPINamespace") optional() valueName ("<namespace>") action { (x, p) =>
+      p.copy(mwAPINamespace = x)
+    } text ("Mediawiki Namespace/prefix for graphite metric. Defaults to MediaWiki.api")
+
+    opt[Int]('y', "year") required() action { (x, p) =>
+      p.copy(year = x)
+    } text ("Year as an integer")
+
+    opt[Int]('m', "month") required() action { (x, p) =>
+      p.copy(month = x)
+    } validate { x => if (x > 0 && x <= 12) success else failure("Invalid month")
+    } text ("Month as an integer")
+
+    opt[Int]('d', "day") required() action { (x, p) =>
+      p.copy(day = x)
+    } validate { x => if (x > 0 && x <= 31) success else failure("Invalid day")
+    } text ("Day of month as an integer")
+
+    opt[Int]('h', "hour") required() action { (x, p) =>
+      p.copy(hour = x)
+    } validate { x => if (x >= 0 && x < 24) success else failure("Invalid hour")
+    } text ("Hour of day as an integer (0-23)")
+
+  }
+
+  /**
+   * Counts RESTBase and MediaWiki action API requests in the given data.
+   *
+   * Registers `parquetData` as the temporary table "tmp_apis" on `sqlContext`.
+   *
+   * @param parquetData Webrequest data; must contain a uri_path column
+   * @param sqlContext  SQLContext used to run the aggregation query
+   * @return (rows whose uri_path starts with /api/rest_v1,
+   *          rows whose uri_path starts with /w/api.php)
+   */
+  def countAPIsURIs(parquetData: DataFrame, sqlContext: SQLContext): (Long, Long) = {
+    parquetData.registerTempTable("tmp_apis")
+    // COUNT of a CASE without ELSE counts only the matching rows and, unlike SUM,
+    // yields 0 (not NULL) when there are no rows, so getLong below is always safe.
+    // An aggregate query without GROUP BY returns exactly one row, hence .head.
+    val row = sqlContext.sql(
+      """
+        |SELECT
+        |  COUNT(CASE WHEN uri_path LIKE '/api/rest_v1%' THEN 1 END) AS restbase_requests,
+        |  COUNT(CASE WHEN uri_path LIKE '/w/api.php%' THEN 1 END) AS mwapi_requests
+        |FROM tmp_apis
+      """.stripMargin).collect().head
+    (row.getLong(0), row.getLong(1))
+  }
+
+  def main(args: Array[String]): Unit = {
+    argsParser.parse(args, Params()) match {
+      case Some(params) =>
+        // Initial Spark setup
+        val conf = new SparkConf().setAppName("APIsVarnishRequests")
+        val sc = new SparkContext(conf)
+        try {
+          val sqlContext = new SQLContext(sc)
+          sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
+
+          // Path of the text webrequest partition for the requested hour
+          val parquetDataPath = "%s/webrequest_source=text/year=%d/month=%d/day=%d/hour=%d"
+            .format(params.webrequestBasePath, params.year, params.month, params.day, params.hour)
+
+          // Graphite timestamp (in seconds) of the processed hour. The partition fields
+          // are interpreted in UTC instead of the JVM default time zone
+          // (assumed to match how webrequest is partitioned -- confirm).
+          val graphiteTimestamp = new DateTime(params.year, params.month, params.day, params.hour, 0,
+            org.joda.time.DateTimeZone.UTC).getMillis / 1000
+          val restbaseMetric = "%s.varnish_requests".format(params.restbaseNamespace)
+          val mwAPIMetric = "%s.varnish_requests".format(params.mwAPINamespace)
+          val (restbaseRequests, mwAPIRequests) =
+            countAPIsURIs(sqlContext.read.parquet(parquetDataPath), sqlContext)
+
+          // Send both metrics to graphite through a single connection
+          val graphite = new GraphiteClient(params.graphiteHost, params.graphitePort)
+          graphite.sendMany(Seq(
+            GraphiteMessage(restbaseMetric, restbaseRequests, graphiteTimestamp),
+            GraphiteMessage(mwAPIMetric, mwAPIRequests, graphiteTimestamp)
+          ))
+        } finally {
+          // Release Spark resources whether or not counting/sending succeeded
+          sc.stop()
+        }
+      case None => sys.exit(1)
+    }
+  }
+
+}
diff --git
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/RESTBaseMetrics.scala
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/RESTBaseMetrics.scala
deleted file mode 100644
index 9cd743a..0000000
---
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/RESTBaseMetrics.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-package org.wikimedia.analytics.refinery.job
-
-import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.joda.time.DateTime
-import org.wikimedia.analytics.refinery.job.connectors.GraphiteClient
-import scopt.OptionParser
-
-/**
- * Reports metrics for Restbase to graphite
- *
- * Usage with spark-submit:
- * spark-submit \
- * --class org.wikimedia.analytics.refinery.job.RESTBaseMetrics
- * /path/to/refinery-job.jar
- * -y <year> -m <month> -d <day> -h <hour>
- * [-n <namespace> -w <webrequest-base-path> -g <graphite-host> -p
<graphite-port>]
- */
-object RESTBaseMetrics {
-
-
- /**
- * Config class for CLI argument parser using scopt
- */
- case class Params(webrequestBasePath: String =
"hdfs://analytics-hadoop/wmf/data/wmf/webrequest",
- graphiteHost: String = "localhost",
- graphitePort: Int = 2003,
- namespace: String = "restbase.requests",
- year: Int = 0, month: Int = 0, day: Int = 0, hour: Int = 0)
-
- /**
- * Define the command line options parser
- */
- val argsParser = new OptionParser[Params]("RESTBase Metrics") {
- head("RESTBase Metrics", "")
- note("This job reports RESTBase traffic to graphite hourly")
- help("help") text ("Prints this usage text")
-
- opt[String]('w', "webrequest-base-path") optional() valueName ("<path>")
action { (x, p) =>
- p.copy(webrequestBasePath = if (x.endsWith("/")) x.dropRight(1) else x)
- } text ("Base path to webrequest data on hadoop. Defaults to
hdfs://analytics-hadoop/wmf/data/wmf/webrequest")
-
- opt[String]('g', "graphite-host") optional() valueName ("<path>") action {
(x, p) =>
- p.copy(graphiteHost = x)
- } text ("Graphite host. Defaults to localhost")
-
- opt[Int]('p', "graphite-port") optional() valueName ("<path>") action {
(x, p) =>
- p.copy(graphitePort = x)
- } text ("Graphite port. Defaults to 2003")
-
- opt[String]('n', "namespace") optional() valueName ("<path>") action { (x,
p) =>
- p.copy(namespace = x)
- } text ("Namespace/prefix for graphite metric. Defaults to
restbase.requests")
-
- opt[Int]('y', "year") required() action { (x, p) =>
- p.copy(year = x)
- } text ("Year as an integer")
-
- opt[Int]('m', "month") required() action { (x, p) =>
- p.copy(month = x)
- } validate { x => if (x > 0 & x <= 12) success else failure("Invalid
month")
- } text ("Month as an integer")
-
- opt[Int]('d', "day") required() action { (x, p) =>
- p.copy(day = x)
- } validate { x => if (x > 0 & x <= 31) success else failure("Invalid day")
- } text ("Day of month as an integer")
-
- opt[Int]('h', "hour") required() action { (x, p) =>
- p.copy(hour = x)
- } validate { x => if (x >= 0 & x < 24) success else failure("Invalid hour")
- } text ("Hour of day as an integer (0-23)")
-
- }
-
- def countRESTBaseURIs(parquetData: DataFrame): Long = {
- parquetData.filter("uri_path like '%/api/rest_v1%'").count
- }
-
- def main(args: Array[String]): Unit = {
- argsParser.parse(args, Params()) match {
- case Some(params) => {
- // Initial Spark setup
- val conf = new SparkConf().setAppName("RESTBaseMetrics")
- val sc = new SparkContext(conf)
- val sqlContext = new SQLContext(sc)
- sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
-
- // Define the path to load data in Parquet format
- val parquetDataPath =
"%s/webrequest_source=text/year=%d/month=%d/day=%d/hour=%d"
- .format(params.webrequestBasePath, params.year, params.month,
params.day, params.hour)
-
- // Define time, metric, Compute request count
- val time = new DateTime(params.year, params.month, params.day,
params.hour, 0)
- val metric = "%s.varnish_requests".format(params.namespace)
- val requestCount =
countRESTBaseURIs(sqlContext.parquetFile(parquetDataPath))
-
- // Send to graphite
- val graphite = new GraphiteClient(params.graphiteHost,
params.graphitePort)
- graphite.sendOnce(metric, requestCount, time.getMillis / 1000)
- }
- case None => sys.exit(1)
- }
- }
-
-}
diff --git
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/connectors/GraphiteClient.scala
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/connectors/GraphiteClient.scala
index b92ece1..c47ee98 100644
---
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/connectors/GraphiteClient.scala
+++
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/connectors/GraphiteClient.scala
@@ -6,6 +6,17 @@
import org.joda.time.DateTimeUtils
/**
+ * Helper to create a message string following Carbon's plaintext protocol,
+ * i.e. "<metric> <value> <timestamp>\n"
+ *
+ * @param metric Name of the graphite metric, e.g foo.bar
+ * @param value Value of the metric
+ * @param timestamp Timestamp in seconds, defaults to current time
+ */
+final case class GraphiteMessage(metric: String, value: Long,
+                                 timestamp: Long = DateTimeUtils.currentTimeMillis() / 1000) {
+  /**
+   * The newline-terminated plaintext-protocol line for this message.
+   * String interpolation is used because String.format's %d is
+   * locale-sensitive, and the wire format must use plain ASCII digits.
+   */
+  override def toString: String = s"$metric $value $timestamp\n"
+}
+
+/**
* Simple GraphiteClient in Scala
* Creates a Socket and writes to it,
* based on the plaintext protocol supported by Carbon
@@ -44,10 +55,12 @@
}
def close() = {
- out.close
- socket.close
+    // Closes `out`, then the socket; the socket is closed even if closing
+    // `out` throws, so it is never leaked.
+    try out.close()
+    finally socket.close()
}
}
+
+
/**
* Create an instance of Connection
@@ -60,18 +73,6 @@
}
/**
- * Helper to create a message string following Carbon's plaintext protocol
- *
- * @param metric Name of the graphite metric, e.g foo.bar
- * @param value Value of the metric
- * @param timestamp Timestamp in seconds, defaults to current time
- * @return Formatted string for plaintext protocol
- */
- def message(metric:String, value:Long, timestamp:Long =
DateTimeUtils.currentTimeMillis() / 1000) = {
- "%s %d %d\n".format(metric, value, timestamp)
- }
-
- /**
* Helper that opens a connection, sends a message, and closes connection
* @param metric Name of the graphite metric, e.g foo.bar
* @param value Value of the metric
@@ -79,7 +80,17 @@
*/
def sendOnce(metric:String, value:Long, timestamp:Long) = {
-    val conn = connection()
-    conn.write(message(metric, value, timestamp))
-    conn.close()
+    // Single-message case of sendMany, reusing its connection handling
+    sendMany(Seq(GraphiteMessage(metric, value, timestamp)))
+  }
+
+  /**
+   * Helper that opens a connection, sends a list of messages, and closes
+   * the connection, also when writing one of the messages fails
+   * @param messages List of messages to be sent
+   */
+  def sendMany(messages: Seq[GraphiteMessage]) = {
+    val conn = connection()
+    try {
+      messages.foreach(message => conn.write(message.toString))
+    } finally {
+      conn.close()
+    }
}
--
To view, visit https://gerrit.wikimedia.org/r/392700
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I3040ab901d7fe28a3a5e3b924cb8c142f31e3a3e
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Joal <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits