Mforns has uploaded a new change for review. (
https://gerrit.wikimedia.org/r/337593 )
Change subject: Add spark job to aggregate historical projectviews
......................................................................
Add spark job to aggregate historical projectviews
Bug: T156388
Change-Id: I25f590a48f5bbffc0de1c16c3f5603e6d35d5113
---
M pom.xml
A
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/HistoricalProjectviews.scala
2 files changed, 187 insertions(+), 3 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source
refs/changes/93/337593/1
diff --git a/pom.xml b/pom.xml
index 58c5d21..9d4fc37 100644
--- a/pom.xml
+++ b/pom.xml
@@ -305,10 +305,10 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<skip.tests>false</skip.tests>
<java.version>1.7</java.version>
- <hadoop.version>2.6.0-cdh5.5.2</hadoop.version>
- <hive.version>1.1.0-cdh5.5.2</hive.version>
+ <hadoop.version>2.6.0-cdh5.9.0</hadoop.version>
+ <hive.version>1.1.0-cdh5.9.0</hive.version>
<scala.version>2.10.4</scala.version>
- <spark.version>1.5.0-cdh5.5.2</spark.version>
+ <spark.version>1.6.0-cdh5.9.0</spark.version>
<camus.version>0.1.0-wmf7</camus.version>
</properties>
diff --git
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/HistoricalProjectviews.scala
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/HistoricalProjectviews.scala
new file mode 100644
index 0000000..9f6b7fb
--- /dev/null
+++
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/HistoricalProjectviews.scala
@@ -0,0 +1,184 @@
+/**
+ * Historical Projectviews
+ *
+ * Extract and generate historical projectview data from the pagecounts-raw data set.
+ * The resulting data set is formatted as compressed TSV and written in a single
+ * output directory (no subdirectories).
+ *
+ * Usage with spark-submit (or spark-shell):
+ *
+ *     spark-submit \
+ *         --class org.wikimedia.analytics.refinery.job.HistoricalProjectviews \
+ *         /path/to/refinery-job.jar \
+ *         /path/to/pagecounts-raw/data/set \
+ *         /path/to/write/generated/projectviews \
+ *         -n 20 \
+ *         -c gzip
+ */
+
+package org.wikimedia.analytics.refinery.job
+
+import org.apache.spark.sql.functions.input_file_name
+import org.apache.spark.sql.types._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+import scopt.OptionParser
+import org.apache.spark.Accumulator
+
+object HistoricalProjectviews {
+
  // Command line argument object.
  // Holds the job parameters; the defaults point at the production HDFS
  // locations, and every field can be overridden from the command line
  // (two positional arguments plus the -n / -c options).
  case class Config(
    sourceDir: String = "hdfs://analytics-hadoop/wmf/data/archive/pagecounts-raw",
    destinationDir: String = "hdfs://analytics-hadoop/wmf/data/archive/projectview/historical",
    outputFiles: Int = 20,        // number of files to split the output into
    outputCodec: String = "gzip"  // compression codec for the output files
  )
+
+ // Command line parser definition.
+ val argsParser = new OptionParser[Config]("Historical Projectviews") {
+ head("Historical Projectviews", "")
+ note("Generates historical projectviews aggregating from
pagecounts-raw data set.")
+ arg[String]("<sourceDir>").text("Root directory of the pagecounts-raw
source data set.")
+ arg[String]("<destinationDir>").text("Root directory where to write
the output files.")
+ opt[Int]('n', "output-files").text("Number of files to split the
output into. Default: 20.")
+ opt[String]('c', "output-codec").text("Compression codec for output
files. Default: gzip.")
+ help("help") text ("Prints this text and exits.")
+ }
+
  // Helper type to use as key for aggregation:
  // (year, month, day, hour, wikiCode).
  type Group = (Int, Int, Int, Int, String)
+
+ /**
+ * Read the directory tree of the pagecounts-raw data set,
+ * and parse it into an RDD with a format suitable for aggregation.
+ */
+ def parsePageCountsRaw(
+ sourceDir: String,
+ sqlContext: SQLContext,
+ parsingErrors: Accumulator[Long]
+ ): RDD[(Group, Long)] = {
+
+ // Read all files into an RDD with the format: (absoluteFilePath,
dataLine)
+ import sqlContext.implicits._
+ val rawData = sqlContext.read.text(sourceDir).select(input_file_name,
$"value").rdd
+
+ // Parse year, month, day and hour from the file name,
+ // and parse wikiCode and viewCount from the line.
+ // The resulting format is: ((year, month, day, hour, wikiCode),
viewCount)
+ val fileNameRE =
""".*/(\d{4})/\d{4}-(\d{2})/pagecounts-\d{6}(\d{2})-(\d{2})\d{4}.gz""".r
+ val dataLineRE = """([^\s]+)\s[^\s]+\s([^\s]+)\s[^\s]+""".r
+ rawData.flatMap{ row =>
+ val (filePath, dataLine) = (row.getString(0), row.getString(1))
+ filePath match {
+ case fileNameRE(year, month, day, hour) =>
+ dataLine match {
+ case dataLineRE(wikiCode, viewCount) =>
+ Array((
+ (year.toInt, month.toInt, day.toInt,
hour.toInt, wikiCode),
+ viewCount.toLong
+ ))
+ case _ =>
+ parsingErrors.add(1)
+ Array.empty[(Group, Long)]
+ }
+ case _ => throw new Exception("File name can not be parsed: "
+ filePath)
+ }
+ }
+ }
+
+ /**
+ * Aggregate pagecounts-raw dataset into projectviews
+ * (discard the article title dimension and sum up view counts across
wikis).
+ */
+ def aggregateProjectviews(parsedData: RDD[(Group, Long)]): RDD[(Group,
Long)] = {
+ // Pre-aggregate within partitions.
+ // We know that all elements in a partition have the same year, month,
day and hour,
+ // because they come from the same hourly file. If we pre-aggregate
them within the
+ // partition we reduce its size considerably from ~1M elements to ~800
elements.
+ // This reduces the data that needs to be hashed and redistributed
when aggregating
+ // across the whole data set.
+ val preAggregatedData = parsedData.mapPartitions{ it =>
+ it.foldLeft(Map.empty[Group, Long]){ case (aggregated, (group,
viewCount)) =>
+ aggregated + (group -> (aggregated.getOrElse(group, 0L) +
viewCount))
+ }.toIterator
+ }
+
+ // Aggregate (fully) across all partitions.
+ // This function's output format remains the same as the input.
+ preAggregatedData.reduceByKey(_ + _)
+ }
+
+ /**
+ * Write the aggregated projectcounts data in a format recognizable by
hive.
+ */
+ def writeProjectviews(
+ projectviews: RDD[(Group, Long)],
+ destinationDir: String,
+ outputFiles: Int,
+ outputCodec: String,
+ sqlContext: SQLContext
+ ) = {
+
+ // Create data frame from RDD.
+ val df = sqlContext.createDataFrame(
+ // Format RDD as rows.
+ projectviews.map{
+ case ((year, month, day, hour, wikiCode), viewCount) =>
+ Row(year, month, day, hour, wikiCode, viewCount)
+ }.coalesce(outputFiles, true),
+ // Specify new data set schema.
+ StructType(Seq(
+ StructField("year", IntegerType, nullable = false),
+ StructField("month", IntegerType, nullable = false),
+ StructField("day", IntegerType, nullable = false),
+ StructField("hour", IntegerType, nullable = false),
+ StructField("wiki_code", StringType, nullable = false),
+ StructField("view_count", LongType, nullable = false)
+ ))
+ )
+ // Write the data to the destination directory.
+ df.write.
+ format("csv").
+ option("delimiter", "\t").
+ option("codec", outputCodec).
+ save(destinationDir)
+ }
+
+ def main(args: Array[String]) {
+ argsParser.parse(args, Config()) match {
+ case Some(params) => {
+ // Initial Spark setup.
+ val conf = new SparkConf().setAppName("AppSessionMetrics")
+ val sc = new SparkContext(conf)
+ val sqlContext = new SQLContext(sc)
+ // sqlContext.setConf("spark.sql.parquet.compression.codec",
"snappy")
+
+ // Accumulator to keep track of parsing errors.
+ val parsingErrors = sc.accumulator(0L)
+
+ val pagecounts = parsePageCountsRaw(
+ params.sourceDir,
+ sqlContext,
+ parsingErrors
+ )
+
+ val projectviews = aggregateProjectviews(pagecounts)
+
+ writeProjectviews(
+ projectviews,
+ params.destinationDir,
+ params.outputFiles,
+ params.outputCodec,
+ sqlContext
+ )
+
+ // Print number of parsing errors for sanity check.
+ println(s"Parsing errors: $parsingErrors")
+ }
+ // Arguments are bad, error message will have been displayed.
+ case None => sys.exit(1)
+ }
+ }
+}
--
To view, visit https://gerrit.wikimedia.org/r/337593
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I25f590a48f5bbffc0de1c16c3f5603e6d35d5113
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Mforns <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits