Madhuvishy has uploaded a new change for review.
https://gerrit.wikimedia.org/r/234453
Change subject: [WIP] Report RESTBase traffic metrics to Graphite
......................................................................
[WIP] Report RESTBase traffic metrics to Graphite
This Spark job is intended to run hourly and report RESTBase request counts
to Graphite. It will be scheduled via Oozie.
Bug: T109547
Change-Id: I1dd47de9aaa8f80df9a1db1db8c375d07f5ca950
---
A
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/RESTBaseMetrics.scala
A
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/util/GraphiteClient.scala
2 files changed, 132 insertions(+), 0 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source
refs/changes/53/234453/1
diff --git
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/RESTBaseMetrics.scala
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/RESTBaseMetrics.scala
new file mode 100644
index 0000000..04e69da
--- /dev/null
+++
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/RESTBaseMetrics.scala
@@ -0,0 +1,102 @@
+package org.wikimedia.analytics.refinery.job
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.{SparkContext, SparkConf}
+import org.joda.time.DateTime
+import org.wikimedia.analytics.refinery.util.GraphiteClient
+import scopt.OptionParser
+
+/**
+ * Reports metrics for RESTBase to Graphite.
+ *
+ * For the requested hour the job loads the `webrequest_source=text` Parquet
+ * partition below the webrequest base path, counts the rows whose `uri_path`
+ * contains `/api/rest_v1`, and sends that count to Graphite under the metric
+ * name `<prefix>.restbase.requests`.
+ *
+ * Usage with spark-submit:
+ * spark-submit \
+ *   --class org.wikimedia.analytics.refinery.job.RESTBaseMetrics \
+ *   /path/to/refinery-job.jar \
+ *   -y <year> -m <month> -d <day> -h <hour> \
+ *   [-p <prefix> -w <webrequest-base-path> -g <graphite-host>]
+ */
+object RESTBaseMetrics {
+
+  /**
+   * Config class for CLI argument parser using scopt
+   *
+   * @param webrequestBasePath base path to the webrequest data; the parser
+   *                           strips one trailing "/" from a user-supplied value
+   * @param graphiteHost       host the metric is sent to
+   * @param prefix             first component of the Graphite metric name
+   * @param year               year of the hour to report
+   * @param month              month of the hour to report (1-12)
+   * @param day                day of month of the hour to report (1-31)
+   * @param hour               hour of day to report (0-23)
+   */
+  case class Params(
+    webrequestBasePath: String = "hdfs://analytics-hadoop/wmf/data/wmf/webrequest",
+    graphiteHost: String = "graphite-in.eqiad.wmnet",
+    // FIXME: This is "test" while WIP to prevent accidentally pushing stats to
+    // the restbase namespace in prod, must change before deploying
+    prefix: String = "test",
+    year: Int = 0, month: Int = 0, day: Int = 0, hour: Int = 0)
+
+  /**
+   * Default parameter values. Used both as the parser's starting point and to
+   * render the defaults in the help text, so the two cannot drift apart.
+   * Must be defined before `argsParser`, which reads it during initialization.
+   */
+  private val defaultParams = Params()
+
+  /**
+   * Define the command line options parser
+   */
+  val argsParser = new OptionParser[Params]("RESTBase Metrics") {
+    head("RESTBase Metrics", "")
+    note("This job reports RESTBase traffic to graphite hourly")
+    help("help").text("Prints this usage text")
+
+    opt[String]('w', "webrequest-base-path").optional().valueName("<path>").action { (x, p) =>
+      // Drop one trailing slash so the partition path is not built with "//"
+      p.copy(webrequestBasePath = if (x.endsWith("/")) x.dropRight(1) else x)
+    }.text("Base path to webrequest data on hadoop. Defaults to " +
+      defaultParams.webrequestBasePath)
+
+    opt[String]('g', "graphite-host").optional().valueName("<host>").action { (x, p) =>
+      p.copy(graphiteHost = x)
+    }.text("Graphite host. Defaults to " + defaultParams.graphiteHost)
+
+    opt[String]('p', "prefix").optional().valueName("<prefix>").action { (x, p) =>
+      p.copy(prefix = x)
+    }.text("Prefix/namespace for graphite metric. Defaults to " + defaultParams.prefix)
+
+    opt[Int]('y', "year").required().action { (x, p) =>
+      p.copy(year = x)
+    }.text("Year as an integer")
+
+    opt[Int]('m', "month").required().action { (x, p) =>
+      p.copy(month = x)
+    }.validate { x =>
+      if (x > 0 && x <= 12) success else failure("Invalid month")
+    }.text("Month as an integer")
+
+    opt[Int]('d', "day").required().action { (x, p) =>
+      p.copy(day = x)
+    }.validate { x =>
+      if (x > 0 && x <= 31) success else failure("Invalid day")
+    }.text("Day of month as an integer")
+
+    opt[Int]('h', "hour").required().action { (x, p) =>
+      p.copy(hour = x)
+    }.validate { x =>
+      if (x >= 0 && x < 24) success else failure("Invalid hour")
+    }.text("Hour of day as an integer (0-23)")
+
+  }
+
+  /**
+   * Counts the rows whose `uri_path` contains `/api/rest_v1`.
+   *
+   * @param parquetData webrequest data; must expose a `uri_path` column
+   * @return number of matching rows
+   */
+  def count(parquetData: DataFrame): Long = {
+    parquetData.filter("uri_path like '%/api/rest_v1%'").count()
+  }
+
+  /**
+   * Entry point: parses the arguments, counts the RESTBase requests of the
+   * requested hour and sends the count to Graphite. Exits with status 1 when
+   * the arguments cannot be parsed.
+   */
+  def main(args: Array[String]): Unit = {
+    argsParser.parse(args, defaultParams) match {
+      case Some(params) =>
+        // Build the timestamp before starting Spark: the per-field validators
+        // cannot reject impossible dates such as Feb 31, but DateTime does, so
+        // a bad date fails here without spinning up a Spark context.
+        // The zone is fixed so the timestamp does not depend on the JVM default.
+        // NOTE(review): assumes the year/month/day/hour partition values are
+        // UTC - confirm.
+        val time = new DateTime(params.year, params.month, params.day, params.hour, 0,
+          org.joda.time.DateTimeZone.UTC)
+        val metric = s"${params.prefix}.restbase.requests"
+
+        // Define the path to load data in Parquet format
+        val parquetDataPath = s"${params.webrequestBasePath}/webrequest_source=text" +
+          s"/year=${params.year}/month=${params.month}/day=${params.day}/hour=${params.hour}"
+
+        // Initial Spark setup
+        val conf = new SparkConf().setAppName("RESTBaseMetrics")
+        val sc = new SparkContext(conf)
+        try {
+          val sqlContext = new HiveContext(sc)
+
+          // Compute request count
+          val requestCount = count(sqlContext.parquetFile(parquetDataPath))
+
+          // Send to graphite; the timestamp is passed in seconds
+          val graphite = new GraphiteClient(params.graphiteHost)
+          graphite.sendStats(metric, requestCount, time.getMillis / 1000)
+        } finally {
+          // Always release the Spark context, even when counting or sending fails
+          sc.stop()
+        }
+      case None => sys.exit(1)
+    }
+  }
+
+}
diff --git
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/util/GraphiteClient.scala
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/util/GraphiteClient.scala
new file mode 100644
index 0000000..a3f689b
--- /dev/null
+++
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/util/GraphiteClient.scala
@@ -0,0 +1,30 @@
+package org.wikimedia.analytics.refinery.util
+
+import java.io.OutputStream
+import java.net.Socket
+
+/**
+ * Simple GraphiteClient in Scala
+ * Creates a Socket and writes to it,
+ * based on the plaintext protocol supported by Carbon
+ *
+ * See:
+ * http://graphite.readthedocs.org/en/latest/feeding-carbon.html#the-plaintext-protocol
+ * for details
+ *
+ * @param host host to connect to
+ * @param port port to connect to, 2003 unless specified
+ */
+class GraphiteClient(host: String, port: Int = 2003) {
+
+  /**
+   * Opens a new socket to host:port, writes `data` to it and closes the
+   * socket again. The socket is closed even when writing fails.
+   *
+   * @param data text to send; encoded as UTF-8
+   */
+  def sendData(data: String): Unit = {
+    // Explicit charset: the no-arg getBytes() depends on the platform default
+    val byteData: Array[Byte] = data.getBytes("UTF-8")
+    val socket: Socket = new Socket(host, port)
+    try {
+      val out: OutputStream = socket.getOutputStream
+      out.write(byteData)
+      out.flush()
+    } finally {
+      // Closing the socket also closes its output stream
+      socket.close()
+    }
+  }
+
+  /**
+   * Sends a single "metric value timestamp" line, terminated by a newline.
+   *
+   * @param metric    metric name
+   * @param value     metric value, 1 unless specified
+   * @param timestamp timestamp written as the third field of the line
+   */
+  def sendStats(metric: String, value: Long = 1, timestamp: Long): Unit = {
+    // Interpolation instead of String.format: %d renders digits according to
+    // the default locale, the wire format must not depend on it
+    sendData(s"$metric $value $timestamp\n")
+  }
+
+}
--
To view, visit https://gerrit.wikimedia.org/r/234453
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I1dd47de9aaa8f80df9a1db1db8c375d07f5ca950
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Madhuvishy <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits