Mforns has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/337593 )

Change subject: Add spark job to aggregate historical projectviews
......................................................................

Add spark job to aggregate historical projectviews

Bug: T156388
Change-Id: I25f590a48f5bbffc0de1c16c3f5603e6d35d5113
---
M pom.xml
A 
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/HistoricalProjectviews.scala
2 files changed, 187 insertions(+), 3 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source 
refs/changes/93/337593/1

diff --git a/pom.xml b/pom.xml
index 58c5d21..9d4fc37 100644
--- a/pom.xml
+++ b/pom.xml
@@ -305,10 +305,10 @@
       <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
       <skip.tests>false</skip.tests>
       <java.version>1.7</java.version>
-      <hadoop.version>2.6.0-cdh5.5.2</hadoop.version>
-      <hive.version>1.1.0-cdh5.5.2</hive.version>
+      <hadoop.version>2.6.0-cdh5.9.0</hadoop.version>
+      <hive.version>1.1.0-cdh5.9.0</hive.version>
       <scala.version>2.10.4</scala.version>
-      <spark.version>1.5.0-cdh5.5.2</spark.version>
+      <spark.version>1.6.0-cdh5.9.0</spark.version>
       <camus.version>0.1.0-wmf7</camus.version>
     </properties>
 
diff --git 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/HistoricalProjectviews.scala
 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/HistoricalProjectviews.scala
new file mode 100644
index 0000000..9f6b7fb
--- /dev/null
+++ 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/HistoricalProjectviews.scala
@@ -0,0 +1,184 @@
+/**
+ * Historical Projectviews
+ *
+ * Extract and generate historical projectview data from the pagecounts-raw 
data set.
+ * The resulting data set is formatted as compressed TSV and written in a 
single
+ * output directory (no subdirectories).
+ *
+ * Usage with spark-submit (or spark-shell):
+ *
+ *     spark-submit \
+ *         --class org.wikimedia.analytics.refinery.job.HistoricalProjectviews \
+ *         /path/to/refinery-job.jar \
+ *         /path/to/pagecounts-raw/data/set \
+ *         /path/to/write/generated/projectviews \
+ *         -n 20 \
+ *         -c gzip
+ */
+
+package org.wikimedia.analytics.refinery.job
+
+import org.apache.spark.sql.functions.input_file_name
+import org.apache.spark.sql.types._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+import scopt.OptionParser
+import org.apache.spark.Accumulator
+
+object HistoricalProjectviews {
+
+    /**
+     * Command line argument object.
+     *
+     * Fields (descriptions follow the help texts of the command line parser):
+     *  - sourceDir:      root directory of the pagecounts-raw source data set.
+     *  - destinationDir: root directory where to write the output files.
+     *  - outputFiles:    number of files to split the output into.
+     *  - outputCodec:    compression codec for the output files.
+     *
+     * The default values below make up the initial configuration that main
+     * hands to the parser via Config().
+     */
+    case class Config(
+        sourceDir: String = 
"hdfs://analytics-hadoop/wmf/data/archive/pagecounts-raw",
+        destinationDir: String = 
"hdfs://analytics-hadoop/wmf/data/archive/projectview/historical",
+        outputFiles: Int = 20,
+        outputCodec: String = "gzip"
+    )
+
+    // Command line parser definition.
+    // Every argument and option carries an `action` callback that copies the parsed
+    // value into the Config being built. Without it scopt accepts the value but the
+    // Config is returned unchanged, so the job would always run with the defaults.
+    val argsParser = new OptionParser[Config]("Historical Projectviews") {
+        head("Historical Projectviews", "")
+        note("Generates historical projectviews aggregating from pagecounts-raw data set.")
+
+        arg[String]("<sourceDir>").
+            action((value, config) => config.copy(sourceDir = value)).
+            text("Root directory of the pagecounts-raw source data set.")
+
+        arg[String]("<destinationDir>").
+            action((value, config) => config.copy(destinationDir = value)).
+            text("Root directory where to write the output files.")
+
+        opt[Int]('n', "output-files").
+            action((value, config) => config.copy(outputFiles = value)).
+            text("Number of files to split the output into. Default: 20.")
+
+        opt[String]('c', "output-codec").
+            action((value, config) => config.copy(outputCodec = value)).
+            text("Compression codec for output files. Default: gzip.")
+
+        help("help").text("Prints this text and exits.")
+    }
+
+    /**
+     * Helper type to use as key for aggregation.
+     * Components, in order: (year, month, day, hour, wikiCode).
+     */
+    type Group = (Int, Int, Int, Int, String)
+
+    /**
+     * Read the directory tree of the pagecounts-raw data set,
+     * and parse it into an RDD with a format suitable for aggregation.
+     *
+     * @param sourceDir     root directory of the pagecounts-raw data set.
+     * @param sqlContext    SQL context used to read the source files as text.
+     * @param parsingErrors accumulator incremented once for every data line that
+     *                      can not be parsed (wrong field layout or a view count
+     *                      that is not a valid Long).
+     * @return RDD of ((year, month, day, hour, wikiCode), viewCount) records.
+     * @throws Exception (inside the Spark tasks) when a file path does not match
+     *                   the expected pagecounts directory/file naming scheme.
+     */
+    def parsePageCountsRaw(
+        sourceDir: String,
+        sqlContext: SQLContext,
+        parsingErrors: Accumulator[Long]
+    ): RDD[(Group, Long)] = {
+
+        // Read all files into an RDD with the format: (filePath, dataLine)
+        import sqlContext.implicits._
+        val rawData = sqlContext.read.text(sourceDir).select(input_file_name, $"value").rdd
+
+        // Parse year, month, day and hour from the file name,
+        // and parse wikiCode and viewCount from the line.
+        // The resulting format is: ((year, month, day, hour, wikiCode), viewCount)
+        // The dot before "gz" is escaped so that it only matches a literal dot.
+        val fileNameRE = """.*/(\d{4})/\d{4}-(\d{2})/pagecounts-\d{6}(\d{2})-(\d{2})\d{4}\.gz""".r
+        val dataLineRE = """([^\s]+)\s[^\s]+\s([^\s]+)\s[^\s]+""".r
+        rawData.flatMap{ row =>
+            val (filePath, dataLine) = (row.getString(0), row.getString(1))
+            filePath match {
+                case fileNameRE(year, month, day, hour) =>
+                    val parsed: Option[(Group, Long)] = dataLine match {
+                        case dataLineRE(wikiCode, viewCount) =>
+                            // The view count field is only known to be non-blank here.
+                            // A non-numeric value is treated like any other malformed
+                            // line instead of failing the task with a NumberFormatException.
+                            scala.util.Try(viewCount.toLong).toOption.map { count =>
+                                ((year.toInt, month.toInt, day.toInt, hour.toInt, wikiCode), count)
+                            }
+                        case _ => None
+                    }
+                    // NOTE: the accumulator is updated inside a transformation, so the
+                    // count may be inflated if Spark re-executes tasks; it is only meant
+                    // as a sanity check.
+                    if (parsed.isEmpty) parsingErrors.add(1)
+                    parsed.toList
+                case _ => throw new Exception("File name can not be parsed: " + filePath)
+            }
+        }
+    }
+
+    /**
+     * Aggregate the parsed pagecounts-raw records into projectviews:
+     * all view counts sharing the same (year, month, day, hour, wikiCode) key
+     * are summed up into a single record.
+     *
+     * @param parsedData records in the format ((year, month, day, hour, wikiCode), viewCount).
+     * @return one record per distinct key, in the same format as the input.
+     */
+    def aggregateProjectviews(parsedData: RDD[(Group, Long)]): RDD[(Group, Long)] = {
+        // Step 1: sum up the view counts per key inside each partition.
+        // Per the original author's note, a partition is expected to hold the records
+        // of a single hourly file (same year, month, day and hour), so this step should
+        // shrink it from roughly 1M to roughly 800 elements before any data is hashed
+        // and redistributed. The result does not depend on that assumption.
+        val preAggregatedData = parsedData.mapPartitions{ records =>
+            val partialSums = records.foldLeft(Map.empty[Group, Long]){ (sums, record) =>
+                val (group, viewCount) = record
+                sums.updated(group, sums.getOrElse(group, 0L) + viewCount)
+            }
+            partialSums.iterator
+        }
+
+        // Step 2: sum up the partial results across all partitions.
+        preAggregatedData.reduceByKey((left, right) => left + right)
+    }
+
+    /**
+     * Write the aggregated projectviews data as delimited text files,
+     * in a format recognizable by hive.
+     *
+     * @param projectviews   records in the format ((year, month, day, hour, wikiCode), viewCount).
+     * @param destinationDir directory where the output files are saved.
+     * @param outputFiles    number of partitions the data is shuffled into before writing.
+     * @param outputCodec    value passed to the writer's "codec" option.
+     * @param sqlContext     SQL context used to build the data frame.
+     */
+    def writeProjectviews(
+        projectviews: RDD[(Group, Long)],
+        destinationDir: String,
+        outputFiles: Int,
+        outputCodec: String,
+        sqlContext: SQLContext
+    ): Unit = {
+
+        // Schema of the output data set, one column per tuple component.
+        val schema = StructType(Seq(
+            StructField("year", IntegerType, nullable = false),
+            StructField("month", IntegerType, nullable = false),
+            StructField("day", IntegerType, nullable = false),
+            StructField("hour", IntegerType, nullable = false),
+            StructField("wiki_code", StringType, nullable = false),
+            StructField("view_count", LongType, nullable = false)
+        ))
+
+        // Flatten every record into a row and shuffle the rows into the
+        // requested number of partitions.
+        val rows = projectviews.
+            map{ case ((year, month, day, hour, wikiCode), viewCount) =>
+                Row(year, month, day, hour, wikiCode, viewCount)
+            }.
+            coalesce(outputFiles, shuffle = true)
+
+        // Write the data to the destination directory, tab separated.
+        // NOTE(review): the "csv" format presumably relies on an external CSV data
+        // source being on the classpath for this Spark version - confirm.
+        val writer = sqlContext.createDataFrame(rows, schema).write.
+            format("csv").
+            option("delimiter", "\t").
+            option("codec", outputCodec)
+        writer.save(destinationDir)
+    }
+
+    /**
+     * Job entry point: parse the command line arguments, then read, aggregate and
+     * write the data, and finally print the number of lines that could not be parsed.
+     * Exits with status 1 when the arguments can not be parsed.
+     *
+     * @param args command line arguments, as described by argsParser.
+     */
+    def main(args: Array[String]): Unit = {
+        argsParser.parse(args, Config()) match {
+            case Some(params) =>
+                // Initial Spark setup. The application is named after this job.
+                val conf = new SparkConf().setAppName("HistoricalProjectviews")
+                val sc = new SparkContext(conf)
+                val sqlContext = new SQLContext(sc)
+
+                // Accumulator to keep track of parsing errors.
+                val parsingErrors = sc.accumulator(0L)
+
+                val pagecounts = parsePageCountsRaw(
+                    params.sourceDir,
+                    sqlContext,
+                    parsingErrors
+                )
+
+                val projectviews = aggregateProjectviews(pagecounts)
+
+                writeProjectviews(
+                    projectviews,
+                    params.destinationDir,
+                    params.outputFiles,
+                    params.outputCodec,
+                    sqlContext
+                )
+
+                // Print number of parsing errors for sanity check.
+                // The accumulator is read only after the write above has run the job.
+                println(s"Parsing errors: ${parsingErrors.value}")
+
+            // Arguments are bad, error message will have been displayed.
+            case None => sys.exit(1)
+        }
+    }
+}

-- 
To view, visit https://gerrit.wikimedia.org/r/337593
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I25f590a48f5bbffc0de1c16c3f5603e6d35d5113
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Mforns <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to