Joal has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/351370 )

Change subject: [WIP] Add streaming for druid pageviews
......................................................................

[WIP] Add streaming for druid pageviews

Change-Id: I2e939a463fc523e0d734a84951decd1973aa35d2
---
M refinery-core/src/main/java/org/wikimedia/analytics/refinery/core/Geocode.java
A 
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/PageviewsStream.scala
2 files changed, 249 insertions(+), 4 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source 
refs/changes/70/351370/1

diff --git 
a/refinery-core/src/main/java/org/wikimedia/analytics/refinery/core/Geocode.java
 
b/refinery-core/src/main/java/org/wikimedia/analytics/refinery/core/Geocode.java
index af1339a..c07975b 100644
--- 
a/refinery-core/src/main/java/org/wikimedia/analytics/refinery/core/Geocode.java
+++ 
b/refinery-core/src/main/java/org/wikimedia/analytics/refinery/core/Geocode.java
@@ -62,14 +62,31 @@
     private DatabaseReader cityDatabaseReader;
 
 
+    /*
+     * Meta-methods to enable eager instantiation in a singleton-based way.
+     * in non-Java terms: you get to only create one class instance, and only
+     * when you need it, instead of always having everything (static/eager 
instantiation)
+     * or always generating everything anew (!singletons). So we have:
+     * (1) an instance;
+     * (2) an empty constructor (to avoid people just calling the constructor);
+     * (3) an actual getInstance method to allow for instantiation.
+     */
+    private static final Geocode instance = new Geocode();
+
+    public static Geocode getInstance() {
+        return instance;
+    }
+
+
     /**
      * Constructs a Geocode object with the default Maxmind 2 database paths.
      * You can override either of the default database paths by setting
      * the 'maxmind.database.country' and/or 'maxmind.database.city' 
properties.
      */
-    public Geocode() throws IOException {
+    public Geocode() {
         this(null, null);
     }
+
 
     /**
      * Constructs a Geocode object with the provided Maxmind 2 database paths.
@@ -83,7 +100,7 @@
      * @param cityDatabasePath
      *      String path to Maxmind's city database
      */
-    public Geocode(String countryDatabasePath, String cityDatabasePath) throws 
IOException {
+    public Geocode(String countryDatabasePath, String cityDatabasePath) {
         // Override database paths with System properties, if they exist
         if (countryDatabasePath == null) {
             countryDatabasePath = 
System.getProperty("maxmind.database.country", DEFAULT_DATABASE_COUNTRY_PATH);
@@ -95,8 +112,12 @@
         LOG.info("Geocode using Maxmind country database: " + 
countryDatabasePath);
         LOG.info("Geocode using Maxmind city database: "    + 
cityDatabasePath);
 
-        countryDatabaseReader = new DatabaseReader.Builder(new 
File(countryDatabasePath)).build();
-        cityDatabaseReader    = new DatabaseReader.Builder(new 
File(cityDatabasePath)).build();
+        try {
+            countryDatabaseReader = new DatabaseReader.Builder(new 
File(countryDatabasePath)).build();
+            cityDatabaseReader    = new DatabaseReader.Builder(new 
File(cityDatabasePath)).build();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     /**
diff --git 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/PageviewsStream.scala
 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/PageviewsStream.scala
new file mode 100644
index 0000000..41673d7
--- /dev/null
+++ 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/PageviewsStream.scala
@@ -0,0 +1,224 @@
+package org.wikimedia.analytics.refinery.job
+
+import kafka.serializer.StringDecoder
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
+import org.apache.spark.streaming.kafka.KafkaUtils
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+import org.wikimedia.analytics.refinery.core.{SearchEngineClassifier, 
UAParser, PageviewDefinition, Webrequest, Geocode}
+import scopt.OptionParser
+
+import scala.collection.JavaConverters._
+
+
+object PageviewsStream {
+
+  /**
+    * Reads a direct child field of a parsed JSON value as a String.
+    *
+    * @param json      parsed JSON value to look into
+    * @param fieldName name of the field to read
+    * @return the field's string value, or null when the field is a JSON null
+    * @throws ClassCastException when the field is missing or is not a string
+    */
+  def getStringValue(json: JValue, fieldName: String): String =
+    (json \ fieldName) match {
+      case JString(value) => value
+      // A JSON null is passed through as a null reference.
+      case JNull => null
+      case other =>
+        throw new ClassCastException(
+          s"Field '$fieldName' is not a JSON string (found ${other.getClass.getSimpleName})")
+    }
+
+  /**
+    * Builds and runs the streaming job: reads JSON messages from the Kafka
+    * input topics, keeps those that PageviewDefinition accepts as pageviews,
+    * reduces each one to a fixed set of dimensions, counts identical
+    * dimension sets within each batch and writes one JSON string per
+    * distinct set (with its "view_count") to the Kafka output topic.
+    *
+    * Blocks until the streaming context terminates.
+    *
+    * @param sc                   Spark context the streaming context is built on
+    * @param kafkaBrokers         comma-separated broker list, used both for
+    *                             consuming and for producing
+    * @param kafkaInputTopics     comma-separated list of topics to consume
+    * @param kafkaOutputTopic     topic the aggregated rows are written to
+    * @param batchDurationSeconds streaming batch duration, in seconds
+    * @param checkpointDirectory  directory used for streaming checkpoints
+    * @param noCheckpoint         when true, always build a fresh context
+    *                             instead of restoring one from
+    *                             checkpointDirectory
+    */
+  def run(
+           @transient sc: SparkContext,
+           kafkaBrokers:String,
+           kafkaInputTopics: String,
+           kafkaOutputTopic: String,
+           batchDurationSeconds: Int,
+           checkpointDirectory: String,
+           noCheckpoint: Boolean
+         ): Unit = {
+
+    // Defines the whole pipeline on a new StreamingContext. Also handed to
+    // StreamingContext.getOrCreate as the factory to use when there is no
+    // checkpoint to restore from.
+    def newStreamingContext() = {
+      val ssc = new StreamingContext(sc, Seconds(batchDurationSeconds.toLong))
+      ssc.checkpoint(checkpointDirectory)
+
+      val kafkaInputTopicsSet = kafkaInputTopics.split(",").toSet
+      val kafkaInputParameters = Map[String, String]("metadata.broker.list" -> kafkaBrokers)
+
+      // Get kafka batches from input topics
+      val messageStream =
+        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+          ssc,
+          kafkaInputParameters,
+          kafkaInputTopicsSet
+        )
+
+      // Compute pageview oriented filtering / conversion / aggregation
+      val pageviewStream = messageStream.
+        // Extract the JSON message from the Kafka (Key, Value) message.
+        map { case (_, str) => parse(str) }.
+        filter(json => PageviewDefinition.getInstance().isPageview(
+          getStringValue(json, "uri_host"),
+          getStringValue(json, "uri_path"),
+          getStringValue(json, "uri_query"),
+          getStringValue(json, "http_status"),
+          getStringValue(json, "content_type"),
+          getStringValue(json, "user_agent"),
+          getStringValue(json, "x_analytics")
+        )).
+        map(json => {
+
+          val pv = PageviewDefinition.getInstance()
+          val wr = Webrequest.getInstance()
+
+          val uriHost = getStringValue(json, "uri_host")
+          val uriPath = getStringValue(json, "uri_path")
+          val userAgent = getStringValue(json, "user_agent")
+          val referer = getStringValue(json, "referer")
+          val xAnalytics = getStringValue(json, "x_analytics")
+          val clientIp = getStringValue(json, "ip")
+
+          val parsedUserAgent: Map[String, String] =
+            UAParser.getInstance().getUAMap(userAgent).asScala.toMap
+          // Geocoded values are stringified so every dimension is a String.
+          val geocodeData: Map[String, String] =
+            Geocode.getInstance().getGeocodedData(clientIp).asScala.toMap.
+              map { case (k, v) => k -> v.toString }
+
+          // begin: dimensions
+          val project = pv.getProjectFromHost(uriHost)
+          val languageVariant = pv.getLanguageVariantFromPath(uriPath)
+          val accessMethod = wr.getAccessMethod(uriHost, userAgent)
+          val isSpider =
+            parsedUserAgent.getOrElse("device_family", "") == "Spider" || wr.isSpider(userAgent)
+          val agentType = if (isSpider) "spider" else "user"
+          val refererClass = SearchEngineClassifier.getInstance.refererClassify(referer)
+          val zero = wr.getXAnalyticsValue(xAnalytics, "zero")
+
+          val continent = geocodeData.get("continent")
+          val countryCode = geocodeData.get("country_code")
+          val country = geocodeData.get("country")
+          val subdivision = geocodeData.get("subdivision")
+          val city = geocodeData.get("city")
+
+          val uaDeviceFamily = parsedUserAgent.get("device_family")
+          val uaBrowserFamily = parsedUserAgent.get("browser_family")
+          val uaBrowserMajor = parsedUserAgent.get("browser_major")
+          val uaOsFamily = parsedUserAgent.get("os_family")
+          val uaOsMajor = parsedUserAgent.get("os_major")
+          val uaWmfAppVersion = parsedUserAgent.get("wmf_app_version")
+          // end: dimensions
+
+          // Zero the trailing ":mm:ss" of dt so rows group by hour. A dt
+          // that does not end in ":dd:dd" is left untouched by the regex.
+          val hourTs = getStringValue(json, "dt").replaceAll(":\\d\\d:\\d\\d$", ":00:00")
+
+          ("dt" -> hourTs) ~
+            ("project" -> project) ~
+            ("language_variant" -> languageVariant) ~
+            ("access_method" -> accessMethod) ~
+            ("agent_type" -> agentType) ~
+            ("referer_class" -> refererClass) ~
+            ("zero" -> zero) ~
+            ("continent" -> continent) ~
+            ("country_code" -> countryCode) ~
+            ("country" -> country) ~
+            ("subdivision" -> subdivision) ~
+            ("city" -> city) ~
+            ("ua_device_family" -> uaDeviceFamily) ~
+            ("ua_browser_family" -> uaBrowserFamily) ~
+            ("ua_browser_major" -> uaBrowserMajor) ~
+            ("ua_os_family" -> uaOsFamily) ~
+            ("ua_os_major" -> uaOsMajor) ~
+            ("ua_wmf_app_version" -> uaWmfAppVersion)
+        }).
+        countByValue().
+        // Add the count as one more field of the dimensions object. `~`
+        // extends the JObject; `++` would instead wrap the two values in a
+        // two-element JArray.
+        map { case (json, count) => json ~ ("view_count" -> count) }.
+        map(j => compact(render(j)))
+
+      // Output pageview data back to kafka
+      pageviewStream.foreachRDD(rdd => {
+        System.out.println("# events = " + rdd.count())
+
+        rdd.foreachPartition(partition => {
+          // Print statements in this section are shown in the executor's stdout logs
+          val props = Map[String, Object](
+            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkaBrokers,
+            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG ->
+              "org.apache.kafka.common.serialization.StringSerializer",
+            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG ->
+              "org.apache.kafka.common.serialization.StringSerializer"
+          )
+
+          // One producer per partition and batch, created on the executor.
+          val producer = new KafkaProducer[String, String](props.asJava)
+          try {
+            // Records are sent without a key.
+            partition.foreach { record =>
+              producer.send(new ProducerRecord[String, String](kafkaOutputTopic, record))
+            }
+          } finally {
+            // Always release the producer, even when a send throws.
+            producer.close()
+          }
+        })
+      })
+      ssc
+    }
+
+    val context = {
+      if (noCheckpoint) newStreamingContext()
+      else StreamingContext.getOrCreate(checkpointDirectory, newStreamingContext)
+    }
+
+    // Start the context
+    context.start()
+    context.awaitTermination()
+  }
+
+  /**
+    * Command-line parameters of the job and their default values; populated
+    * by the scopt parser (argsParser).
+    */
+  case class Params(
+                     kafkaBrokers: String =
+                       Seq("12", "13", "14", "18", "20", "22")
+                         .map(id => s"kafka10${id}.eqiad.wmnet:9092")
+                         .mkString(","),
+                     kafkaInputTopics: String = "webrequest_text",
+                     kafkaOutputTopic: String = "test_pageviews_joal",
+                     batchDurationSecs: Int = 10,
+                     checkpointDirectory: String =
+                       "hdfs://analytics-hadoop/tmp/spark/pageviews_stream_checkpoint",
+                     noCheckpoint: Boolean = false
+                   )
+
+  /**
+    * Command line options parser (scopt). Each option copies its value into
+    * a Params instance; options that are not given keep the Params defaults.
+    */
+  val argsParser: OptionParser[Params] = new OptionParser[Params]("Pageviews Stream") {
+    head("Pageviews Stream", "")
+    note("Extract pageview data from kafka webrequest stream and write it back to kafka aggregated")
+    help("help") text "Prints this usage text"
+
+    // The same broker list is used for both the consumer and the producer.
+    opt[String]('k', "kafka-brokers") optional() valueName "<broker1,...,brokerN>" action {
+      (x, p) => p.copy(kafkaBrokers = x)
+    } text ("Kafka brokers to consume from and produce to. " +
+      "Defaults to kafka10[12|13|14|18|20|22].eqiad.wmnet:9092")
+
+    opt[String]('i', "kafka-input-topics") optional() valueName "<topic1,...,topicK>" action {
+      (x, p) => p.copy(kafkaInputTopics = x)
+    } text "Input topics to consume. Defaults to webrequest_text"
+
+    opt[String]('o', "kafka-output-topic") optional() valueName "<topic>" action {
+      (x, p) => p.copy(kafkaOutputTopic = x)
+    } text "Output topic to write to. Defaults to test_pageviews_joal"
+
+    opt[Int]("batch-duration-seconds") optional() action {
+      (x, p) => p.copy(batchDurationSecs = x)
+    } text "Batch duration in seconds. Defaults to 10."
+
+    // A user-supplied directory is normalized to end with a slash.
+    opt[String]("checkpoint-dir") optional() valueName "<path>" action {
+      (x, p) => p.copy(checkpointDirectory = if (x.endsWith("/")) x else x + "/")
+    } text ("Temporary directory for check-pointing streaming job.\n\t" +
+      "Defaults to hdfs://analytics-hadoop/tmp/spark/pageviews_stream_checkpoint")
+
+    opt[Unit]("no-checkpoint") optional() action {
+      (_, p) => p.copy(noCheckpoint = true)
+    } text ("Force NOT using checkpoint if exists " +
+      "(existing checkpoint data is ignored, not restored).")
+
+  }
+
+  /**
+    * Entry point: parses the command line, creates the Spark context and
+    * hands over to run(). Exits with status 1 when the arguments cannot be
+    * parsed.
+    */
+  def main(args: Array[String]): Unit = {
+    // scopt yields None on invalid arguments (or after printing usage).
+    val params = argsParser.parse(args, Params()).getOrElse(sys.exit(1))
+
+    // Initial setup - Spark configuration and context
+    val sparkConf = new SparkConf().setAppName("PageviewsStream")
+    val sparkContext = new SparkContext(sparkConf)
+
+    run(
+      sparkContext,
+      params.kafkaBrokers,
+      params.kafkaInputTopics,
+      params.kafkaOutputTopic,
+      params.batchDurationSecs,
+      params.checkpointDirectory,
+      params.noCheckpoint
+    )
+  }
+}
\ No newline at end of file

-- 
To view, visit https://gerrit.wikimedia.org/r/351370
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I2e939a463fc523e0d734a84951decd1973aa35d2
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Joal <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to