Madhuvishy has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/234453

Change subject: [WIP] Report RESTBase traffic metrics to Graphite
......................................................................

[WIP] Report RESTBase traffic metrics to Graphite

This Spark job aims to run hourly and report restbase request counts
to Graphite. It will be scheduled via Oozie.

Bug: T109547
Change-Id: I1dd47de9aaa8f80df9a1db1db8c375d07f5ca950
---
A 
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/RESTBaseMetrics.scala
A 
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/util/GraphiteClient.scala
2 files changed, 132 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source 
refs/changes/53/234453/1

diff --git 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/RESTBaseMetrics.scala
 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/RESTBaseMetrics.scala
new file mode 100644
index 0000000..04e69da
--- /dev/null
+++ 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/RESTBaseMetrics.scala
@@ -0,0 +1,102 @@
+package org.wikimedia.analytics.refinery.job
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.{SparkContext, SparkConf}
+import org.joda.time.DateTime
+import org.wikimedia.analytics.refinery.util.GraphiteClient
+import scopt.OptionParser
+
+/**
+ * Reports metrics for Restbase to graphite
+ *
+ * Usage with spark-submit:
+ * spark-submit \
+ * --class org.wikimedia.analytics.refinery.job.RESTBaseMetrics
+ * /path/to/refinery-job.jar
+ * -y <year> -m <month> -d <day> -h <hour> [-p <prefix> -w 
<webrequest-base-path> -g <graphite-host>]
+ */
+object RESTBaseMetrics {
+
+
+  /**
+   * Config class for CLI argument parser using scopt
+   */
+  case class Params(webrequestBasePath: String = 
"hdfs://analytics-hadoop/wmf/data/wmf/webrequest",
+                    graphiteHost: String = "graphite-in.eqiad.wmnet",
+                    //FIXME: This is test while WIP to prevent accidentally 
pushing stats to the restbase namespace in
+                    // prod, must change before deploying
+                    prefix: String = "test",
+                    year: Int = 0, month: Int = 0, day: Int = 0, hour: Int = 0)
+
+  /**
+   * Define the command line options parser
+   */
+  val argsParser = new OptionParser[Params]("RESTBase Metrics") {
+    head("RESTBase Metrics", "")
+    note("This job reports RESTBase traffic to graphite hourly")
+    help("help") text ("Prints this usage text")
+
+    opt[String]('w', "webrequest-base-path") optional() valueName ("<path>") 
action { (x, p) =>
+      p.copy(webrequestBasePath = if (x.endsWith("/")) x.dropRight(1) else x)
+    } text ("Base path to webrequest data on hadoop. Defaults to 
hdfs://analytics-hadoop/wmf/data/wmf/webrequest")
+
+    opt[String]('g', "graphite-host") optional() valueName ("<path>") action { 
(x, p) =>
+      p.copy(graphiteHost = x)
+    } text ("Graphite host. Defaults to prod graphite - 
graphite-in.eqiad.wmnet")
+
+    opt[String]('p', "prefix") optional() valueName ("<path>") action { (x, p) 
=>
+      p.copy(prefix = x)
+    } text ("Prefix/namespace for graphite metric. Defaults to restbase")
+
+    opt[Int]('y', "year") required() action { (x, p) =>
+      p.copy(year = x)
+    } text ("Year as an integer")
+
+    opt[Int]('m', "month") required() action { (x, p) =>
+      p.copy(month = x)
+    } validate { x => if (x > 0 & x <= 12) success else failure("Invalid 
month")
+    } text ("Month as an integer")
+
+    opt[Int]('d', "day") required() action { (x, p) =>
+      p.copy(day = x)
+    } validate { x => if (x > 0 & x <= 31) success else failure("Invalid day")
+    } text ("Day of month as an integer")
+
+    opt[Int]('h', "hour") required() action { (x, p) =>
+      p.copy(hour = x)
+    } validate { x => if (x >= 0 & x < 24) success else failure("Invalid hour")
+    } text ("Hour of day as an integer (0-23)")
+
+  }
+
+  def count(parquetData: DataFrame): Long = {
+    parquetData.filter("uri_path like '%/api/rest_v1%'").count
+  }
+
+  def main(args: Array[String]): Unit = {
+    argsParser.parse(args, Params()) match {
+      case Some(params) => {
+        // Initial Spark setup
+        val conf = new SparkConf().setAppName("RESTBaseMetrics")
+        val sc = new SparkContext(conf)
+        val sqlContext = new HiveContext(sc)
+
+        // Define the path to load data in Parquet format
+        val parquetDataPath = 
"%s/webrequest_source=text/year=%d/month=%d/day=%d/hour=%d"
+          .format(params.webrequestBasePath, params.year, params.month, 
params.day, params.hour)
+
+        // Define time, metric, Compute request count
+        val time = new DateTime(params.year, params.month, params.day, 
params.hour, 0)
+        val metric = "%s.restbase.requests".format(params.prefix)
+        val requestCount = count(sqlContext.parquetFile(parquetDataPath))
+
+        // Send to graphite
+        val graphite = new GraphiteClient(params.graphiteHost)
+        graphite.sendStats(metric, requestCount, time.getMillis / 1000)
+      }
+      case None => sys.exit(1)
+    }
+  }
+
+}
diff --git 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/util/GraphiteClient.scala
 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/util/GraphiteClient.scala
new file mode 100644
index 0000000..a3f689b
--- /dev/null
+++ 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/util/GraphiteClient.scala
@@ -0,0 +1,30 @@
+package org.wikimedia.analytics.refinery.util
+
+import java.io.OutputStream
+import java.net.Socket
+
/**
 * Simple Graphite client in Scala.
 *
 * Opens a fresh socket per call and writes metric lines to it,
 * based on the plaintext protocol supported by Carbon.
 *
 * See: http://graphite.readthedocs.org/en/latest/feeding-carbon.html#the-plaintext-protocol
 * for details
 *
 * @param host graphite/carbon host to send metrics to
 * @param port carbon plaintext listener port, defaults to 2003
 */
class GraphiteClient(host: String, port: Int = 2003) {

  /**
   * Writes the given string to a new socket connected to host:port.
   * The socket is always closed, even if connecting succeeded but the
   * write failed (the original version leaked it on exception).
   *
   * @param data payload to send, already formatted for carbon
   */
  def sendData(data: String) = {
    // Explicit charset: carbon expects plain text; relying on the platform
    // default charset would be non-deterministic across JVMs.
    val byteData: Array[Byte] = data.getBytes("UTF-8")
    val socket: Socket = new Socket(host, port)
    try {
      val out: OutputStream = socket.getOutputStream
      out.write(byteData)
      out.flush()
    } finally {
      // Closing the socket also closes its output stream.
      socket.close()
    }
  }

  /**
   * Sends one data point in carbon plaintext format:
   * "&lt;metric&gt; &lt;value&gt; &lt;timestamp&gt;\n".
   *
   * @param metric    dotted metric path, e.g. "restbase.requests"
   * @param value     metric value (defaults to 1; note: because timestamp
   *                  follows, the default is only usable with named args)
   * @param timestamp unix time in epoch seconds
   */
  def sendStats(metric: String, value: Long = 1, timestamp: Long) = {
    sendData("%s %d %d\n".format(metric, value, timestamp))
  }

}

-- 
To view, visit https://gerrit.wikimedia.org/r/234453
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I1dd47de9aaa8f80df9a1db1db8c375d07f5ca950
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Madhuvishy <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to