Joal has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/373030 )

Change subject: Add tranquility to the banner streaming job
......................................................................

Add tranquility to the banner streaming job

In order to productionise druid pseudo-realtime ingestion
we want to use tranquility. We used to have standalone processes
reading from Kafka and inserting in druid, but it feels easier
to have tranquility running as part of the streaming
transformation job.

Change-Id: I22b0b641ad2ad2f1c88888332b03373abe3e0d00
---
M refinery-job/pom.xml
M 
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/BannerImpressionsStream.scala
2 files changed, 152 insertions(+), 3 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source 
refs/changes/30/373030/1

diff --git a/refinery-job/pom.xml b/refinery-job/pom.xml
index 5a0173b..b505dbd 100644
--- a/refinery-job/pom.xml
+++ b/refinery-job/pom.xml
@@ -163,6 +163,19 @@
             <scope>test</scope>
         </dependency>
 
+        <!-- https://mvnrepository.com/artifact/io.druid/tranquility-core_2.10 
-->
+        <dependency>
+            <groupId>io.druid</groupId>
+            <artifactId>tranquility-core_2.10</artifactId>
+            <version>0.8.2</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.druid</groupId>
+            <artifactId>tranquility-spark_2.10</artifactId>
+            <version>0.8.2</version>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/BannerImpressionsStream.scala
 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/BannerImpressionsStream.scala
index 4939b03..b71a723 100644
--- 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/BannerImpressionsStream.scala
+++ 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/BannerImpressionsStream.scala
@@ -1,18 +1,28 @@
 package org.wikimedia.analytics.refinery.job
 
+import com.metamx.common.Granularity
+import com.metamx.tranquility.beam.Beam
+import com.metamx.tranquility.druid._
+import com.metamx.tranquility.spark.BeamFactory
 import com.netaporter.uri.Uri
+import io.druid.granularity.QueryGranularities
+import io.druid.query.aggregation.{DoubleSumAggregatorFactory, 
LongSumAggregatorFactory}
 import kafka.serializer.StringDecoder
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.retry.BoundedExponentialBackoffRetry
 import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, 
ProducerConfig}
 import org.apache.log4j.{Level, Logger}
 import org.apache.spark.streaming.kafka.KafkaUtils
 import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.joda.time.{Period, DateTime}
 import org.wikimedia.analytics.refinery.core.Webrequest
 import scopt.OptionParser
 import org.json4s._
 import org.json4s.jackson.JsonMethods._
 import org.json4s.JsonDSL._
 import scala.collection.JavaConverters._
+import com.metamx.tranquility.beam._
 
 /**
   *
@@ -50,6 +60,125 @@
 
 
 object BannerImpressionsStream {
+/*
+  {
+    "dataSources" : [
+    {
+      "spec" : {
+        "dataSchema" : {
+        "dataSource" : "banner_activity_minutely",
+        "metricsSpec" : [
+      {
+        "name" : "request_count",
+        "type" : "longSum",
+        "fieldName": "request_count"
+      },
+      {
+        "name" : "normalized_request_count",
+        "type" : "doubleSum",
+        "fieldName": "normalized_request_count"
+      }
+        ],
+        "granularitySpec" : {
+        "segmentGranularity" : "day",
+        "queryGranularity" : "minute",
+        "type" : "uniform"
+      },
+        "parser" : {
+        "type" : "string",
+        "parseSpec" : {
+        "format" : "json",
+        "timestampSpec" : { "column" : "dt", "format" : "iso" },
+        "dimensionsSpec" : {
+        "dimensions" : ["campaign", "banner", "project", "uselang", "bucket", 
"anonymous", "status_code", "country", "device", "sample_rate"]
+      }
+      }
+      }
+      },
+        "tuningConfig" : {
+        "type" : "realtime",
+        "windowPeriod" : "PT10M",
+        "intermediatePersistPeriod" : "PT10M",
+        "maxRowsInMemory" : "100000"
+      }
+      },
+      "properties" : {
+        "task.partitions" : "1",
+        "task.replicants" : "3",
+        "topicPattern": "test_banner_impressions_joal"
+      }
+    }
+    ],
+    "properties" : {
+      "druid.discovery.curator.path": "discovery",
+      "zookeeper.connect" : 
"druid1001.eqiad.wmnet,druid1002.eqiad.wmnet,druid1003.eqiad.wmnet",
+      "kafka.zookeeper.connect": 
"conf1001.eqiad.wmnet,conf1002.eqiad.wmnet,conf1003.eqiad.wmnet/kafka/eqiad",
+      "kafka.group.id": "test_banner_impressions_joal-002",
+      "kafka.auto.offset.reset": "largest",
+      "mapreduce.output.fileoutputformat.compress": 
"org.apache.hadoop.io.compress.GzipCodec"
+    }
+  }
+  */
+  case class BannerImpressionEvent(
+                                    dt: DateTime,
+                                    campaign: String,
+                                    banner: String,
+                                    project: String,
+                                    uselang: String,
+                                    bucket: String,
+                                    anonymous: Boolean,
+                                    status_code: String,
+                                    country: String,
+                                    device: String,
+                                    sample_rate: Double,
+                                    request_count: Long,
+                                    normalized_request_count: Double
+                                  )
+
+
+  class BannerImpressionBeamFactory extends BeamFactory[BannerImpressionEvent] 
{
+    // Return a singleton, so the same connection is shared across all tasks 
in the same JVM.
+    def makeBeam: Beam[BannerImpressionEvent] = 
BannerImpressionBeamFactory.BeamInstance
+  }
+
+  object BannerImpressionBeamFactory
+  {
+    val BeamInstance: Beam[BannerImpressionEvent] = {
+      // Tranquility uses ZooKeeper (through Curator framework) for 
coordination.
+      val curator = CuratorFrameworkFactory.newClient(
+        
"druid1001.eqiad.wmnet,druid1002.eqiad.wmnet,druid1003.eqiad.wmnet/druid/analytics-eqiad",
+        new BoundedExponentialBackoffRetry(100, 3000, 5)
+      )
+      curator.start()
+
+      val indexService = "druid/overlord" // Your overlord's druid.service, 
with slashes replaced by colons.
+      val discoveryPath = "/druid/analytics-eqiad/discovery"
+      val dataSource = "banner_activity_minutely"
+      val dimensions = IndexedSeq("campaign", "banner", "project", "uselang", 
"bucket", "anonymous", "status_code", "country", "device", "sample_rate")
+      val aggregators = Seq(
+        new LongSumAggregatorFactory("request_count", "request_count"),
+        new DoubleSumAggregatorFactory("normalized_request_count", 
"normalized_request_count")
+
+      )
+
+      // Expects simpleEvent.timestamp to return a Joda DateTime object.
+      DruidBeams
+        .builder((event: BannerImpressionEvent) => event.dt)
+        .curator(curator)
+        .discoveryPath(discoveryPath)
+        .location(DruidLocation(DruidEnvironment(indexService), dataSource))
+        .rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, 
QueryGranularities.MINUTE))
+        .tuning(
+          ClusteredBeamTuning(
+            segmentGranularity = Granularity.DAY,
+            windowPeriod = new Period("PT10M"),
+            partitions = 1,
+            replicants = 3
+          )
+        )
+        .buildBeam()
+    }
+  }
 
   @transient
   lazy val log: Logger = Logger.getLogger(this.getClass)
@@ -146,15 +275,22 @@
         }.
         flatMap(j =>
           try {
-            Seq(compact(render(j)))
+            //Seq(compact(render(j)))
+            Seq(j.extract[BannerImpressionEvent])
           } catch {
             case e: Throwable =>
               log.warn("Exception occured when rendering final banner json 
event: " + e.getMessage)
               Seq.empty
           })
 
+      // Add this import to your Spark job to be able to propagate events from 
any RDD to Druid
+      import com.metamx.tranquility.spark.BeamRDD._
+
+      // Output banners to druid through tranquility
+      bannerStream.foreachRDD(rdd => rdd.propagate(new 
BannerImpressionBeamFactory))
+
       // Output banners data back to kafka
-      bannerStream.foreachRDD(rdd => {
+      /*bannerStream.foreachRDD(rdd => {
         System.out.println("# events = " + rdd.count())
 
         rdd.foreachPartition(partition => {
@@ -173,7 +309,7 @@
           })
           producer.close()
         })
-      })
+      })*/
       ssc
     }
 

-- 
To view, visit https://gerrit.wikimedia.org/r/373030
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I22b0b641ad2ad2f1c88888332b03373abe3e0d00
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Joal <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to