jenkins-bot has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/405285 )
Change subject: Remove the BannerImpressionsStream job from refinery-job
......................................................................
Remove the BannerImpressionsStream job from refinery-job
The BannerImpressionsStream job now lives in the temporary module
refinery-job-spark-2.1, as it depends on Spark 2.
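
For illustration only: the Spark 1 Kafka integration removed below
(spark-streaming-kafka_2.10, Kafka 0.8 consumer API) is not published under
those coordinates for Spark 2, where a direct stream is typically built with
the kafka-0-10 integration (artifact spark-streaming-kafka-0-10). Below is a
minimal sketch of that setup, with hypothetical broker, topic, and group.id
values; it is not the actual refinery-job-spark-2.1 code:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    object BannerImpressionsStreamSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("BannerImpressionsStream")
        val ssc = new StreamingContext(conf, Seconds(10))

        // The Kafka 0.10 consumer takes deserializer classes and requires a
        // group.id, unlike the 0.8 "metadata.broker.list" map used below.
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "kafka1012.eqiad.wmnet:9092", // hypothetical broker
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "banner_impressions_stream" // hypothetical group id
        )

        // Records arrive as ConsumerRecord[K, V] rather than (K, V) tuples, so
        // the JSON payload is record.value instead of a pattern match on a pair.
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](Set("webrequest_text"), kafkaParams)
        )
        stream.map(record => record.value).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

Apart from that record type, the filtering and aggregation logic of the job
should carry over to Spark 2 essentially unchanged.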
Change-Id: Ic859e84b43e6c5dba66987432af048ac38c81655
---
M refinery-job/pom.xml
D refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/BannerImpressionsStream.scala
2 files changed, 1 insertion(+), 205 deletions(-)
Approvals:
Ottomata: Looks good to me, but someone else must approve
Joal: Looks good to me, approved
jenkins-bot: Verified
diff --git a/refinery-job/pom.xml b/refinery-job/pom.xml
index fd9a438..4091d23 100644
--- a/refinery-job/pom.xml
+++ b/refinery-job/pom.xml
@@ -64,25 +64,6 @@
</dependency>
<dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kafka_2.10</artifactId>
- <version>${spark.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.10</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>com.netaporter</groupId>
- <artifactId>scala-uri_2.10</artifactId>
- <version>0.4.16</version>
- </dependency>
-
- <dependency>
<groupId>com.twitter</groupId>
<artifactId>algebird-core_2.10</artifactId>
<version>0.10.2</version>
@@ -141,8 +122,7 @@
<artifactId>graphframes</artifactId>
<version>0.3.0-spark1.6-s_2.10</version>
</dependency>
-
-
+
<dependency>
<groupId>com.holdenkarau</groupId>
<artifactId>spark-testing-base_2.10</artifactId>
diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/BannerImpressionsStream.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/BannerImpressionsStream.scala
deleted file mode 100644
index 6086d3c..0000000
--- a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/BannerImpressionsStream.scala
+++ /dev/null
@@ -1,184 +0,0 @@
-package org.wikimedia.analytics.refinery.job
-
-import com.netaporter.uri.Uri
-import kafka.serializer.StringDecoder
-import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
-import org.apache.spark.streaming.kafka.KafkaUtils
-import org.apache.spark.{SparkContext, SparkConf}
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.wikimedia.analytics.refinery.core.Webrequest
-import scopt.OptionParser
-import org.json4s._
-import org.json4s.jackson.JsonMethods._
-import org.json4s.JsonDSL._
-import scala.collection.JavaConverters._
-
-
-object BannerImpressionsStream {
-
- def run(
- @transient sc: SparkContext,
- kafkaBrokers:String,
- kafkaInputTopics: String,
- kafkaOutputTopic: String,
- batchDurationSeconds: Int,
- checkpointDirectory: String,
- noCheckpoint: Boolean
- ): Unit = {
-
- def newStreamingContext() = {
- val ssc = new StreamingContext(sc, Seconds(batchDurationSeconds.toLong))
- ssc.checkpoint(checkpointDirectory)
-
- val kafkaInputTopicsSet = kafkaInputTopics.split(",").toSet
- val KafkaInputParameters = Map[String, String]("metadata.broker.list" -> kafkaBrokers)
-
- // Get kafka batches from input topics
- val messageStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
- ssc,
- KafkaInputParameters,
- kafkaInputTopicsSet
- )
-
- // Compute banner oriented filtering / conversion / aggregation
- val bannerStream = messageStream.
- // Extract the JSON message from the Kafka (Key, Value) message.
- map { case (_, str) => parse(str) }.
- filter(json => {
- (json \ "uri_path").values.asInstanceOf[String] ==
"/beacon/impression" &&
- (json \
"uri_query").values.asInstanceOf[String].contains("debug=false") &&
- !Webrequest.getInstance().isSpider((json \
"user_agent").values.asInstanceOf[String])
- }).
- map(json => {
- val uri_query = (json \\ "uri_query").values.asInstanceOf[String]
- val uri: Uri = Uri.parse("http://bla.org/woo/" + uri_query)
- val minuteTs = (json \ "dt").values.asInstanceOf[String].replaceAll(":\\d\\d$", ":00")
- val params: Map[String, Seq[String]] = uri.query.paramMap
-
- ("dt" -> minuteTs) ~
- ("campaign" -> params.getOrElse("campaign",
List.empty[String]).headOption) ~
- ("banner" -> params.getOrElse("banner",
List.empty[String]).headOption) ~
- ("project" -> params.getOrElse("project",
List.empty[String]).headOption) ~
- ("uselang" -> params.getOrElse("uselang",
List.empty[String]).headOption) ~
- ("bucket" -> params.getOrElse("bucket",
List.empty[String]).headOption) ~
- ("anonymous" -> (params.getOrElse("anonymous",
List.empty[String]).headOption == Some("true"))) ~
- ("status_code" -> params.getOrElse("statusCode",
List.empty[String]).headOption) ~
- ("country" -> params.getOrElse("country",
List.empty[String]).headOption) ~
- ("device" -> params.getOrElse("device",
List.empty[String]).headOption) ~
- ("sample_rate" -> params.getOrElse("recordImpressionSampleRate",
List.empty[String]).headOption.map(_.toDouble))
- }).
- countByValue().
- map { case (json, count) =>
- val jsonSampleRate = json \ "sample_rate"
- json merge (
- ("request_count" -> count) ~
- ("normalized_request_count" -> {
- if (jsonSampleRate != JNothing) Some(count / jsonSampleRate.values.asInstanceOf[Double])
- else None
- })
- )
- }.
- map(j => compact(render(j)))
-
- // Output banners data back to kafka
- bannerStream.foreachRDD(rdd => {
- System.out.println("# events = " + rdd.count())
-
- rdd.foreachPartition(partition => {
- // Print statements in this section are shown in the executor's stdout logs
- val props = Map[String, String](
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkaBrokers,
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer"
- ).asInstanceOf[Map[String, Object]]
-
- val producer = new KafkaProducer[String, String](props.asJava)
- partition.foreach(record => {
- val data = record.toString
- val message = new ProducerRecord[String, String](kafkaOutputTopic, null, data)
- producer.send(message)
- })
- producer.close()
- })
- })
- ssc
- }
-
- val context = {
- if (noCheckpoint) newStreamingContext()
- else StreamingContext.getOrCreate(checkpointDirectory, newStreamingContext)
- }
-
- // Start the context
- context.start()
- context.awaitTermination()
- }
-
- /**
- * Config class for CLI argument parser using scopt
- */
- case class Params(
- kafkaBrokers: String = Seq("12", "13", "14", "18", "20", "22").map("kafka10" + _ + ".eqiad.wmnet:9092").mkString(","),
- kafkaInputTopics: String = "webrequest_text",
- kafkaOutputTopic: String = "test_banner_impressions_joal",
- batchDurationSecs: Int = 10,
- checkpointDirectory: String = "hdfs://analytics-hadoop/tmp/spark/banner_impressions_stream_checkpoint",
- noCheckpoint: Boolean = false
- )
-
- /**
- * Define the command line options parser
- */
- val argsParser = new OptionParser[Params]("Banner Impressions Stream") {
- head("Banner Impressions Stream", "")
- note( "Extract banner impressions data from kafka webrequest stream and
write it back to kafka")
- help("help") text "Prints this usage text"
-
- opt[String]('k', "kafka-brokers") optional() valueName
"<broker1,...,brokerN>" action {
- (x, p) => p.copy(kafkaBrokers = x)
- } text "Kafka brokers to consume from. Defaults to
kafka10[12|14|18|20|22].eqiad.wmnet:9092"
-
- opt[String]('i', "kafka-input-topics") optional() valueName
"<topic1,...,topicK>" action {
- (x, p) => p.copy(kafkaInputTopics = x)
- } text "Input topics to consume. Defaults to webrequest_text"
-
- opt[String]('o', "kafka-output-topic") optional() valueName "<topic>"
action {
- (x, p) => p.copy(kafkaOutputTopic = x)
- } text "Output topic to write to. Defaults to test_banner_impressions_joal"
-
- opt[Int]("batch-duration-seconds") optional() action {
- (x, p) => p.copy(batchDurationSecs = x)
- } text "Batch duration in seconds. Defaults to 10."
-
- opt[String]("checkpoint-dir") optional() valueName "<path>" action {
- (x, p) => p.copy(checkpointDirectory = if (x.endsWith("/")) x else x + "/")
- } text ("Temporary directory for check-pointing streaming job.\n\t" +
- "Defaults to hdfs://analytics-hadoop/tmp/spark/banner_impressions_stream_checkpoint")
-
- opt[Unit]("no-checkpoint") optional() action {
- (_, p) => p.copy(noCheckpoint = true)
- } text "Force NOT using checkpoint if exists (wipes existing checkpoint
directory if any)."
-
- }
-
- def main(args: Array[String]) {
- argsParser.parse(args, Params()) match {
-
- case Some(params) =>
- // Initial setup - Spark, SQLContext
- val conf = new SparkConf().setAppName("BannerImpressionsStream")
- val sc = new SparkContext(conf)
- run(
- sc,
- params.kafkaBrokers,
- params.kafkaInputTopics,
- params.kafkaOutputTopic,
- params.batchDurationSecs,
- params.checkpointDirectory,
- params.noCheckpoint
- )
-
- case None => sys.exit(1)
- }
- }
-}
--
To view, visit https://gerrit.wikimedia.org/r/405285
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Ic859e84b43e6c5dba66987432af048ac38c81655
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Joal <[email protected]>
Gerrit-Reviewer: Elukey <[email protected]>
Gerrit-Reviewer: Joal <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: jenkins-bot <>