Joal has uploaded a new change for review. (
https://gerrit.wikimedia.org/r/373030 )
Change subject: Add tranquility to the banner streaming job
......................................................................
Add tranquility to the banner streaming job
In order to productionise druid pseudo-realtime ingestion
we want to use tranquility. We used to have standalone processes
reading from Kafka and inserting into Druid, but it feels easier
to have tranquility running as part of the streaming
transformation job.
Change-Id: I22b0b641ad2ad2f1c88888332b03373abe3e0d00
---
M refinery-job/pom.xml
M
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/BannerImpressionsStream.scala
2 files changed, 152 insertions(+), 3 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source
refs/changes/30/373030/1
diff --git a/refinery-job/pom.xml b/refinery-job/pom.xml
index 5a0173b..b505dbd 100644
--- a/refinery-job/pom.xml
+++ b/refinery-job/pom.xml
@@ -163,6 +163,19 @@
<scope>test</scope>
</dependency>
+ <!-- https://mvnrepository.com/artifact/io.druid/tranquility-core_2.10
-->
+ <dependency>
+ <groupId>io.druid</groupId>
+ <artifactId>tranquility-core_2.10</artifactId>
+ <version>0.8.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.druid</groupId>
+ <artifactId>tranquility-spark_2.10</artifactId>
+ <version>0.8.2</version>
+ </dependency>
+
</dependencies>
<build>
diff --git
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/BannerImpressionsStream.scala
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/BannerImpressionsStream.scala
index 4939b03..b71a723 100644
---
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/BannerImpressionsStream.scala
+++
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/BannerImpressionsStream.scala
@@ -1,18 +1,28 @@
package org.wikimedia.analytics.refinery.job
+import com.metamx.common.Granularity
+import com.metamx.tranquility.beam.Beam
+import com.metamx.tranquility.druid._
+import com.metamx.tranquility.spark.BeamFactory
import com.netaporter.uri.Uri
+import io.druid.granularity.QueryGranularities
+import io.druid.query.aggregation.{DoubleSumAggregatorFactory,
LongSumAggregatorFactory}
import kafka.serializer.StringDecoder
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.retry.BoundedExponentialBackoffRetry
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer,
ProducerConfig}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.joda.time.{Period, DateTime}
import org.wikimedia.analytics.refinery.core.Webrequest
import scopt.OptionParser
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.JsonDSL._
import scala.collection.JavaConverters._
+import com.metamx.tranquility.beam._
/**
*
@@ -50,6 +60,125 @@
object BannerImpressionsStream {
+/*
+ {
+ "dataSources" : [
+ {
+ "spec" : {
+ "dataSchema" : {
+ "dataSource" : "banner_activity_minutely",
+ "metricsSpec" : [
+ {
+ "name" : "request_count",
+ "type" : "longSum",
+ "fieldName": "request_count"
+ },
+ {
+ "name" : "normalized_request_count",
+ "type" : "doubleSum",
+ "fieldName": "normalized_request_count"
+ }
+ ],
+ "granularitySpec" : {
+ "segmentGranularity" : "day",
+ "queryGranularity" : "minute",
+ "type" : "uniform"
+ },
+ "parser" : {
+ "type" : "string",
+ "parseSpec" : {
+ "format" : "json",
+ "timestampSpec" : { "column" : "dt", "format" : "iso" },
+ "dimensionsSpec" : {
+ "dimensions" : ["campaign", "banner", "project", "uselang", "bucket",
"anonymous", "status_code", "country", "device", "sample_rate"]
+ }
+ }
+ }
+ },
+ "tuningConfig" : {
+ "type" : "realtime",
+ "windowPeriod" : "PT10M",
+ "intermediatePersistPeriod" : "PT10M",
+ "maxRowsInMemory" : "100000"
+ }
+ },
+ "properties" : {
+ "task.partitions" : "1",
+ "task.replicants" : "3",
+ "topicPattern": "test_banner_impressions_joal"
+ }
+ }
+ ],
+ "properties" : {
+ "druid.discovery.curator.path": "discovery",
+ "zookeeper.connect" :
"druid1001.eqiad.wmnet,druid1002.eqiad.wmnet,druid1003.eqiad.wmnet",
+ "kafka.zookeeper.connect":
"conf1001.eqiad.wmnet,conf1002.eqiad.wmnet,conf1003.eqiad.wmnet/kafka/eqiad",
+ "kafka.group.id": "test_banner_impressions_joal-002",
+ "kafka.auto.offset.reset": "largest",
+ "mapreduce.output.fileoutputformat.compress":
"org.apache.hadoop.io.compress.GzipCodec"
+ }
+ }
+ */
+ case class BannerImpressionEvent(
+ dt: DateTime,
+ campaign: String,
+ banner: String,
+ project: String,
+ uselang: String,
+ bucket: String,
+ anonymous: Boolean,
+ status_code: String,
+ country: String,
+ device: String,
+ sample_rate: Double,
+ request_count: Long,
+ normalized_request_count: Double
+ )
+
+
+ class BannerImpressionBeamFactory extends BeamFactory[BannerImpressionEvent]
{
+ // Return a singleton, so the same connection is shared across all tasks
in the same JVM.
+ def makeBeam: Beam[BannerImpressionEvent] =
BannerImpressionBeamFactory.BeamInstance
+ }
+
+ object BannerImpressionBeamFactory
+ {
+ val BeamInstance: Beam[BannerImpressionEvent] = {
+ // Tranquility uses ZooKeeper (through Curator framework) for
coordination.
+ val curator = CuratorFrameworkFactory.newClient(
+
"druid1001.eqiad.wmnet,druid1002.eqiad.wmnet,druid1003.eqiad.wmnet/druid/analytics-eqiad",
+ new BoundedExponentialBackoffRetry(100, 3000, 5)
+ )
+ curator.start()
+
+ val indexService = "druid/overlord" // Your overlord's druid.service,
with slashes replaced by colons.
+ val discoveryPath = "/druid/analytics-eqiad/discovery"
+ val dataSource = "banner_activity_minutely"
+ val dimensions = IndexedSeq("campaign", "banner", "project", "uselang",
"bucket", "anonymous", "status_code", "country", "device", "sample_rate")
+ val aggregators = Seq(
+ new LongSumAggregatorFactory("request_count", "request_count"),
+ new DoubleSumAggregatorFactory("normalized_request_count",
"normalized_request_count")
+
+ )
+
+ // Expects simpleEvent.timestamp to return a Joda DateTime object.
+ DruidBeams
+ .builder((event: BannerImpressionEvent) => event.dt)
+ .curator(curator)
+ .discoveryPath(discoveryPath)
+ .location(DruidLocation(DruidEnvironment(indexService), dataSource))
+ .rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators,
QueryGranularities.MINUTE))
+ .tuning(
+ ClusteredBeamTuning(
+ segmentGranularity = Granularity.DAY,
+ windowPeriod = new Period("PT10M"),
+ partitions = 1,
+ replicants = 3
+ )
+ )
+ .buildBeam()
+ }
+ }
@transient
lazy val log: Logger = Logger.getLogger(this.getClass)
@@ -146,15 +275,22 @@
}.
flatMap(j =>
try {
- Seq(compact(render(j)))
+ //Seq(compact(render(j)))
+ Seq(j.extract[BannerImpressionEvent])
} catch {
case e: Throwable =>
log.warn("Exception occured when rendering final banner json
event: " + e.getMessage)
Seq.empty
})
+ // Add this import to your Spark job to be able to propagate events from
any RDD to Druid
+ import com.metamx.tranquility.spark.BeamRDD._
+
+ // Output banners to druid through tranquility
+ bannerStream.foreachRDD(rdd => rdd.propagate(new
BannerImpressionBeamFactory))
+
// Output banners data back to kafka
- bannerStream.foreachRDD(rdd => {
+ /*bannerStream.foreachRDD(rdd => {
System.out.println("# events = " + rdd.count())
rdd.foreachPartition(partition => {
@@ -173,7 +309,7 @@
})
producer.close()
})
- })
+ })*/
ssc
}
--
To view, visit https://gerrit.wikimedia.org/r/373030
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I22b0b641ad2ad2f1c88888332b03373abe3e0d00
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Joal <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits