jenkins-bot has submitted this change and it was merged. ( 
https://gerrit.wikimedia.org/r/373030 )

Change subject: Add tranquility to the banner streaming job
......................................................................


Add tranquility to the banner streaming job

In order to productionise druid pseudo-realtime ingestion
we want to use tranquility. We used to have standalone processes
reading from Kafka and inserting in druid, but it feels easier
to have tranquility running as part of the streaming
transformation job.
This patch also provides parameters and error handling making
the streaming job much more resilient.
Finally, due to dependency issues, we need to make this job run on
spark 2.1.1, and in order to do so we put the job in a new temporary
package to which we'll iteratively move spark jobs when moving to
new spark version.

Bug: T168550, T169101

Change-Id: I22b0b641ad2ad2f1c88888332b03373abe3e0d00
---
M pom.xml
A refinery-job-spark-2.1/README.md
A refinery-job-spark-2.1/pom.xml
A 
refinery-job-spark-2.1/src/main/scala/org/wikimedia/analytics/refinery/job/druid/BannerImpressionsStream.scala
A 
refinery-job-spark-2.1/src/main/scala/org/wikimedia/analytics/refinery/job/druid/TranquilityBeamFactories.scala
5 files changed, 783 insertions(+), 0 deletions(-)

Approvals:
  Joal: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/pom.xml b/pom.xml
index e556a4a..764509d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,6 +18,10 @@
     <module>refinery-job</module>
     <module>refinery-camus</module>
     <module>refinery-cassandra</module>
+
+    <!-- Temporary module to iteratively move refinery-job jobs
+         from spark 1.6 with scala 2.10 to spark 2.1 with scala 2.11 -->
+    <module>refinery-job-spark-2.1</module>
   </modules>
 
   <scm>
diff --git a/refinery-job-spark-2.1/README.md b/refinery-job-spark-2.1/README.md
new file mode 100644
index 0000000..d77ac69
--- /dev/null
+++ b/refinery-job-spark-2.1/README.md
@@ -0,0 +1,7 @@
+# refinery-job-spark-2.1
+
+This module is temporary. It contains jobs moved from the refinery-job 
package, updated from
+Spark 1.6 with Scala 2.10 to Spark 2.1 with Scala 2.11.
+Once all the jobs are moved, they'll move back to refinery-job with updated 
dependencies and this module
+will be deleted.
+
diff --git a/refinery-job-spark-2.1/pom.xml b/refinery-job-spark-2.1/pom.xml
new file mode 100644
index 0000000..23ee3b4
--- /dev/null
+++ b/refinery-job-spark-2.1/pom.xml
@@ -0,0 +1,262 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.wikimedia.analytics.refinery</groupId>
+        <artifactId>refinery</artifactId>
+        <version>0.0.57-SNAPSHOT</version>
+    </parent>
+
+    <groupId>org.wikimedia.analytics.refinery.job</groupId>
+    <artifactId>refinery-job-spark-2.1</artifactId>
+    <name>Wikimedia Analytics Refinery Jobs for Spark 2.1 (temporary 
module)</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.wikimedia.analytics.refinery.core</groupId>
+            <artifactId>refinery-core</artifactId>
+        </dependency>
+
+        <!-- 
+             adding explicit dep for snappy, otherwise
+             spark assumes it is on the java.library.path
+             see: 
https://github.com/rvs/snappy-java/blob/master/src/main/resources/org/xerial/snappy/SnappyNativeLoader.java#L47
+        -->
+        <!-- https://mvnrepository.com/artifact/org.xerial.snappy/snappy-java 
-->
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>1.1.2.5</version>
+             <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <!-- javax.servlet in hadoop-common is older than the one 
in spark -->
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.11</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_2.11</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.11</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_2.11</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.netaporter</groupId>
+            <artifactId>scala-uri_2.11</artifactId>
+            <version>0.4.16</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.twitter</groupId>
+            <artifactId>algebird-core_2.11</artifactId>
+            <version>0.10.2</version>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.github.scopt</groupId>
+            <artifactId>scopt_2.11</artifactId>
+            <version>3.3.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_2.11</artifactId>
+            <version>3.0.1</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.github.nscala-time</groupId>
+            <artifactId>nscala-time_2.11</artifactId>
+            <version>2.0.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.databricks</groupId>
+            <artifactId>spark-avro_2.11</artifactId>
+            <version>2.0.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.databricks</groupId>
+            <artifactId>spark-csv_2.11</artifactId>
+            <version>1.5.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-graphx_2.11</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/graphframes/graphframes -->
+        <dependency>
+            <groupId>graphframes</groupId>
+            <artifactId>graphframes</artifactId>
+            <version>0.5.0-spark2.1-s_2.11</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.holdenkarau</groupId>
+            <artifactId>spark-testing-base_2.11</artifactId>
+            <version>2.1.1_0.7.4</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.druid</groupId>
+            <artifactId>tranquility-spark_2.11</artifactId>
+            <version>0.8.2</version>
+            <exclusions>
+                <!-- Excluded for dependency conflict with spark -->
+                <exclusion>
+                    <groupId>org.apache.derby</groupId>
+                    <artifactId>derby</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.scala-tools</groupId>
+                <artifactId>maven-scala-plugin</artifactId>
+                <version>2.15.2</version>
+                <configuration>
+                    <args>
+                        <arg>-deprecation</arg>
+                        <arg>-feature</arg>
+                    </args>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+              <groupId>org.scalatest</groupId>
+              <artifactId>scalatest-maven-plugin</artifactId>
+              <version>1.0</version>
+              <configuration>
+                
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+                <junitxml>.</junitxml>
+                <filereports>WDF TestSuite.txt</filereports>
+              </configuration>
+              <executions>
+                <execution>
+                  <id>test</id>
+                  <goals>
+                    <goal>test</goal>
+                  </goals>
+                </execution>
+              </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.0</version>
+                <configuration>
+                    <shadedArtifactAttached>false</shadedArtifactAttached>
+                    <filters>
+                        <filter>
+                            <artifact>*:*</artifact>
+                            <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                            </excludes>
+                        </filter>
+                    </filters>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            
<createDependencyReducedPom>false</createDependencyReducedPom>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.2</version>
+                <configuration>
+                    <source>${java.version}</source>
+                    <target>${java.version}</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <properties>
+        <!-- Override with new versions to iteratively move spark jobs to -->
+        <scala.version>2.11.7</scala.version>
+        <spark.version>2.1.1</spark.version>
+    </properties>
+
+</project>
diff --git 
a/refinery-job-spark-2.1/src/main/scala/org/wikimedia/analytics/refinery/job/druid/BannerImpressionsStream.scala
 
b/refinery-job-spark-2.1/src/main/scala/org/wikimedia/analytics/refinery/job/druid/BannerImpressionsStream.scala
new file mode 100644
index 0000000..fbc4285
--- /dev/null
+++ 
b/refinery-job-spark-2.1/src/main/scala/org/wikimedia/analytics/refinery/job/druid/BannerImpressionsStream.scala
@@ -0,0 +1,327 @@
+package org.wikimedia.analytics.refinery.job.druid
+
+import com.netaporter.uri.Uri
+import kafka.serializer.StringDecoder
+import org.apache.log4j.{Level, Logger}
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.kafka._
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.joda.time.{DateTime, Period}
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+import org.wikimedia.analytics.refinery.core.Webrequest
+import scopt.OptionParser
+
+/**
+  *
+  * To be launched with Spark 2:
+  *
+
+spark-submit --master yarn --deploy-mode cluster \
+  --class org.wikimedia.analytics.refinery.job.druid.BannerImpressionsStream \
+  --driver-memory 2G --num-executors 4 --executor-cores 3 --executor-memory 4G 
\
+  /home/joal/code/refinery-job-0.0.52-streams.jar
+  
+  *
+  */
+
+object BannerImpressionsStream {
+
+  @transient
+  lazy val log: Logger = Logger.getLogger(this.getClass)
+
+  def run(
+           @transient sc: SparkContext,
+           kafkaBrokers:String,
+           kafkaInputTopics: String,
+           batchDurationSeconds: Int,
+           checkpointDirectory: String,
+           noCheckpoint: Boolean,
+           tranquilityBeamConf: TranquilityBeamConf
+         ): Unit = {
+
+    def newStreamingContext() = {
+      val ssc = new StreamingContext(sc, Seconds(batchDurationSeconds.toLong))
+      ssc.checkpoint(checkpointDirectory)
+
+      val kafkaInputTopicsSet = kafkaInputTopics.split(",").toSet
+      val KafkaInputParameters = Map[String, String]("metadata.broker.list" -> 
kafkaBrokers)
+
+      // Get kafka batches from input topics
+      val messageStream = KafkaUtils.createDirectStream[String, String, 
StringDecoder, StringDecoder](
+        ssc,
+        KafkaInputParameters,
+        kafkaInputTopicsSet
+      )
+
+      // To parse JSON object
+      @transient
+      implicit lazy val formats = DefaultFormats
+
+      // Compute banner oriented filtering / conversion / aggregation
+      val bannerStream: DStream[Map[String, Any]] = messageStream.
+        // Extract the JSON message from the Kafka (Key, Value) message.
+        flatMap { case (_, str) =>
+          try {
+            Seq(parse(str))
+          } catch {
+            case e: Throwable =>
+              log.warn("Exception occurred when parsing JSON: " + e.getMessage)
+              Seq.empty
+          }
+        }.
+        filter(json => {
+          try {
+            (json \ "uri_path").values.asInstanceOf[String] == 
"/beacon/impression" &&
+              (json \ 
"uri_query").values.asInstanceOf[String].contains("debug=false") &&
+              !Webrequest.getInstance().isSpider((json \ 
"user_agent").values.asInstanceOf[String])
+          } catch {
+            case e: Throwable =>
+              log.warn("Exception occurred when filtering banner related 
events: " + e.getMessage)
+              false
+          }
+        }).
+        flatMap(json => {
+          try {
+            val uri_query = (json \\ "uri_query").values.asInstanceOf[String]
+            val uri: Uri = Uri.parse("http://bla.org/woo/"; + uri_query)
+            val minuteTs = (json \ 
"dt").values.asInstanceOf[String].replaceAll(":\\d\\d$", ":00.000Z")
+            val params: Map[String, Seq[String]] = uri.query.paramMap
+
+            val mapEvent = Map(
+              "timestamp" -> minuteTs,
+              "campaign" -> params.getOrElse("campaign", 
List.empty[String]).headOption,
+              "banner" -> params.getOrElse("banner", 
List.empty[String]).headOption,
+              "project" -> params.getOrElse("project", 
List.empty[String]).headOption,
+              "uselang" -> params.getOrElse("uselang", 
List.empty[String]).headOption,
+              "bucket" -> params.getOrElse("bucket", 
List.empty[String]).headOption,
+              "anonymous" -> params.getOrElse("anonymous", 
List.empty[String]).headOption.contains("true"),
+              "status_code" -> params.getOrElse("statusCode", 
List.empty[String]).headOption,
+              "country" -> params.getOrElse("country", 
List.empty[String]).headOption,
+              "device" -> params.getOrElse("device", 
List.empty[String]).headOption,
+              "sample_rate" -> params.getOrElse("recordImpressionSampleRate", 
List.empty[String]).headOption.map(_.toDouble))
+            Seq(mapEvent)
+          } catch {
+            case e: Throwable =>
+              log.warn("Exception occurred when building banner json event: " + 
e.getMessage)
+              Seq.empty
+          }
+        }).
+        countByValue().
+        map { case (mapEvent, count) =>
+          val normalizedRequestCount = 
mapEvent("sample_rate").asInstanceOf[Option[Double]].map(count / _)
+          mapEvent + ("request_count" -> count) + ("normalized_request_count" 
-> normalizedRequestCount)
+        }
+
+      // Add this import to your Spark job to be able to propagate events from 
any RDD to Druid
+      import com.metamx.tranquility.spark.BeamRDD._
+
+      // Output banners to druid through tranquility
+      bannerStream.foreachRDD(_.propagate(new 
TranquilitySingletonBeamFactory(tranquilityBeamConf)))
+
+      ssc
+    }
+
+    val context = {
+      if (noCheckpoint) newStreamingContext()
+      else StreamingContext.getOrCreate(checkpointDirectory, 
newStreamingContext)
+    }
+
+    // Start the context
+    context.start()
+    context.awaitTermination()
+  }
+
+  /**
+    * Config class for CLI argument parser using scopt
+    */
+  case class Params(
+                     kafkaBrokers: String = Seq("12", "13", "14", "18", "20", 
"22").map("kafka10" + _ + ".eqiad.wmnet:9092").mkString(","),
+                     kafkaInputTopics: String = "webrequest_text",
+                     batchDurationSecs: Int = 30,
+                     checkpointDirectory: String = 
"hdfs://analytics-hadoop/tmp/spark/banner_impressions_stream_checkpoint",
+                     noCheckpoint: Boolean = false,
+                     zookeeperHosts: String = 
"druid1001.eqiad.wmnet,druid1002.eqiad.wmnet,druid1003.eqiad.wmnet",
+                     zookeeperDruidDiscoveryPath: String = 
"/druid/analytics-eqiad/discovery",
+                     zookeeperDruidIndexingService: String = "druid/overlord",
+                     druidDatasource: String = "banner_activity_minutely",
+                     indexingSegmentGranularity: DruidGranularityWrapper = 
DruidGranularityWrapper("DAY"),
+                     indexingWindowPeriod: Period = new Period("PT10M"),
+                     indexingTaskReplication: Int = 3,
+                     indexingTaskPartitions: Int = 1
+                   )
+
+  /**
+    * Define the command line options parser
+    */
+  val argsParser = new OptionParser[Params]("Banner Impressions Stream") {
+    head("Banner Impressions Stream", "")
+    note( "Extract banner impressions data from kafka webrequest stream and 
write it to druid")
+    help("help") text "Prints this usage text"
+
+    opt[String]("kafka-brokers") optional() valueName "<broker1,...,brokerN>" 
action {
+      (x, p) => p.copy(kafkaBrokers = x)
+    } text "Kafka brokers to consume from. Defaults to 
kafka10[12|13|14|18|20|22].eqiad.wmnet:9092"
+
+    opt[String]("kafka-input-topics") optional() valueName 
"<topic1,...,topicK>" action {
+      (x, p) => p.copy(kafkaInputTopics = x)
+    } text "Input topics to consume. Defaults to webrequest_text"
+
+    opt[Int]("batch-duration-seconds") optional() action {
+      (x, p) => p.copy(batchDurationSecs = x)
+    } text "Spark batch duration in seconds. Defaults to 30."
+
+    opt[String]("checkpoint-dir") optional() valueName "<path>" action {
+      (x, p) => p.copy(checkpointDirectory = if (x.endsWith("/")) x else x + 
"/")
+    } text ("Temporary directory for check-pointing streaming job.\n\t" +
+      "Defaults to 
hdfs://analytics-hadoop/tmp/spark/banner_impressions_stream_checkpoint")
+
+    opt[Unit]("no-checkpoint") optional() action {
+      (_, p) => p.copy(noCheckpoint = true)
+    } text "Force NOT using checkpoint if exists (wipes existing checkpoint 
directory if any)."
+
+    opt[String]("zookeeper-hosts") optional() valueName "<host1,...,hostN>" 
action {
+      (x, p) => p.copy(zookeeperHosts = x)
+    } text "Zookeeper hosts handling druid discovery data. Defaults to 
druid1001.eqiad.wmnet,druid1002.eqiad.wmnet,druid1003.eqiad.wmnet"
+
+    opt[String]("zookeeper-druid-discovery-path") optional() valueName 
"<path>" action {
+      (x, p) => p.copy(zookeeperDruidDiscoveryPath = x)
+    } text "Zookeeper data path to Druid discovery service. Defaults to 
/druid/analytics-eqiad/discovery"
+
+    opt[String]("zookeeper-druid-indexing-service") optional() valueName 
"<name>" action {
+      (x, p) => p.copy(zookeeperDruidIndexingService = x)
+    } text "Zookeeper druid indexing service name. Defaults to druid/overlord"
+
+    opt[String]("druid-datasource") optional() valueName "<source>" action {
+      (x, p) => p.copy(druidDatasource = x)
+    } text "Druid datasource handling banners data. Defaults to 
banner_activity_minutely"
+
+    opt[String]("druid-indexing-segment-granularity") optional() valueName 
"<granularity>" action {
+      (x, p) => p.copy(indexingSegmentGranularity = DruidGranularityWrapper(x))
+    } validate { x => if (DruidGranularityWrapper(x).isValid) success else 
failure("Invalid segment granularity")
+    } text "Druid indexing segment granularity (druid time-period, can be 
MINUTE, HOUR, DAY, WEEK...). Defaults to DAY"
+
+    opt[String]("druid-indexing-window-period") optional() valueName 
"<period>" action {
+      (x, p) => p.copy(indexingWindowPeriod = new Period(x))
+    } validate { x => try {
+        new Period(x)
+        success
+      } catch {
+        case e: java.lang.IllegalArgumentException => failure("Invalid window 
period")
+      }
+    } text "Druid indexing window (ISO 8601 time-period, can be PT1H, PT10M 
for instance). Defaults to PT10M"
+
+    opt[Int]("druid-indexing-partitions") optional() valueName "<num>" action {
+      (x, p) => p.copy(indexingTaskPartitions = x)
+    } text "Druid indexing partitions (parallelisation level). Defaults to 1"
+
+    opt[Int]("druid-indexing-replication") optional() valueName "<num>" action 
{
+      (x, p) => p.copy(indexingTaskReplication = x)
+    } text "Druid indexing replication factor for each partition. Defaults to 
3"
+
+  }
+
+  def main(args: Array[String]): Unit = {
+    val params = args.headOption match {
+      // Case when our job options are given as a single string.  Split them
+      // and pass them to argsParser.
+      case Some("--options") =>
+        argsParser.parse(args(1).split("\\s+"), 
Params()).getOrElse(sys.exit(1))
+      // Else the normal usage, each CLI opts can be parsed as a job option.
+      case _ =>
+        argsParser.parse(args, Params()).getOrElse(sys.exit(1))
+    }
+
+    // Exit non-zero if any refinements failed.
+    apply(params)
+  }
+
+  def apply(params: Params): Unit = {
+    //Setup logging
+    val appLogLevel = Level.INFO
+    val allLogLevel = Level.ERROR
+
+    Logger.getRootLogger.setLevel(appLogLevel)
+    Logger.getLogger("org.wikimedia").setLevel(appLogLevel)
+
+    Logger.getLogger("akka").setLevel(allLogLevel)
+    Logger.getLogger("com.databricks").setLevel(allLogLevel)
+    Logger.getLogger("DataNucleus").setLevel(allLogLevel)
+    Logger.getLogger("org.apache").setLevel(allLogLevel)
+    Logger.getLogger("org.spark-project").setLevel(allLogLevel)
+
+    // Initial setup - Spark, SQLContext
+    val conf = new SparkConf()
+      .setAppName("BannerImpressionsStream")
+
+      /******** SPARK GLOBAL CONFIG **********/
+      // Better serializer than java one
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      // Increase job parallelity by reducing Spark Delay Scheduling 
(potentially big performance impact (!)) (Default: 3s)`
+      .set("spark.locality.wait", "10")
+      // Increase max task failures before failing job (Default: 4)
+      .set("spark.task.maxFailures", "8")
+      // Prevent killing of stages and corresponding jobs from the Spark UI
+      .set("spark.ui.killEnabled", "false")
+      // Log Spark Configuration in driver log for troubleshooting
+      .set("spark.logConf", "true")
+
+      /******** SPARK STREAMING CONFIG **********/
+      // [Optional] Tweak to balance data processing parallelism vs. task 
scheduling overhead (Default: 200ms)
+      .set("spark.streaming.blockInterval", "200")
+      // Prevent data loss on driver recovery
+      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
+      // Enable backpressure, preventing too much data to be stored internally
+      .set("spark.streaming.backpressure.enabled", "true")
+      // [Optional] Reduce min rate of PID-based backpressure implementation 
(Default: 100)
+      .set("spark.streaming.backpressure.pid.minRate", "10")
+      // spark.streaming.backpressure.initialRate
+      .set("spark.streaming.backpressure.initialRate", "30")
+
+      /******** SPARK YARN CONFIG **********/
+      // [Optional] Set if --driver-memory < 5GB
+      .set("spark.yarn.driver.memoryOverhead", "512")
+      // [Optional] Set if --executor-memory < 10GB
+      .set("spark.yarn.executor.memoryOverhead", "1024")
+      // Increase max application master attempts
+      // (needs to be <= yarn.resourcemanager.am.max-attempts in YARN, which 
defaults to 2)
+      // (Default: yarn.resourcemanager.am.max-attempts)
+      .set("spark.yarn.maxAppAttempts", "4")
+      // Attempt counter considers only the last hour (Default: (none))
+      .set("spark.yarn.am.attemptFailuresValidityInterval", "1h")
+      // Increase max executor failures (Default: max(numExecutors * 2, 3))
+      .set("spark.yarn.max.executor.failures", "32")
+      // Executor failure counter considers only the last hour
+      .set("spark.yarn.executor.failuresValidityInterval", "1h")
+
+    val sc = new SparkContext(conf)
+
+    // Tranquility settings
+    val tranquilityBeamConf = new TranquilityBeamConf(
+      params.zookeeperHosts,
+      params.zookeeperDruidDiscoveryPath,
+      params.zookeeperDruidIndexingService,
+      params.druidDatasource,
+      (m: Map[String, Any]) => 
DateTime.parse(m("timestamp").asInstanceOf[String]),
+      DruidTimestampSpecWrapper("timestamp", "iso", null),
+      IndexedSeq("campaign", "banner", "project", "uselang", "bucket", 
"anonymous", "status_code", "country", "device", "sample_rate"),
+      Seq(LongSum("request_count", "request_count"), 
DoubleSum("normalized_request_count", "normalized_request_count")),
+      DruidQueryGranularityWrapper("MINUTE"),
+      params.indexingSegmentGranularity,
+      params.indexingWindowPeriod,
+      params.indexingTaskReplication,
+      params.indexingTaskPartitions
+    )
+
+    run(
+      sc,
+      params.kafkaBrokers,
+      params.kafkaInputTopics,
+      params.batchDurationSecs,
+      params.checkpointDirectory,
+      params.noCheckpoint,
+      tranquilityBeamConf
+    )
+  }
+}
diff --git 
a/refinery-job-spark-2.1/src/main/scala/org/wikimedia/analytics/refinery/job/druid/TranquilityBeamFactories.scala
 
b/refinery-job-spark-2.1/src/main/scala/org/wikimedia/analytics/refinery/job/druid/TranquilityBeamFactories.scala
new file mode 100644
index 0000000..16b80db
--- /dev/null
+++ 
b/refinery-job-spark-2.1/src/main/scala/org/wikimedia/analytics/refinery/job/druid/TranquilityBeamFactories.scala
@@ -0,0 +1,183 @@
+package org.wikimedia.analytics.refinery.job.druid
+
+import com.metamx.common.Granularity
+import com.metamx.tranquility.beam.{ClusteredBeamTuning, Beam}
+import com.metamx.tranquility.druid.{SpecificDruidDimensions, DruidRollup, 
DruidBeams, DruidLocation}
+import com.metamx.tranquility.spark.BeamFactory
+import io.druid.data.input.impl.TimestampSpec
+import io.druid.granularity.QueryGranularity
+import io.druid.query.aggregation._
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.retry.BoundedExponentialBackoffRetry
+import org.joda.time.{DateTime, Period}
+
+
/**
  * Configuration for a TranquilityBeamFactory.
  *
  * @param zookeeperHosts Comma-separated list of zookeeper hosts holding druid discovery data
  * @param zookeeperDruidDiscoveryPath Druid discovery path in zookeeper storage
  * @param zookeeperIndexingService Druid indexing service name in zookeeper
  * @param druidDatasource Druid datasource to index into
  * @param druidTimestamper Function extracting the timestamp, as a DateTime, out of each Map event
  * @param druidTimestampSpec Specification of the timestamp field in the event Map
  * @param druidDimensions Druid dimensions to index, using the event Map keys
  * @param druidAggregators Druid aggregators to index, using the event Map keys
  * @param druidQueryGranularity Druid query granularity to index at
  * @param indexingSegmentGranularity Druid segment granularity to index at
  * @param indexingWindowPeriod Window period during which events are accepted for indexing -
  *                             events late or forward of this window are discarded
  * @param indexingTaskPartitions Parallelization level of the indexing (total number of indexing
  *                               tasks is indexingTaskPartitions * indexingTaskReplication)
  * @param indexingTaskReplication Replication factor of the indexing (total number of indexing
  *                                tasks is indexingTaskPartitions * indexingTaskReplication)
  *
  * NOTE(review): indexingTaskPartitions and indexingTaskReplication are both Int, so positional
  * call sites can silently swap them; the declared order is partitions first, then replication.
  * Verify every constructor call site passes them in that order.
  */
case class TranquilityBeamConf(
  // Zookeeper data to connect to Druid
  zookeeperHosts: String,
  zookeeperDruidDiscoveryPath: String,
  zookeeperIndexingService: String,

  // Druid datasource config
  druidDatasource: String,
  druidTimestamper: Map[String, Any] => DateTime,
  druidTimestampSpec: DruidTimestampSpecWrapper,
  druidDimensions: IndexedSeq[String],
  druidAggregators: Seq[DruidAggregatorFactoryWrapper],
  druidQueryGranularity: DruidQueryGranularityWrapper,

  // Druid indexing config
  indexingSegmentGranularity: DruidGranularityWrapper,
  indexingWindowPeriod: Period,
  indexingTaskPartitions: Int,
  indexingTaskReplication: Int
) extends Serializable
+
+/**
+  * Wrapper for druid aggregator factories, to prevent Spark serialization 
issues.
+  * Only provide simple aggregators for now (missing filtered, histogram and 
javascript aggregators).
+  */
/**
  * Serializable stand-in for a druid AggregatorFactory, safe to ship to Spark
  * executors; each concrete subclass builds the actual druid factory on demand.
  */
abstract class DruidAggregatorFactoryWrapper extends Serializable {
  /** Instantiates the underlying druid aggregator factory this wrapper represents. */
  def getDruidAggregatorFactory: AggregatorFactory
}
+
/** Row-count aggregator, emitted under `name`. */
case class Count(name: String) extends DruidAggregatorFactoryWrapper {
  override def getDruidAggregatorFactory: AggregatorFactory =
    new CountAggregatorFactory(name)
}
+
/** Long sum over `fieldName`, emitted under `name`. */
case class LongSum(name: String, fieldName: String) extends DruidAggregatorFactoryWrapper {
  override def getDruidAggregatorFactory: AggregatorFactory =
    new LongSumAggregatorFactory(name, fieldName)
}
/** Long max over `fieldName`, emitted under `name`. */
case class LongMax(name: String, fieldName: String) extends DruidAggregatorFactoryWrapper {
  override def getDruidAggregatorFactory: AggregatorFactory =
    new LongMaxAggregatorFactory(name, fieldName)
}
/** Long min over `fieldName`, emitted under `name`. */
case class LongMin(name: String, fieldName: String) extends DruidAggregatorFactoryWrapper {
  override def getDruidAggregatorFactory: AggregatorFactory =
    new LongMinAggregatorFactory(name, fieldName)
}
+
/** Double sum over `fieldName`, emitted under `name`. */
case class DoubleSum(name: String, fieldName: String) extends DruidAggregatorFactoryWrapper {
  override def getDruidAggregatorFactory: AggregatorFactory =
    new DoubleSumAggregatorFactory(name, fieldName)
}
/** Double max over `fieldName`, emitted under `name`. */
case class DoubleMax(name: String, fieldName: String) extends DruidAggregatorFactoryWrapper {
  override def getDruidAggregatorFactory: AggregatorFactory =
    new DoubleMaxAggregatorFactory(name, fieldName)
}
/** Double min over `fieldName`, emitted under `name`. */
case class DoubleMin(name: String, fieldName: String) extends DruidAggregatorFactoryWrapper {
  override def getDruidAggregatorFactory: AggregatorFactory =
    new DoubleMinAggregatorFactory(name, fieldName)
}
+
+/**
+  * Wrappers for Druid granularities and TimestampSpec to prevent Spark 
serialization issues
+  */
/** Serializable wrapper around a druid segment Granularity, named by `value`. */
case class DruidGranularityWrapper(value: String) extends Serializable {

  /** Resolves `value` into the corresponding Granularity (throws IllegalArgumentException if unknown). */
  def getGranularity: Granularity = Granularity.valueOf(value)

  /** True when `value` names a known granularity. */
  def isValid: Boolean =
    try {
      getGranularity
      true
    } catch {
      case _: java.lang.IllegalArgumentException => false
    }
}
/** Serializable wrapper around a druid QueryGranularity, named by `value`. */
case class DruidQueryGranularityWrapper(value: String) extends Serializable {

  /** Resolves `value` into the corresponding QueryGranularity. */
  def getGranularity: QueryGranularity = QueryGranularity.fromString(value)

  /** True when `value` names a known query granularity. */
  def isValid: Boolean =
    try {
      getGranularity
      true
    } catch {
      case _: java.lang.IllegalArgumentException => false
    }
}
/**
  * Serializable wrapper around a druid TimestampSpec.
  *
  * @param column The event field holding the timestamp
  * @param format The timestamp format (e.g. "iso")
  * @param missingValue Fallback timestamp when the field is absent (may be null)
  */
case class DruidTimestampSpecWrapper(column: String, format: String, missingValue: DateTime)
  extends Serializable {

  /** Builds the underlying druid TimestampSpec. */
  def getTimestampSpec: TimestampSpec =
    new TimestampSpec(column, format, missingValue)
}
+
+
/**
  * This class is to be used to create tranquility beams indexing spark Map[String, Any] streams:
  *   import com.metamx.tranquility.spark.BeamRDD._
  *   stream.foreachRDD(_.propagate(new TranquilityBeamFactory(conf)))
  *
  * Warning: this class lazily creates and caches one beam PER FACTORY INSTANCE,
  * not per JVM. If you want one beam per JVM (recommended if you use a single
  * type of BeamFactory across multiple spark threads), use
  * TranquilitySingletonBeamFactory.
  *
  * @param conf The TranquilityBeamConf to be used to index
  */
class TranquilityBeamFactory(conf: TranquilityBeamConf) extends BeamFactory[Map[String, Any]] {

  // Beam cached for this factory instance. Written and read only under
  // `this.synchronized` (see makeBeam).
  private var beamInstance: Option[Beam[Map[String, Any]]] = None

  /**
    * Builds a new druid beam from `conf`. Called at most once per factory
    * instance, lazily, from makeBeam.
    */
  protected def createBeam: Beam[Map[String, Any]] = {
    // Tranquility uses ZooKeeper (through the Curator framework) for coordination.
    // NOTE(review): the curator client is started but never closed; it lives for
    // the lifetime of the executor JVM - confirm this is intended.
    val curator = CuratorFrameworkFactory.newClient(
      conf.zookeeperHosts,
      new BoundedExponentialBackoffRetry(100, 3000, 5)
    )
    curator.start()

    val druidLocation: DruidLocation =
      DruidLocation.create(conf.zookeeperIndexingService, conf.druidDatasource)

    DruidBeams
      .builder(conf.druidTimestamper)
      .timestampSpec(conf.druidTimestampSpec.getTimestampSpec)
      .curator(curator)
      .discoveryPath(conf.zookeeperDruidDiscoveryPath)
      .location(druidLocation)
      .rollup(DruidRollup(
        SpecificDruidDimensions(conf.druidDimensions),
        conf.druidAggregators.map(_.getDruidAggregatorFactory),
        conf.druidQueryGranularity.getGranularity))
      .tuning(
        ClusteredBeamTuning(
          segmentGranularity = conf.indexingSegmentGranularity.getGranularity,
          windowPeriod = conf.indexingWindowPeriod,
          partitions = conf.indexingTaskPartitions,
          replicants = conf.indexingTaskReplication
        )
      )
      .buildBeam()
  }

  /**
    * Returns this factory's beam, creating it on first call. The whole
    * check-create-read sequence runs under the lock so the cached var is never
    * read without synchronization.
    */
  def makeBeam: Beam[Map[String, Any]] = this.synchronized {
    if (beamInstance.isEmpty) {
      beamInstance = Some(createBeam)
    }
    beamInstance.get
  }
}
+
/**
  * This class is to be used to create a singleton tranquility beam indexing
  * spark Map[String, Any] streams:
  *   stream.foreachRDD(_.propagate(new TranquilitySingletonBeamFactory(conf)))
  *
  * Warning: this class shares ONE beam per JVM. This only works if the job uses
  * a single tranquility beam. If you need several singleton beams, you must
  * manage them yourself.
  *
  * @param conf The TranquilityBeamConf to be used to index
  */
class TranquilitySingletonBeamFactory(conf: TranquilityBeamConf) extends TranquilityBeamFactory(conf) {

  /**
    * Returns the JVM-wide beam, creating it on first call. Unlike the original
    * double-step version, the final read of the companion's var also happens
    * under the lock, so the non-volatile field is never read unsynchronized.
    */
  override def makeBeam: Beam[Map[String, Any]] = TranquilitySingletonBeamFactory.synchronized {
    if (TranquilitySingletonBeamFactory.BeamInstance.isEmpty) {
      TranquilitySingletonBeamFactory.BeamInstance = Some(this.createBeam)
    }
    TranquilitySingletonBeamFactory.BeamInstance.get
  }
}
+
object TranquilitySingletonBeamFactory {
  // JVM-wide beam cache shared by every TranquilitySingletonBeamFactory
  // instance; accessed under `TranquilitySingletonBeamFactory.synchronized`
  // in the companion class's makeBeam.
  private var BeamInstance: Option[Beam[Map[String, Any]]] = Option.empty
}

-- 
To view, visit https://gerrit.wikimedia.org/r/373030
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I22b0b641ad2ad2f1c88888332b03373abe3e0d00
Gerrit-PatchSet: 17
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Joal <[email protected]>
Gerrit-Reviewer: Elukey <[email protected]>
Gerrit-Reviewer: Joal <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to