Joal has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/351370 )
Change subject: [WIP] Add streaming for druid pageviews
......................................................................
[WIP] Add streaming for druid pageviews
Change-Id: I2e939a463fc523e0d734a84951decd1973aa35d2
---
M refinery-core/src/main/java/org/wikimedia/analytics/refinery/core/Geocode.java
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/PageviewsStream.scala
2 files changed, 249 insertions(+), 4 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/70/351370/1
diff --git a/refinery-core/src/main/java/org/wikimedia/analytics/refinery/core/Geocode.java b/refinery-core/src/main/java/org/wikimedia/analytics/refinery/core/Geocode.java
index af1339a..c07975b 100644
--- a/refinery-core/src/main/java/org/wikimedia/analytics/refinery/core/Geocode.java
+++ b/refinery-core/src/main/java/org/wikimedia/analytics/refinery/core/Geocode.java
@@ -62,14 +62,31 @@
private DatabaseReader cityDatabaseReader;
+ /*
+  * Meta-members to expose this class as an eagerly-instantiated singleton:
+  * a single Geocode instance is created once, when the class is loaded, and
+  * shared through getInstance(), instead of every caller constructing its own
+  * instance (and re-reading the Maxmind databases). So we have:
+  * (1) a static final instance, built eagerly with the default database paths;
+  * (2) a getInstance() method giving access to that shared instance.
+  * The public constructors remain available for callers that need
+  * non-default database paths.
+  */
+ private static final Geocode instance = new Geocode();
+
+ public static Geocode getInstance() {
+ return instance;
+ }
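+ // Illustrative usage: callers are expected to share this instance, e.g.
+ // Geocode.getInstance().getGeocodedData(clientIp), rather than constructing
+ // a new Geocode (and re-reading the Maxmind databases) per record.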
+
+
/**
* Constructs a Geocode object with the default Maxmind 2 database paths.
* You can override either of the default database paths by setting
* the 'maxmind.database.country' and/or 'maxmind.database.city' properties.
*/
- public Geocode() throws IOException {
+ public Geocode() {
this(null, null);
}
+
/**
* Constructs a Geocode object with the provided Maxmind 2 database paths.
@@ -83,7 +100,7 @@
* @param cityDatabasePath
* String path to Maxmind's city database
*/
- public Geocode(String countryDatabasePath, String cityDatabasePath) throws IOException {
+ public Geocode(String countryDatabasePath, String cityDatabasePath) {
// Override database paths with System properties, if they exist
if (countryDatabasePath == null) {
countryDatabasePath = System.getProperty("maxmind.database.country", DEFAULT_DATABASE_COUNTRY_PATH);
@@ -95,8 +112,12 @@
LOG.info("Geocode using Maxmind country database: " +
countryDatabasePath);
LOG.info("Geocode using Maxmind city database: " +
cityDatabasePath);
- countryDatabaseReader = new DatabaseReader.Builder(new File(countryDatabasePath)).build();
- cityDatabaseReader = new DatabaseReader.Builder(new File(cityDatabasePath)).build();
+ try {
+ countryDatabaseReader = new DatabaseReader.Builder(new File(countryDatabasePath)).build();
+ cityDatabaseReader = new DatabaseReader.Builder(new File(cityDatabasePath)).build();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
/**
diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/PageviewsStream.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/PageviewsStream.scala
new file mode 100644
index 0000000..41673d7
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/PageviewsStream.scala
@@ -0,0 +1,224 @@
+package org.wikimedia.analytics.refinery.job
+
+import kafka.serializer.StringDecoder
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.spark.streaming.kafka.KafkaUtils
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+import org.wikimedia.analytics.refinery.core.{SearchEngineClassifier, UAParser, PageviewDefinition, Webrequest, Geocode}
+import scopt.OptionParser
+
+import scala.collection.JavaConverters._
+
+
+object PageviewsStream {
+
+ def getStringValue(json: JValue, fieldName: String): String = (json \ fieldName).values.asInstanceOf[String]
+
+ def run(
+ @transient sc: SparkContext,
+ kafkaBrokers: String,
+ kafkaInputTopics: String,
+ kafkaOutputTopic: String,
+ batchDurationSeconds: Int,
+ checkpointDirectory: String,
+ noCheckpoint: Boolean
+ ): Unit = {
+
+ def newStreamingContext() = {
+ val ssc = new StreamingContext(sc, Seconds(batchDurationSeconds.toLong))
+ ssc.checkpoint(checkpointDirectory)
+
+ val kafkaInputTopicsSet = kafkaInputTopics.split(",").toSet
+ val KafkaInputParameters = Map[String, String]("metadata.broker.list" -> kafkaBrokers)
+
+ // Get kafka batches from input topics
+ val messageStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+ ssc,
+ KafkaInputParameters,
+ kafkaInputTopicsSet
+ )
+
+ // Compute pageview oriented filtering / conversion / aggregation
+ val pageviewStream = messageStream.
+ // Extract the JSON message from the Kafka (Key, Value) message.
+ map { case (_, str) => parse(str) }.
+ filter(json => PageviewDefinition.getInstance().isPageview(
+ getStringValue(json, "uri_host"),
+ getStringValue(json, "uri_path"),
+ getStringValue(json, "uri_query"),
+ getStringValue(json, "http_status"),
+ getStringValue(json, "content_type"),
+ getStringValue(json, "user_agent"),
+ getStringValue(json, "x_analytics")
+ )).
+ map(json => {
+
+ val pv = PageviewDefinition.getInstance()
+ val wr = Webrequest.getInstance()
+
+ val uri_host = getStringValue(json, "uri_host")
+ val uri_path = getStringValue(json, "uri_path")
+ val uri_query = getStringValue(json, "uri_query")
+ val userAgent = getStringValue(json, "user_agent")
+ val referer = getStringValue(json, "referer")
+ val xAnalytics = getStringValue(json, "x_analytics")
+ val clientIp = getStringValue(json, "ip")
+
+ val parsedUserAgent: Map[String, String] = UAParser.getInstance().getUAMap(userAgent).asScala.toMap
+ val geocodeData: Map[String, String] = Geocode.getInstance().getGeocodedData(clientIp).asScala.toMap.map { case (k, v) => k -> v.toString }
+
+ // begin: dimensions
+ val project = pv.getProjectFromHost(uri_host)
+ val languageVariant = pv.getLanguageVariantFromPath(uri_path)
+ val accessMethod = wr.getAccessMethod(uri_host, userAgent)
+ val agentType = if (parsedUserAgent.getOrElse("device_family",
"").equals("Spider") || wr.isSpider(userAgent)) "spider" else "user"
+ val refererClass = SearchEngineClassifier.getInstance.refererClassify(referer)
+ val zero = wr.getXAnalyticsValue(xAnalytics, "zero")
+
+ val continent = geocodeData.get("continent")
+ val countryCode = geocodeData.get("country_code")
+ val country = geocodeData.get("country")
+ val subdivision = geocodeData.get("subdivision")
+ val city = geocodeData.get("city")
+
+ val uaDeviceFamily = parsedUserAgent.get("device_family")
+ val uaBrowserFamily = parsedUserAgent.get("browser_family")
+ val uaBrowserMajor = parsedUserAgent.get("browser_major")
+ val uaOsFamily = parsedUserAgent.get("os_family")
+ val uaOsMajor = parsedUserAgent.get("os_major")
+ val uaOsMinor = parsedUserAgent.get("os_minor")
+ val uaWmfAppVersion = parsedUserAgent.get("wmf_app_version")
+ // end: dimensions
+
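+ // Truncate the ISO-8601 'dt' timestamp to the hour (mm:ss -> 00:00) so that
+ // identical records aggregate per hour in countByValue below.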
+ val hourTs = (json \ "dt").values.asInstanceOf[String].replaceAll(":\\d\\d:\\d\\d$", ":00:00")
+
+ ("dt" -> hourTs) ~
+ ("project" -> project) ~
+ ("language_variant" -> languageVariant) ~
+ ("access_method" -> accessMethod) ~
+ ("agent_type" -> agentType) ~
+ ("referer_class" -> refererClass) ~
+ ("zero" -> zero) ~
+ ("continent" -> continent) ~
+ ("country_code" -> countryCode) ~
+ ("country" -> country) ~
+ ("subdivision" -> subdivision) ~
+ ("city" -> city) ~
+ ("ua_device_family" -> uaDeviceFamily) ~
+ ("ua_browser_family" -> uaBrowserFamily) ~
+ ("ua_browser_major" -> uaBrowserMajor) ~
+ ("ua_os_family" -> uaOsFamily) ~
+ ("ua_os_major" -> uaOsMajor) ~
+ ("ua_wmf_app_version" -> uaWmfAppVersion)
+ }).
+ countByValue().
+ // TODO: is this right?
+ map { case (json, count) => json ++ ("view_count" -> count) }.
+ map(j => compact(render(j)))
+
+ // Output pageview data back to kafka
+ pageviewStream.foreachRDD(rdd => {
+ System.out.println("# events = " + rdd.count())
+
+ rdd.foreachPartition(partition => {
+ // Print statements in this section are shown in the executor's stdout logs
+ val props = Map[String, Object](
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkaBrokers,
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer"
+ )
+
+ val producer = new KafkaProducer[String, String](props.asJava)
+ partition.foreach(record => {
+ val data = record.toString
+ val message = new ProducerRecord[String, String](kafkaOutputTopic, null, data)
+ producer.send(message)
+ })
+ producer.close()
+ })
+ })
+ ssc
+ }
+
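+ // Rebuild the streaming context from the checkpoint directory if one exists
+ // (unless --no-checkpoint is set); otherwise create a fresh context.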
+ val context = {
+ if (noCheckpoint) newStreamingContext()
+ else StreamingContext.getOrCreate(checkpointDirectory, newStreamingContext)
+ }
+
+ // Start the context
+ context.start()
+ context.awaitTermination()
+ }
+
+ /**
+ * Config class for CLI argument parser using scopt
+ */
+ case class Params(
+ kafkaBrokers: String = Seq("12", "13", "14", "18", "20", "22").map("kafka10" + _ + ".eqiad.wmnet:9092").mkString(","),
+ kafkaInputTopics: String = "webrequest_text",
+ kafkaOutputTopic: String = "test_pageviews_joal",
+ batchDurationSecs: Int = 10,
+ checkpointDirectory: String = "hdfs://analytics-hadoop/tmp/spark/pageviews_stream_checkpoint",
+ noCheckpoint: Boolean = false
+ )
+
+ /**
+ * Define the command line options parser
+ */
+ val argsParser = new OptionParser[Params]("Pageviews Stream") {
+ head("Pageviews Stream", "")
+ note( "Extract pageview data from kafka webrequest stream and write it
back to kafka aggregated")
+ help("help") text "Prints this usage text"
+
+ opt[String]('k', "kafka-brokers") optional() valueName
"<broker1,...,brokerN>" action {
+ (x, p) => p.copy(kafkaBrokers = x)
+ } text "Kafka brokers to consume from. Defaults to
kafka10[12|14|18|20|22].eqiad.wmnet:9092"
+
+ opt[String]('i', "kafka-input-topics") optional() valueName
"<topic1,...,topicK>" action {
+ (x, p) => p.copy(kafkaInputTopics = x)
+ } text "Input topics to consume. Defaults to webrequest_text"
+
+ opt[String]('o', "kafka-output-topic") optional() valueName "<topic>"
action {
+ (x, p) => p.copy(kafkaOutputTopic = x)
+ } text "Output topic to write to. Defaults to test_pageviews_joal"
+
+ opt[Int]("batch-duration-seconds") optional() action {
+ (x, p) => p.copy(batchDurationSecs = x)
+ } text "Batch duration in seconds. Defaults to 10."
+
+ opt[String]("checkpoint-dir") optional() valueName "<path>" action {
+ (x, p) => p.copy(checkpointDirectory = if (x.endsWith("/")) x else x + "/")
+ } text ("Temporary directory for check-pointing streaming job.\n\t" +
+ "Defaults to
hdfs://analytics-hadoop/tmp/spark/pageviews_stream_checkpoint")
+
+ opt[Unit]("no-checkpoint") optional() action {
+ (_, p) => p.copy(noCheckpoint = true)
+ } text "Force NOT using checkpoint if exists (wipes existing checkpoint
directory if any)."
+
+ }
+
+ def main(args: Array[String]) {
+ argsParser.parse(args, Params()) match {
+
+ case Some(params) =>
+ // Initial setup - Spark context
+ val conf = new SparkConf().setAppName("PageviewsStream")
+ val sc = new SparkContext(conf)
+ run(
+ sc,
+ params.kafkaBrokers,
+ params.kafkaInputTopics,
+ params.kafkaOutputTopic,
+ params.batchDurationSecs,
+ params.checkpointDirectory,
+ params.noCheckpoint
+ )
+
+ case None => sys.exit(1)
+ }
+ }
+}
\ No newline at end of file
--
To view, visit https://gerrit.wikimedia.org/r/351370
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I2e939a463fc523e0d734a84951decd1973aa35d2
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Joal <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits