Joal has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/392700 )

Change subject: Grow RestbaseMetrics spark job to count MW API
......................................................................

Grow RestbaseMetrics spark job to count MW API

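RESTBaseMetrics is renamed to APIsVarnishRequests, and counts
webrequests for both '/api/rest_v1%' and '/w/api.php%'. Note that the
RESTBase pattern is now anchored at the start of uri_path (it was
previously '%/api/rest_v1%').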
The GraphiteClient has also been updated to send multiple messages
over a single connection (sendMany), with message formatting moved
into a GraphiteMessage case class.

Bug: T176785
Change-Id: I3040ab901d7fe28a3a5e3b924cb8c142f31e3a3e
---
A 
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/APIsVarnishRequests.scala
D 
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/RESTBaseMetrics.scala
M 
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/connectors/GraphiteClient.scala
3 files changed, 151 insertions(+), 121 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source 
refs/changes/00/392700/1

diff --git 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/APIsVarnishRequests.scala
 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/APIsVarnishRequests.scala
new file mode 100644
index 0000000..fd92ba6
--- /dev/null
+++ 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/APIsVarnishRequests.scala
@@ -0,0 +1,138 @@
+package org.wikimedia.analytics.refinery.job
+
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.joda.time.{DateTime, DateTimeZone}
+import org.wikimedia.analytics.refinery.job.connectors.{GraphiteClient, GraphiteMessage}
+import scopt.OptionParser
+
+/**
+ * Reports metrics for API (rest and action) to graphite
+ *
+ * Usage with spark-submit:
+ * spark-submit \
+ * --class org.wikimedia.analytics.refinery.job.APIsVarnishRequests
+ * /path/to/refinery-job.jar
+ * -y <year> -m <month> -d <day> -h <hour>
+ * [-r <restbase-namespace> -a <mw-api-namespace> -w <webrequest-base-path> -g <graphite-host> -p <graphite-port>]
+ */
+object APIsVarnishRequests {
+
+  /**
+   * Config class for CLI argument parser using scopt
+   */
+  case class Params(webrequestBasePath: String = "hdfs://analytics-hadoop/wmf/data/wmf/webrequest",
+                    graphiteHost: String = "localhost",
+                    graphitePort: Int = 2003,
+                    restbaseNamespace: String = "restbase.requests",
+                    mwAPINamespace: String = "MediaWiki.api",
+                    year: Int = 0, month: Int = 0, day: Int = 0, hour: Int = 0)
+
+  /**
+   * Define the command line options parser
+   */
+  val argsParser = new OptionParser[Params]("APIs Varnish Requests") {
+    head("APIs Varnish Requests", "")
+    note("This job reports RESTBase and MW_API traffic to graphite hourly")
+    help("help") text ("Prints this usage text")
+
+    opt[String]('w', "webrequest-base-path") optional() valueName ("<path>") action { (x, p) =>
+      p.copy(webrequestBasePath = if (x.endsWith("/")) x.dropRight(1) else x)
+    } text ("Base path to webrequest data on hadoop. Defaults to hdfs://analytics-hadoop/wmf/data/wmf/webrequest")
+
+    opt[String]('g', "graphite-host") optional() valueName ("<host>") action { (x, p) =>
+      p.copy(graphiteHost = x)
+    } text ("Graphite host. Defaults to localhost")
+
+    opt[Int]('p', "graphite-port") optional() valueName ("<port>") action { (x, p) =>
+      p.copy(graphitePort = x)
+    } text ("Graphite port. Defaults to 2003")
+
+    opt[String]('r', "restbaseNamespace") optional() valueName ("<namespace>") action { (x, p) =>
+      p.copy(restbaseNamespace = x)
+    } text ("Restbase Namespace/prefix for graphite metric. Defaults to restbase.requests")
+
+    opt[String]('a', "mwAPINamespace") optional() valueName ("<namespace>") action { (x, p) =>
+      p.copy(mwAPINamespace = x)
+    } text ("Mediawiki API Namespace/prefix for graphite metric. Defaults to MediaWiki.api")
+
+    opt[Int]('y', "year") required() action { (x, p) =>
+      p.copy(year = x)
+    } text ("Year as an integer")
+
+    opt[Int]('m', "month") required() action { (x, p) =>
+      p.copy(month = x)
+    } validate { x => if (x > 0 && x <= 12) success else failure("Invalid month")
+    } text ("Month as an integer")
+
+    opt[Int]('d', "day") required() action { (x, p) =>
+      p.copy(day = x)
+    } validate { x => if (x > 0 && x <= 31) success else failure("Invalid day")
+    } text ("Day of month as an integer")
+
+    opt[Int]('h', "hour") required() action { (x, p) =>
+      p.copy(hour = x)
+    } validate { x => if (x >= 0 && x < 24) success else failure("Invalid hour")
+    } text ("Hour of day as an integer (0-23)")
+
+  }
+
+  /**
+   * Counts RESTBase and MediaWiki action API requests in the given webrequest data.
+   *
+   * COUNT is used rather than SUM so that empty input yields 0: SUM over zero
+   * rows is NULL, which Row.getLong cannot read.
+   *
+   * @param parquetData webrequest rows; only the uri_path column is read
+   * @param sqlContext  context used to run the aggregation query
+   * @return (RESTBase request count, MediaWiki API request count)
+   */
+  def countAPIsURIs(parquetData: DataFrame, sqlContext: SQLContext): (Long, Long) = {
+    parquetData.registerTempTable("tmp_apis")
+    // CASE without ELSE yields NULL for non-matching rows, which COUNT skips
+    val row = sqlContext.sql(
+      """
+        |SELECT
+        |  COUNT(CASE WHEN uri_path like '/api/rest_v1%' THEN 1 END) as restbase_requests,
+        |  COUNT(CASE WHEN uri_path like '/w/api.php%' THEN 1 END) as mwapi_requests
+        |FROM tmp_apis
+      """.stripMargin).first()
+    (row.getLong(0), row.getLong(1))
+  }
+
+  /**
+   * Entry point: counts the API requests of one webrequest hour and sends them to graphite.
+   */
+  def main(args: Array[String]): Unit = {
+    argsParser.parse(args, Params()) match {
+      case Some(params) =>
+        // Initial Spark setup
+        val conf = new SparkConf().setAppName("APIsVarnishRequests")
+        val sc = new SparkContext(conf)
+        val sqlContext = new SQLContext(sc)
+        sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
+
+        // Define the path to load data in Parquet format
+        val parquetDataPath = "%s/webrequest_source=text/year=%d/month=%d/day=%d/hour=%d"
+          .format(params.webrequestBasePath, params.year, params.month, params.day, params.hour)
+
+        // Build the graphite timestamp explicitly in UTC so it does not depend on the
+        // JVM default time zone (assumes partition hours are UTC -- TODO confirm)
+        val graphiteTimestamp = new DateTime(params.year, params.month, params.day, params.hour, 0,
+          DateTimeZone.UTC).getMillis / 1000
+        val restbaseMetric = "%s.varnish_requests".format(params.restbaseNamespace)
+        val mwAPIMetric = "%s.varnish_requests".format(params.mwAPINamespace)
+        val (restbaseRequests, mwAPIRequests) =
+          countAPIsURIs(sqlContext.read.parquet(parquetDataPath), sqlContext)
+
+        // Send both metrics over a single graphite connection
+        val graphite = new GraphiteClient(params.graphiteHost, params.graphitePort)
+        graphite.sendMany(Seq(
+          GraphiteMessage(restbaseMetric, restbaseRequests, graphiteTimestamp),
+          GraphiteMessage(mwAPIMetric, mwAPIRequests, graphiteTimestamp)
+        ))
+      case None => sys.exit(1)
+    }
+  }
+
+}
diff --git 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/RESTBaseMetrics.scala
 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/RESTBaseMetrics.scala
deleted file mode 100644
index 9cd743a..0000000
--- 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/RESTBaseMetrics.scala
+++ /dev/null
@@ -1,106 +0,0 @@ RESTBaseMetrics removed: superseded by APIsVarnishRequests
-package org.wikimedia.analytics.refinery.job
-
-import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.joda.time.DateTime
-import org.wikimedia.analytics.refinery.job.connectors.GraphiteClient
-import scopt.OptionParser
-
-/**
- * Reports metrics for Restbase to graphite
- *
- * Usage with spark-submit:
- * spark-submit \
- * --class org.wikimedia.analytics.refinery.job.RESTBaseMetrics
- * /path/to/refinery-job.jar
- * -y <year> -m <month> -d <day> -h <hour>
- * [-n <namespace> -w <webrequest-base-path> -g <graphite-host> -p 
<graphite-port>]
- */
-object RESTBaseMetrics {
-
-
-  /**
-   * Config class for CLI argument parser using scopt
-   */
-  case class Params(webrequestBasePath: String = 
"hdfs://analytics-hadoop/wmf/data/wmf/webrequest",
-                    graphiteHost: String = "localhost",
-                    graphitePort: Int = 2003,
-                    namespace: String = "restbase.requests",
-                    year: Int = 0, month: Int = 0, day: Int = 0, hour: Int = 0)
-
-  /**
-   * Define the command line options parser
-   */
-  val argsParser = new OptionParser[Params]("RESTBase Metrics") {
-    head("RESTBase Metrics", "")
-    note("This job reports RESTBase traffic to graphite hourly")
-    help("help") text ("Prints this usage text")
-
-    opt[String]('w', "webrequest-base-path") optional() valueName ("<path>") 
action { (x, p) =>
-      p.copy(webrequestBasePath = if (x.endsWith("/")) x.dropRight(1) else x)
-    } text ("Base path to webrequest data on hadoop. Defaults to 
hdfs://analytics-hadoop/wmf/data/wmf/webrequest")
-
-    opt[String]('g', "graphite-host") optional() valueName ("<path>") action { 
(x, p) =>
-      p.copy(graphiteHost = x)
-    } text ("Graphite host. Defaults to localhost")
-
-    opt[Int]('p', "graphite-port") optional() valueName ("<path>") action { 
(x, p) =>
-      p.copy(graphitePort = x)
-    } text ("Graphite port. Defaults to 2003")
-
-    opt[String]('n', "namespace") optional() valueName ("<path>") action { (x, 
p) =>
-      p.copy(namespace = x)
-    } text ("Namespace/prefix for graphite metric. Defaults to 
restbase.requests")
-
-    opt[Int]('y', "year") required() action { (x, p) =>
-      p.copy(year = x)
-    } text ("Year as an integer")
-
-    opt[Int]('m', "month") required() action { (x, p) =>
-      p.copy(month = x)
-    } validate { x => if (x > 0 & x <= 12) success else failure("Invalid 
month")
-    } text ("Month as an integer")
-
-    opt[Int]('d', "day") required() action { (x, p) =>
-      p.copy(day = x)
-    } validate { x => if (x > 0 & x <= 31) success else failure("Invalid day")
-    } text ("Day of month as an integer")
-
-    opt[Int]('h', "hour") required() action { (x, p) =>
-      p.copy(hour = x)
-    } validate { x => if (x >= 0 & x < 24) success else failure("Invalid hour")
-    } text ("Hour of day as an integer (0-23)")
-
-  }
-
-  def countRESTBaseURIs(parquetData: DataFrame): Long = {
-    parquetData.filter("uri_path like '%/api/rest_v1%'").count
-  }
-
-  def main(args: Array[String]): Unit = {
-    argsParser.parse(args, Params()) match {
-      case Some(params) => {
-        // Initial Spark setup
-        val conf = new SparkConf().setAppName("RESTBaseMetrics")
-        val sc = new SparkContext(conf)
-        val sqlContext = new SQLContext(sc)
-        sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
-
-        // Define the path to load data in Parquet format
-        val parquetDataPath = 
"%s/webrequest_source=text/year=%d/month=%d/day=%d/hour=%d"
-          .format(params.webrequestBasePath, params.year, params.month, 
params.day, params.hour)
-
-        // Define time, metric, Compute request count
-        val time = new DateTime(params.year, params.month, params.day, 
params.hour, 0)
-        val metric = "%s.varnish_requests".format(params.namespace)
-        val requestCount = 
countRESTBaseURIs(sqlContext.parquetFile(parquetDataPath))
-
-        // Send to graphite
-        val graphite = new GraphiteClient(params.graphiteHost, 
params.graphitePort)
-        graphite.sendOnce(metric, requestCount, time.getMillis / 1000)
-      }
-      case None => sys.exit(1)
-    }
-  }
-
-}
diff --git 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/connectors/GraphiteClient.scala
 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/connectors/GraphiteClient.scala
index b92ece1..c47ee98 100644
--- 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/connectors/GraphiteClient.scala
+++ 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/connectors/GraphiteClient.scala
@@ -6,6 +6,17 @@
 import org.joda.time.DateTimeUtils
 
 /**
+ * A single graphite datapoint; toString renders it as one line of Carbon's plaintext protocol
+ *
+ * @param metric Name of the graphite metric, e.g foo.bar
+ * @param value Value of the metric
+ * @param timestamp Timestamp in seconds, defaults to current time
+ */
+case class GraphiteMessage(metric: String, value: Long, timestamp: Long = DateTimeUtils.currentTimeMillis() / 1000) {
+  override def toString: String = "%s %d %d\n".format(metric, value, timestamp)
+}
+
+/**
  * Simple GraphiteClient in Scala
  * Creates a Socket and writes to it,
  * based on the plaintext protocol supported by Carbon
@@ -44,10 +55,10 @@
     }
 
     def close() = {
-      out.close
-      socket.close
+      out.close()
+      socket.close()
     }
   }
 
   /**
    * Create an instance of Connection
@@ -60,18 +71,6 @@
   }
 
   /**
-   * Helper to create a message string following Carbon's plaintext protocol
-   *
-   * @param metric Name of the graphite metric, e.g foo.bar
-   * @param value Value of the metric
-   * @param timestamp Timestamp in seconds, defaults to current time
-   * @return Formatted string for plaintext protocol
-   */
-  def message(metric:String, value:Long, timestamp:Long = DateTimeUtils.currentTimeMillis() / 1000) = {
-    "%s %d %d\n".format(metric, value, timestamp)
-  }
-
-  /**
    * Helper that opens a connection, sends a message, and closes connection
    * @param metric Name of the graphite metric, e.g foo.bar
    * @param value Value of the metric
@@ -79,7 +78,19 @@
    */
   def sendOnce(metric:String, value:Long, timestamp:Long) = {
-    val conn = connection()
-    conn.write(message(metric, value, timestamp))
-    conn.close()
+    sendMany(Seq(GraphiteMessage(metric, value, timestamp)))
+  }
+
+  /**
+   * Helper that opens a connection, sends a list of messages, and closes connection.
+   * The connection is closed even if a write fails.
+   * @param messages List of messages to be sent
+   */
+  def sendMany(messages: Seq[GraphiteMessage]): Unit = {
+    val conn = connection()
+    try {
+      messages.foreach(message => conn.write(message.toString))
+    } finally {
+      conn.close()
+    }
   }
 

-- 
To view, visit https://gerrit.wikimedia.org/r/392700
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I3040ab901d7fe28a3a5e3b924cb8c142f31e3a3e
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Joal <j...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to