Shilad Sen has uploaded a new change for review. (
https://gerrit.wikimedia.org/r/381169 )
Change subject: WIP: Spark job to create page ids viewed in each session
......................................................................
WIP: Spark job to create page ids viewed in each session
This job will create a summary of viewer sessions, with one line per
user, and each line consisting of a wiki project code, timestamp, and
all the page ids viewed in the session.
The job now runs on the cluster in a reasonable amount of time (22 min
for a day's worth of views).
I have written a testing harness, but have not fully automated it yet. The
tests currently fail and I have not yet started debugging them, but this
shouldn't be too tricky.
TODO:
* Polish and debug tests
* Oozify job (may require switching to Spark 2)
Bug: T174796
Change-Id: I55395459d80d73f3d065967ce95d6506698d128e
---
A
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala
A
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
A
refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
A
refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestParams.scala
A
refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestPartitionQueryBuilder.scala
A
refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
6 files changed, 1,064 insertions(+), 0 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source
refs/changes/69/381169/1
diff --git
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala
new file mode 100644
index 0000000..cd43cea
--- /dev/null
+++
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala
@@ -0,0 +1,96 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.sql.Timestamp
+
+import org.joda.time.{DateTime, DateTimeConstants, Hours, Interval}
+
/**
 * Builds Hive partition predicates over the (year, month, day, hour)
 * partition columns of the webrequest table.
 *
 * @author Shilad Sen
 */
object PartitionQueryBuilder {

  /**
   * Creates a SQL predicate that includes the time period from beginTs to endTs.
   *
   * The SQL predicate is somewhat compressed (whole years, months and days
   * are peeled out of the middle of the interval), but there is still room
   * for improvement.
   *
   * @param beginTs start of the interval, rounded down to the hour
   * @param endTs   end of the interval, rounded up to the hour
   * @return e.g. "(year = 2016) or (year = 2017 and month = 1 and day = 1 and hour < 3)"
   */
  def formSql(beginTs: Timestamp, endTs: Timestamp) : String = {
    val begin = new DateTime(beginTs.getTime).hourOfDay().roundFloorCopy()
    val end = new DateTime(endTs.getTime).hourOfDay().roundCeilingCopy()
    if (begin == end) {
      // Degenerate interval: a single hour partition.
      s"(year = ${begin.getYear} " +
        s"and month = ${begin.getMonthOfYear} " +
        s"and day = ${begin.getDayOfMonth} " +
        s"and hour = ${begin.getHourOfDay})"
    } else {
      formSqlConditions(begin, end).map("(" + _ + ")").mkString(" OR ")
    }
  }

  /**
   * Recursively builds the disjuncts of the partition predicate for the
   * hour-aligned interval [begin, end). Strategy: peel whole years, then
   * whole months, then whole days out of the middle, and finally express
   * the ragged hour ranges at either edge.
   */
  def formSqlConditions(begin: DateTime, end: DateTime) : Seq[String] = {
    if (begin == end) {
      return List()
    }

    // Try to take a whole year out of the middle
    val startYear = begin.year().roundCeilingCopy()
    val endYear = startYear.plusYears(1)
    if (!startYear.isBefore(begin) && !endYear.isAfter(end)) {
      return formSqlConditions(begin, startYear) ++
        List(s"year = ${startYear.getYear}") ++
        formSqlConditions(endYear, end)
    }

    // Try to take a whole month out of the middle
    val startMonth = begin.monthOfYear().roundCeilingCopy()
    val endMonth = startMonth.plusMonths(1)
    if (!startMonth.isBefore(begin) && !endMonth.isAfter(end)) {
      return formSqlConditions(begin, startMonth) ++
        List(s"year = ${startMonth.getYear} " +
          s"and month = ${startMonth.getMonthOfYear}") ++
        formSqlConditions(endMonth, end)
    }

    // Try to take a whole day out of the middle
    val startDay = begin.dayOfMonth().roundCeilingCopy()
    val endDay = startDay.plusDays(1)
    if (!startDay.isBefore(begin) && !endDay.isAfter(end)) {
      return formSqlConditions(begin, startDay) ++
        List(s"year = ${startDay.getYear} " +
          s"and month = ${startDay.getMonthOfYear} " +
          s"and day = ${startDay.getDayOfMonth}") ++
        formSqlConditions(endDay, end)
    }

    // Do we have a collection of hours that run up to the end of the starting day?
    val startOfNextDay = begin.withTimeAtStartOfDay().plusDays(1)
    if (!startOfNextDay.isAfter(end)) {
      return List(s"year = ${begin.getYear} " +
        s"and month = ${begin.getMonthOfYear} " +
        s"and day = ${begin.getDayOfMonth} " +
        s"and hour >= ${begin.getHourOfDay}") ++
        formSqlConditions(startOfNextDay, end)
    }

    // Do we have a collection of hours that start at the beginning of the last day?
    val startOfLastDay = end.withTimeAtStartOfDay()
    if (!startOfLastDay.isBefore(begin)) {
      return formSqlConditions(begin, startOfLastDay) ++
        List(s"year = ${end.getYear} " +
          s"and month = ${end.getMonthOfYear} " +
          s"and day = ${end.getDayOfMonth} " +
          s"and hour <= ${end.getHourOfDay}")
    }

    // We must have an hour range within the same day.
    assert(begin.withTimeAtStartOfDay() == end.withTimeAtStartOfDay())
    List(s"year = ${begin.getYear} " +
      s" and month = ${begin.getMonthOfYear}" +
      s" and day = ${begin.getDayOfMonth}" +
      s" and hour >= ${begin.getHourOfDay}" +
      s" and hour <= ${end.getHourOfDay}")

  }
}
diff --git
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
new file mode 100644
index 0000000..1d03705
--- /dev/null
+++
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
@@ -0,0 +1,605 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.sql.Timestamp
+
+import org.joda.time.Hours
+import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
+
+import scala.util.Try
+
+
+/**
+ * Process to create a page view session log.
+ *
+ * This create output files with one line per session.
+ * The first two tokens of each line are the session start timestamp and
language edition.
+ * The following tokens are the page ids viewed in the session, in order.
+ *
+ *
+ * @author Shilad Sen
+ */
+object SessionPagesBuilder {
+
+ import org.apache.spark.sql.hive.HiveContext
+ import org.apache.spark.SparkConf
+ import org.apache.spark.sql.SaveMode
+ import org.apache.spark.SparkContext
+ import scopt.OptionParser
+ import org.apache.spark.rdd.RDD
+ import org.apache.spark.sql.SQLContext
+ import org.joda.time.DateTime
+ import org.apache.log4j.{Level, LogManager}
+ import org.apache.spark.Partitioner
+ import java.sql.Timestamp
+
+ import scala.collection.mutable.ArrayBuffer
+ import scala.util.hashing.MurmurHash3
+
  /**
   * Number of seconds of later data to gather beyond the
   * time interval that is requested in the params to try to
   * look for the end of sessions that spill over the interval.
   * These will be included in the output.
   */
  val LOOK_FUTURE_SECS: Int = 60 * 60 * 3

  /**
   * How far in the past to look for sessions with start times BEFORE
   * the requested interval that spill into the requested interval.
   * These will be excluded from the output.
   */
  val LOOK_PAST_SECS: Int = 60 * 59
+
+
+ case class SessionKey (wikiDb: String, userHash: Long, tstamp: Long)
+ extends Ordered[SessionKey] {
+ override def compare(that: SessionKey): Int = {
+ import scala.math.Ordered.orderingToOrdered
+
+ Ordering.Tuple3(Ordering.String, Ordering.Long, Ordering.Long).compare(
+ (wikiDb, userHash, tstamp),
+ (that.wikiDb, that.userHash, that.tstamp)
+ )
+ }
+ }
+
+
+ class SessionPartitioner(partitions: Int) extends Partitioner {
+ require(partitions >= 0, s"Number of partitions ($partitions) cannot be
negative.")
+
+ override def numPartitions: Int = partitions
+
+ override def getPartition(key: Any): Int = {
+ val k = key.asInstanceOf[SessionKey]
+ Math.abs(k.wikiDb.hashCode() + k.userHash.asInstanceOf[Int]) %
numPartitions
+ }
+ }
+
+ object SessionKey {
+ implicit def orderingByTimestamp[A <: SessionKey] : Ordering[A] = {
+ Ordering.by(fk => (fk.wikiDb, fk.userHash, fk.tstamp))
+ }
+ }
+
+ def getTimestamp(ts: Any) : Timestamp = {
+ ts match {
+ case _: Timestamp =>
+ ts.asInstanceOf[Timestamp]
+ case _: String =>
+ Timestamp.valueOf(ts.asInstanceOf[String])
+ case _ => throw new IllegalArgumentException(ts.toString)
+ }
+ }
+
  /**
   * A single page.
   * Effective page id is the page id after resolving redirects
   * (initialized to the page's own id by preparePages).
   */
  case class PageInfo(
    wikiDb: String,
    pageId: Long,
    pageTitle: String,
    pageNamespace: Long,
    pageIsRedirect: Boolean,
    effectivePageId : Long
  )

  /**
   * A view of a page by a user.
   * tstamp is milliseconds since the epoch.
   * userHash is a 64-bit hash of user ip and other unique information.
   */
  case class PageView(
    wikiDb: String,
    userHash: Long,
    tstamp: Long,
    pageNamespace: Long,
    pageId: Long
  )

  /** A redirect edge from one page id to another within a single wiki. */
  case class Redirect(
    wikiDb: String,
    fromPageId: Long,
    toPageId: Long
  )
+
+
+ /**
+ * Returns true iff the two view are part of the same session.
+ * User hashes must match and the pageviews must fall within a specified
window.
+ */
+ def sameSession(v1: PageView, v2: PageView, timeoutMillis: Long) : Boolean =
{
+ (v1.wikiDb == v2.wikiDb) &&
+ (v1.userHash == v2.userHash) &&
+ (Math.abs(v1.tstamp - v2.tstamp) <= timeoutMillis)
+ }
+
+
+ /**
+ * Given components associated with a distinct user , return a long hash
+ * @param client_ip
+ * @param user_agent
+ * @param x_forwarded_for
+ * @return
+ */
+ def userHash(
+ client_ip: String,
+ user_agent: String,
+ x_forwarded_for: String
+ ) : Long = {
+ var s = client_ip + ":" + user_agent + ":" + x_forwarded_for
+ (MurmurHash3.stringHash(s).asInstanceOf[Long] << 32) |
+ (MurmurHash3.stringHash(s.reverse).asInstanceOf[Long] & 0xFFFFFFFL)
+ }
+
+ def listToSQLInCondition(list: Seq[String]): String = list.map(w =>
s"'$w'").mkString(", ")
+
  /**
   * Prepare a map linking project hostnames to project dbnames.
   *
   * The project/namespace lookup table is small, so the result is collected
   * to the driver and returned as an in-memory map.
   *
   * @param sqlContext            Hive-enabled SQL context to query with
   * @param projectNamespaceTable fully qualified Hive table name
   * @param snapshot              mediawiki snapshot partition (usually YYYY-MM)
   * @param wikiList              dbnames to keep (e.g. "simplewiki")
   * @return A map[hostname -> dbname]
   */
  def prepareProjectToWikiMap(
    sqlContext: SQLContext,
    projectNamespaceTable: String,
    snapshot: String,
    wikiList: Seq[String]
  ): Map[String, String] = {
    sqlContext.sql(
      s"""
        |SELECT DISTINCT
        |  hostname,
        |  dbname
        |FROM $projectNamespaceTable
        |WHERE snapshot = '$snapshot'
        |  AND dbname IN (${listToSQLInCondition(wikiList)})
      """.stripMargin).
      rdd.
      collect.
      map(r => r.getString(0) -> r.getString(1)).toMap
  }
+
+ def hoursInRange(begin: org.joda.time.DateTime, end: org.joda.time.DateTime)
: Seq[(Int, Int, Int, Int)]= {
+ val beginHour = begin.hourOfDay().roundFloorCopy()
+ val endHour = end.hourOfDay().roundCeilingCopy()
+ val numHours = Hours.hoursBetween(beginHour, endHour).getHours
+ val dates = (0 to numHours).map(beginHour.plusHours)
+ dates.map(d => (d.getYear, d.getMonthOfYear, d.getDayOfMonth,
d.getHourOfDay))
+ }
+
  /**
   * Prepare the pages dataset to be reused.
   *
   * Reads distinct main-namespace (namespace 0) pages for the requested
   * wikis. The effectivePageId field is initialized to the page's own id;
   * resolveRedirects rewrites it later for redirect pages.
   *
   * @return A RDD of PageInfo data
   */
  def preparePages(
    sqlContext: SQLContext,
    pageTable: String,
    snapshot: String,
    wikiList: Seq[String]
  ): RDD[PageInfo] = {
    sqlContext.sql(
      s"""
        |SELECT
        |  wiki_db,
        |  page_id,
        |  page_title,
        |  page_namespace,
        |  page_is_redirect
        |FROM $pageTable
        |WHERE snapshot = '$snapshot'
        |  AND wiki_db IN (${listToSQLInCondition(wikiList)})
        |  AND page_id IS NOT NULL
        |  AND page_namespace = 0
        |  AND page_id > 0
        |GROUP BY
        |  wiki_db,
        |  page_id,
        |  page_title,
        |  page_namespace,
        |  page_is_redirect
      """.stripMargin).rdd.
      map(r => {
        // Final argument: effectivePageId starts out equal to pageId.
        PageInfo(r.getString(0), r.getLong(1), r.getString(2),
          r.getLong(3), r.getBoolean(4), r.getLong(1))
      })
  }
+
  /**
   * Prepare the redirects dataset to be reused.
   *
   * Joins the raw redirect rows against the page RDD twice: once to ensure
   * the redirecting page id exists, and once to translate the target
   * (title, namespace) pair into a page id.
   *
   * @return A RDD of redirect data
   */
  def prepareRedirects(
    sqlContext: SQLContext,
    redirectTable: String,
    snapshot: String,
    wikiList: Seq[String],
    pages: RDD[PageInfo]
  ): RDD[Redirect] = {

    val pagesPerPageId = pages.map(p => ((p.wikiDb, p.pageId), p.pageTitle)).cache()
    val pagesPerTitleAndNamespace = pages.map(p => ((p.wikiDb, p.pageTitle, p.pageNamespace), p.pageId)).cache()

    sqlContext.sql(
      s"""
        |SELECT
        |  wiki_db,
        |  rd_from,
        |  rd_title,
        |  rd_namespace
        |FROM $redirectTable
        |WHERE snapshot = '$snapshot'
        |  AND wiki_db IN (${listToSQLInCondition(wikiList)})
        |  AND rd_from IS NOT NULL
        |  AND rd_from > 0
        |GROUP BY
        |  wiki_db,
        |  rd_from,
        |  rd_title,
        |  rd_namespace
      """.stripMargin)
      .rdd
      .map(r => {
        ((r.getString(0), r.getLong(1)), (r.getString(2), r.getLong(3)))
      })
      .filter(t => Option(t._2._1).isDefined) // Remove null toPageTitle
      .join(pagesPerPageId) // Ensure fromPageId exists in page table
      .map { case ((wiki, fromPageId), ((toPageTitle, toPageNamespace), _)) =>
        ((wiki, toPageTitle, toPageNamespace), fromPageId) }
      .join(pagesPerTitleAndNamespace) // Get toPageId from page table
      .map { case ((wiki, _ , _), (fromPageId, toPageId)) =>
        Redirect(wiki, fromPageId, toPageId) }
      .distinct // prevent any duplicate (data corruption)
  }
+
  /**
   * Replace page ids for pages that are redirected.
   *
   * Applies the redirect mapping to effectivePageId twice, so chains of up
   * to two redirects are followed.
   *
   * @return PageInfo with redirects applied to relevant page ids
   */
  def resolveRedirects(
    sqlContext: SQLContext,
    pages: RDD[PageInfo],
    redirects: RDD[Redirect]
  ) : RDD[PageInfo] = {

    // NOTE(review): never reassigned — could be a val.
    var redirectsBySrc = redirects.map(r => ((r.wikiDb, r.fromPageId), r.toPageId)).cache()

    // This function maps redirects once
    val processRedirects = (pages: RDD[PageInfo]) => {
      pages.map( p => ((p.wikiDb, p.effectivePageId), p) )
        .leftOuterJoin(redirectsBySrc)
        .map { case (_, (p, redirectId)) =>
          PageInfo(p.wikiDb,
            p.pageId,
            p.pageTitle, p.pageNamespace,
            p.pageIsRedirect,
            // Pages with no redirect keep their current effective id.
            redirectId.getOrElse(p.effectivePageId))
        }
    } : RDD[PageInfo]

    // We only process two-levels of redirects. Is this okay?
    processRedirects(
      processRedirects(pages))
  }
+
+
  /**
   * Prepare the view data.
   *
   * Selects user pageviews from webrequest for the partition range
   * [begin, end], hashes client information into an anonymous per-user id,
   * and joins on (wiki, title) against the page RDD to attach namespace and
   * page id (views of unknown titles are dropped by the inner join).
   *
   * @return A RDD of page views
   */
  def prepareViews(
    sqlContext: SQLContext,
    webrequestTable: String,
    begin: Timestamp,
    end: Timestamp,
    projectList: Seq[String],
    projectToWikiMap: Map[String, String],
    pages: RDD[PageInfo]
  ): RDD[PageView] = {
    val pagesPerTitles = pages
      .map(p => ((p.wikiDb, p.pageTitle), (p.pageNamespace, p.pageId))) // ((wiki, pageTitle), (pageNamespace, pageId))
      .cache()

    sqlContext.sql(
      s"""
        |SELECT
        |  CONCAT(pageview_info['project'], '.org') AS project,
        |  pageview_info['page_title'] as title,
        |  ts,
        |  client_ip,
        |  user_agent,
        |  x_forwarded_for
        |FROM $webrequestTable
        |WHERE webrequest_source = 'text'
        |  AND(${PartitionQueryBuilder.formSql(begin, end)})
        |  AND pageview_info['project'] IN (${listToSQLInCondition(projectList)})
        |  AND is_pageview
        |  AND agent_type = 'user'
      """.stripMargin)
      .rdd
      .map(r => {
        val wiki = projectToWikiMap(r.getString(0))
        val uh = userHash(r.getString(3), r.getString(4), r.getString(5))
        ((wiki, r.getString(1)), (getTimestamp(r.get(2)), uh))
      })
      .join(pagesPerTitles)
      .map { case ((wiki, title), ((ts, userHash), (pageNs, pageId))) =>
        PageView(wiki, userHash, ts.getTime, pageNs, pageId)
      }
  }
+
  /**
   * Aggregates page views by user / wiki and groups them into sessions.
   *
   * 1. Create session key composed of wiki, userhash, tstamp that delineates sessions.
   * 2. Partition by wiki, userhash with secondary sort on tstamp.
   * 3. Reduce page views into sessions on each partition. The custom
   *    iterator is necessary for the final step because there could be
   *    millions / billions of page views on a partition so we cannot load
   *    the result set in memory.
   *
   * @param views           the page views to group
   * @param timeoutInMillis maximum gap between consecutive views of a session
   * @return one Seq[PageView] per session, views in timestamp order
   */
  def createSessions(views: RDD[PageView], timeoutInMillis: Int): RDD[Seq[PageView]] = {
    views
      .map { v => (SessionKey(v.wikiDb, v.userHash, v.tstamp), v) }
      .repartitionAndSortWithinPartitions(new SessionPartitioner(views.getNumPartitions))
      .mapPartitions {
        iter =>
          new Iterator[Seq[PageView]] {
            // True only when the partition is empty from the start; after
            // that, iter.hasNext guards each read.
            var eos: Boolean = !iter.hasNext
            // Views accumulated for the session currently being built.
            var session = new ArrayBuffer[PageView]()
            // First view of the NEXT session, once we read past the end of
            // the current one.
            var nextSessionHead : Option[PageView] = None

            override def hasNext: Boolean = {
              tryToFillSession()
              session.nonEmpty
            }

            override def next: Seq[PageView] = {
              tryToFillSession()
              assert(session.nonEmpty)

              // Create a new empty session and place the next session head into it
              var s = session
              session = new ArrayBuffer[PageView]()
              if (nextSessionHead.isDefined) {
                session += nextSessionHead.get
                nextSessionHead = None
              }
              s
            }

            /**
             * Recursively adds the next page view onto the current session until
             * we are certain it belongs to the next session. The sign this has occurred
             * is nextSessionHead is non-empty.
             */
            def tryToFillSession(): Unit = {
              if (!eos && nextSessionHead.isEmpty && iter.hasNext) {
                var p = iter.next._2
                if (session.isEmpty || sameSession(p, session.last, timeoutInMillis)) {
                  session += p
                  tryToFillSession()
                } else {
                  nextSessionHead = Some(p)
                }
              }
            }
          }
      }
  }
+
+
  /**
   * Write sessions to output files as tab-separated values, partitioned by
   * wiki_db, with one line per session: session start timestamp and the
   * space-separated page ids viewed.
   *
   * TODO (original): "Overwrite existing files" — NOTE(review):
   * SaveMode.Overwrite is already set below; confirm what remains to do.
   */
  def writeSessions(sqlContext: SQLContext, sessions: RDD[Seq[PageView]], outputBasePath: String, outputFilesParts: Int): Unit = {
    import sqlContext.implicits._

    sessions
      .filter(views => views.nonEmpty)
      .map(views => (views.head.wikiDb,
        new Timestamp(views.head.tstamp),
        views.map(v => v.pageId.toString).mkString(" ")))
      .repartition(outputFilesParts)
      .toDF
      .withColumnRenamed("_1", "wiki_db")
      .withColumnRenamed("_2", "tstamp")
      .withColumnRenamed("_3", "pages")
      .write
      .format("com.databricks.spark.csv")
      .option("delimiter", "\t")
      .option("header", "true")
      .mode(SaveMode.Overwrite)
      .partitionBy("wiki_db")
      .save(outputBasePath)
  }
+
  /**
   * Config class for CLI argument parser using scopt.
   *
   * NOTE(review): some CLI help strings elsewhere in this file do not match
   * these defaults (wikiList defaults to simplewiki, outputBasePath to
   * /user/shiladsen/sessions) — verify and keep in sync.
   */
  case class Params(
    outputBasePath: String = "/user/shiladsen/sessions",
    projectNamespaceTable: String = "wmf_raw.mediawiki_project_namespace_map",
    pageTable: String = "wmf_raw.mediawiki_page",
    redirectTable: String = "wmf_raw.mediawiki_redirect",
    webrequestTable: String = "wmf.webrequest",
    wikiList: Seq[String] = Seq("simplewiki"),
    outputFilesParts: Int = 1,
    snapshot: String = "", // Parameter required, never used as is
    begin: Timestamp = new Timestamp(0), // Parameter required, never used as is
    end: Timestamp = new Timestamp(0), // Parameter required, never used as is
    sessionTimeoutMillis: Int = 15*60*1000 // 15 minutes
  )

  // Accepted CLI timestamp formats: date only, or date 'T' time-of-day.
  val YMD: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd")
  val YMD_HMS: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss")
  val TS_FORMATS: String = "yyyy-MM-dd or yyyy-MM-ddTHH:mm:ss"
+
+ def parseTimestamp(s: String) : Option[Timestamp] = {
+ var ts = Try[java.sql.Timestamp](new Timestamp(YMD.parseMillis(s)))
+ if (ts.isFailure) {
+ ts = Try[java.sql.Timestamp](new Timestamp(YMD_HMS.parseMillis(s)))
+ }
+ if (ts.isFailure) None else Some(ts.get)
+ }
+
+ /**
+ * Define the command line options parser
+ */
+ val argsParser = new OptionParser[Params]("Clickstream dataset builder") {
+ head("Clickstream dataset builder", "")
+ note(
+ """
+ |This job computes a clickstream dataset from one or more wiki(s).
+ |It creates a date folder, and per-wiki folders to store the results.
+ """.stripMargin)
+ help("help") text ("Prints this usage text")
+
+ opt[String]('s', "snapshot") required() valueName ("<snapshot>") action {
(x, p) =>
+ p.copy(snapshot = x)
+ } text ("The mediawiki hadoop snapshot to use for page, links and
redirects (usually YYYY-MM)")
+
+ opt[String]('b', "begin") required() valueName ("<begin>") action { (x, p)
=>
+ p.copy(begin = parseTimestamp(x).get)
+ } validate { x =>
+ if (parseTimestamp(x).isDefined) success
+ else failure(s"Invalid timestamp format. Must be one of {$TS_FORMATS}")
+ } text ("The beginning time for webrequest data gathering.")
+
+ opt[String]('e', "end") required() valueName ("<end>") action { (x, p) =>
+ p.copy(end = parseTimestamp(x).get)
+ } validate { x =>
+ if (parseTimestamp(x).isDefined) success
+ else failure(s"Invalid timestamp format. Must be one of {$TS_FORMATS}")
+ } text ("The ending time for webrequest data gathering.")
+
+ opt[Int]('t', "timeout") optional() valueName ("<timeout>") action { (x,
p) =>
+ p.copy(sessionTimeoutMillis = x)
+ } validate { x => if (x >= 0) success else failure("Invalid timeout")
+ } text ("The timeout in milliseconds that distinguishes user sessions.")
+
+
+ opt[String]('o', "output-base-path") optional() valueName ("<path>")
action { (x, p) =>
+ p.copy(outputBasePath = if (x.endsWith("/")) x.dropRight(1) else x)
+ } text ("Where on HDFS to store the computed dataset (date folder created
for you). Defaults to hdfs://analytics-hadoop/wmf/data/archive/clickstream")
+
+ opt[String]('w', "wikis") optional() valueName
"<wiki_db_1>,<wiki_db_2>..." action { (x, p) =>
+ p.copy(wikiList = x.split(",").map(_.toLowerCase))
+ } validate { x =>
+ val dbs = x.split(",").map(_.toLowerCase)
+ if (dbs.filter(db => db.isEmpty || (! db.contains("wik"))).length > 0)
+ failure("Invalid wikis list")
+ else
+ success
+ } text "wiki dbs to compute. Defaults to enwiki"
+
+ opt[Int]('p', "output-files-parts") optional() valueName ("<partitions>")
action { (x, p) =>
+ p.copy(outputFilesParts = x)
+ } text ("Number of file parts to output in hdfs. Defaults to 1")
+
+ opt[String]("project-namespace-table") optional() valueName ("<table>")
action { (x, p) =>
+ p.copy(projectNamespaceTable = x)
+ } text ("Fully qualified name of the project-namespace table on Hive.
Default to wmf_raw.mediawiki_project_namespace_map")
+
+ opt[String]("page-table") optional() valueName ("<table>") action { (x, p)
=>
+ p.copy(pageTable = x)
+ } text ("Fully qualified name of the page table on Hive. Default to
wmf_raw.mediawiki_page")
+
+ opt[String]("redirect-table") optional() valueName ("<table>") action {
(x, p) =>
+ p.copy(redirectTable = x)
+ } text ("Fully qualified name of the redirect table on Hive. Default to
wmf_raw.mediawiki_redirect")
+
+ opt[String]("webrequest-table") optional() valueName ("<table>") action {
(x, p) =>
+ p.copy(webrequestTable = x)
+ } text ("Fully qualified name of the webrequest table on Hive. Default to
wmf.webrequest")
+
+ }
+
+
+ def main(args: Array[String]): Unit = {
+
+ val params = args.headOption match {
+ // Case when our job options are given as a single string. Split them
+ // and pass them to argsParser.
+ case Some("--options") =>
+ argsParser.parse(args(1).split("\\s+"),
Params()).getOrElse(sys.exit(1))
+ // Else the normal usage, each CLI opts can be parsed as a job option.
+ case _ =>
+ argsParser.parse(args, Params()).getOrElse(sys.exit(1))
+ }
+
+
+ val conf = new SparkConf()
+ .setAppName(s"SessionPagesBuilder")
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.hadoop.mapred.output.compress", "true")
+ .set("spark.hadoop.mapred.output.compression.codec", "true")
+ .set("spark.hadoop.mapred.output.compression.codec",
"org.apache.hadoop.io.compress.GzipCodec")
+ .set("spark.hadoop.mapred.output.compression.type", "BLOCK")
+
+ val sqlContext = new HiveContext(new SparkContext(conf))
+
+ // Exit non-zero if if any refinements failed.
+ apply(sqlContext, params)
+ }
+
+ def apply(sql: SQLContext, params: Params): Unit = {
+ sql.sparkContext
+ .getConf
+ .registerKryoClasses(Array(
+ classOf[PageInfo],
+ classOf[Redirect],
+ classOf[PageView]))
+
+ import sql.implicits._
+
+ val projectToWikiMap = prepareProjectToWikiMap(sql,
params.projectNamespaceTable, params.snapshot, params.wikiList)
+ val domainList = projectToWikiMap.keys.toList
+ val projectList = domainList.map(_.stripSuffix(".org"))
+
+ val pages = preparePages(sql, params.pageTable, params.snapshot,
params.wikiList).cache()
+ val redirects = prepareRedirects(sql, params.redirectTable,
params.snapshot, params.wikiList, pages)
+ val pages2 = resolveRedirects(sql, pages, redirects)
+
+ val viewBeg = new Timestamp(new DateTime(params.begin.getTime)
+ .minusSeconds(LOOK_PAST_SECS)
+ .getMillis)
+ val viewEnd = new Timestamp(new DateTime(params.end.getTime)
+ .minusSeconds(LOOK_FUTURE_SECS)
+ .getMillis)
+ val views = prepareViews( sql, params.webrequestTable,
+ viewBeg, viewEnd,
+ projectList,
+ projectToWikiMap,
+ pages)
+
+ val beginMillis = params.begin.getTime
+ val sessions = createSessions(views, params.sessionTimeoutMillis)
+ .filter(_.head.tstamp >= beginMillis)
+ writeSessions(sql, sessions, params.outputBasePath,
params.outputFilesParts)
+ }
+}
diff --git
a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
new file mode 100644
index 0000000..c1641b0
--- /dev/null
+++
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
@@ -0,0 +1,216 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.io.{FileWriter, PrintWriter}
+import java.sql.Timestamp
+
+import org.joda.time.DateTime
+import org.json.simple.JSONObject
+import
org.wikimedia.analytics.refinery.job.vectors.SessionPagesBuilder.{PageInfo,
PageView, Redirect, userHash}
+
+import scala.collection.{immutable, mutable}
+import scala.reflect.io.Path
+import scala.util.Random
+
+
+/**
+ * Creates fake data that can be processed by SessionPagesBuilder.
+ * Along with expected outcomes.
+ *
+ * @author Shilad Sen
+ */
+object TestDataCreator {
+
  /** Fake user identity: three random credential strings plus their userHash. */
  case class UserInfo(x: String, y: String, z: String, hash: Long)
+
+ import TestParams._
+
+ def randNamespace() : Int = {
+ (Random.nextInt(10) - 7).max(0) // In [0,1,2] but 0 has probability 0.8
+ }
+
+ def choice[T](seq : Seq[T]) : T = {
+ seq(Random.nextInt(seq.length))
+ }
+
  // All user hashes handed out so far, to guarantee uniqueness across users.
  val usedUserHashes: mutable.Set[Long] = scala.collection.mutable.Set[Long]()

  /**
   * Generates a random user (three 10-char printable strings) with a hash
   * not seen before, retrying recursively on the rare collision.
   */
  def randUser() : UserInfo = {
    val randStr = () => { (1 to 10).map(_ => Random.nextPrintableChar() + "").mkString("") }
    val u = (randStr(), randStr(), randStr())
    val hash : Long = userHash(u._1, u._2, u._3)
    if (usedUserHashes.contains(hash)) {
      randUser()
    } else {
      usedUserHashes += hash
      UserInfo(u._1, u._2, u._3, hash)
    }
  }
+
  /**
   * Generates numPagesPerWiki pages per wiki with ids 1..numPagesPerWiki,
   * titled "Article <id>", a random namespace, and effectivePageId unset (-1).
   *
   * NOTE(review): SessionPagesBuilder.preparePages keeps only namespace 0,
   * so pages generated here with namespace 1 or 2 will be filtered out by
   * the job — confirm the expected-session fixture accounts for that.
   */
  def generatePages(wikis: Seq[String], numPagesPerWiki: Int): Seq[PageInfo] = {
    wikis
      .flatMap(w => (1 to numPagesPerWiki).map((w, _))) // seq of (wikiId, pageId)
      .map { case (w, p) =>
        PageInfo(w, p, "Article " + p, randNamespace(), pageIsRedirect = false, -1)
      }
  }
+
+ def usedRedirects: mutable.Set[(String, String)] =
scala.collection.mutable.Set[(String, String)]()
+
+ def generateRedirects(pages: Seq[PageInfo], redirectsPerWiki: Int) :
Seq[Redirect]= {
+ val maxId = pages.map(_.pageId).max
+ val wikis = pages.map(_.wikiDb).distinct.toList
+ val byWiki = pages.groupBy(_.wikiDb)
+
+ byWiki.keys.flatMap { w =>
+ val shuffled = Random.shuffle(byWiki(w).toList).toArray
+ val srcs = shuffled.slice(0, redirectsPerWiki)
+ val dests = shuffled.slice(redirectsPerWiki, redirectsPerWiki * 2)
+ srcs.zip(dests).map { pair => Redirect(w, pair._1.pageId,
pair._2.pageId) }
+ }.toSeq
+ }
+
  /**
   * Generates numSessions random sessions. Each session gets a fresh unique
   * user and a single random wiki, starts uniformly in [start, end), and
   * contains 1 + rand(2 * meanLength) additional views whose successive gaps
   * are strictly below SESSION_TIMEOUT_MILLIS, so each generated session
   * remains a single session under the job's grouping rule.
   */
  def generateSessions(start: Long, end: Long, pages: Seq[PageInfo], numSessions: Int, meanLength: Int): Seq[(UserInfo, Seq[(Long, PageInfo)])] = {
    val byWiki = pages.groupBy(_.wikiDb)
    val wikis = byWiki.keys.toList
    (1 to numSessions)
      .map { _ =>
        val n = 1 + Random.nextInt(meanLength * 2)
        val w = choice(wikis)
        // Build ascending timestamps by folding gap-sized increments onto a
        // random session start, then reversing the prepend-built list.
        val tstamps = (1 to n)
          .foldLeft(List(start + Random.nextInt((end-start).asInstanceOf[Int])))(
            (seq, _) => (seq.head + Random.nextInt(SESSION_TIMEOUT_MILLIS-1))::seq)
          .reverse
        val session = tstamps.map { (_, choice(byWiki(w))) }
        (randUser(), session.toList)
      }
  }
+
  /**
   * Writes the EXPECTED session lines for the generated data: one line per
   * session with start timestamp, wiki, and space-separated page ids, where
   * redirect source pages are mapped to their targets.
   *
   * NOTE(review): column order here is ts \t wiki, while
   * SessionPagesBuilder.writeSessions emits wiki_db, tstamp, pages —
   * confirm the comparison step normalizes the order.
   */
  def writeSessions(path: String,
                    pages: Seq[PageInfo],
                    redirects: Seq[Redirect],
                    sessions: Seq[(UserInfo, Seq[(Long, PageInfo)])]) : Unit = {
    // Map every page id to its output id; redirect sources map to targets
    // (union overrides the identity entries for redirecting pages).
    val pageIdMap = pages
      .map(p => ((p.wikiDb, p.pageId), p.pageId.toString))
      .union(redirects.map(r => ((r.wikiDb, r.fromPageId), r.toPageId.toString)))
      .toMap

    val out = new PrintWriter(new FileWriter(path))
    sessions.foreach {
      s =>
        val ts = s._2.head._1.toString
        val wiki = s._2.head._2.wikiDb
        val pageIds = s._2.map(tsAndPage => pageIdMap((tsAndPage._2.wikiDb, tsAndPage._2.pageId))).mkString(" ")
        out.write(ts + "\t" + wiki + "\t" + pageIds + "\n")
    }
    out.close()
  }
+
+ import scala.collection.JavaConverters._
+
+
  /**
   * Writes one JSON row per distinct wiki, mimicking the
   * project_namespace_map schema: hostname "<wiki>.org", snapshot, dbname.
   */
  def writeWikis(path: String, pages: Seq[PageInfo]): Unit = {
    val out = new PrintWriter(new FileWriter(path))
    pages.map(_.wikiDb).distinct.foreach {
      w =>
        val js = Map(
          "hostname" -> (w + ".org"),
          "snapshot" -> SNAPSHOT,
          "dbname" -> w
        )
        JSONObject.writeJSONString(js.asJava, out)
        out.write("\n")
    }
    out.close()
  }
+
  /**
   * Writes one JSON row per page, mimicking the wmf_raw.mediawiki_page
   * schema consumed by SessionPagesBuilder.preparePages.
   */
  def writePages(path: String, pages: Seq[PageInfo]): Unit = {
    val out = new PrintWriter(new FileWriter(path))
    pages.foreach { p =>
      val js = Map(
        "wiki_db" -> p.wikiDb,
        "page_id" -> p.pageId,
        "page_title" -> p.pageTitle,
        "snapshot" -> SNAPSHOT,
        "page_namespace" -> p.pageNamespace,
        "page_is_redirect" -> p.pageIsRedirect
      )
      JSONObject.writeJSONString(js.asJava, out)
      out.write("\n")
    }
    out.close()
  }
+
+ def writeRedirects(path: String, pages: Seq[PageInfo], redirects:
Seq[Redirect]): Unit = {
+ val titles = pages.map(p => ((p.wikiDb, p.pageId), p.pageTitle)).toMap
+
+ val out = new PrintWriter(new FileWriter(path))
+ redirects.foreach { r =>
+ val js = Map(
+ "wiki_db" -> r.wikiDb,
+ "rd_from" -> titles((r.wikiDb, r.fromPageId)),
+ "snapshot" -> SNAPSHOT,
+ "rd_title" -> titles((r.wikiDb, r.toPageId)),
+ "rd_namespace" -> 0
+ )
+ JSONObject.writeJSONString(js.asJava, out)
+ out.write("\n")
+ }
+ out.close()
+ }
+
+
  /**
   * Writes one JSON webrequest row per page view, shuffled so rows are not
   * grouped by session, mimicking the wmf.webrequest schema including the
   * year/month/day/hour partition columns.
   *
   * NOTE(review): the UserInfo fields are assigned to user_agent /
   * x_forwarded_for / client_ip in a different order than randUser hashed
   * them; each user still gets a self-consistent hash, so session grouping
   * is unaffected — but the job-computed hash differs from UserInfo.hash.
   */
  def writeRequests(path: String, sessions: Seq[(UserInfo, Seq[(Long, PageInfo)])]): Unit = {
    val allViews = sessions.flatMap { s =>
      val userInfo = s._1
      s._2.map( tsAndPage => (userInfo, tsAndPage._1, tsAndPage._2))
    }

    val out = new PrintWriter(new FileWriter(path))
    Random.shuffle(allViews).foreach { v =>
      val user = v._1
      val dt = new DateTime(v._2)
      val p = v._3
      val pvi = Map(
        "project" -> p.wikiDb,
        "page_title" -> p.pageTitle
      ).asJava

      val js = Map(
        "pageview_info" -> pvi,
        "ts" -> new Timestamp(dt.getMillis).toString,
        "user_agent" -> user.x,
        "x_forwarded_for" -> user.y,
        "client_ip" -> user.z,
        "is_pageview" -> true,
        "agent_type" -> "user",
        "webrequest_source" -> "text",
        "year" -> dt.getYear,
        "month" -> dt.getMonthOfYear,
        "day" -> dt.getDayOfMonth,
        "hour" -> dt.getHourOfDay
      )
      JSONObject.writeJSONString(js.asJava, out)
      out.write("\n")
    }
    out.close()
  }
+
  /**
   * Generates the full fixture: wikis, pages, redirects, shuffled request
   * logs, and the expected session file (restricted to sessions starting
   * at or after START_TIME, matching the job's filter).
   */
  def main(args: Array[String]): Unit = {
    val pages = generatePages(WIKIS, NUM_PAGES)
    val redirects = generateRedirects(pages, NUM_REDIRECTS)

    // Allow some sessions to start earlier than the requested range.
    val minStart = START_TIME.getMillis - (END_TIME.getMillis - START_TIME.getMillis) / 4
    val sessions = generateSessions(minStart, END_TIME.getMillis,
      pages, NUM_SESSIONS, MEAN_SESSION_LENGTH)

    Path(PATH_DATA).createDirectory(force = true, failIfExists = false)

    writeWikis(PATH_DATA + "/wikis.json", pages)
    writePages(PATH_DATA + "/pages.json", pages)
    writeRedirects(PATH_DATA + "/redirects.json", pages, redirects)
    writeRequests(PATH_DATA + "/views.json", sessions)
    writeSessions(PATH_DATA + "/sessions.txt", pages, redirects,
      sessions.filter(_._2.head._1 >= START_TIME.getMillis))
  }
+}
diff --git
a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestParams.scala
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestParams.scala
new file mode 100644
index 0000000..85e5222
--- /dev/null
+++
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestParams.scala
@@ -0,0 +1,25 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import org.joda.time.DateTime
+
+import scala.collection.immutable
+
+/**
+ * @author Shilad Sen
+ */
object TestParams {
  /** Snapshot partition name used for the page/redirect tables under test. */
  val SNAPSHOT: String = "2016-10"

  // Requested time range for the session run: five hours spanning midnight,
  // which exercises the cross-day partition logic.
  val START_TIME = new DateTime(2017, 1, 1, 21, 0, 0)
  val END_TIME = new DateTime(2017, 1, 2, 2, 0, 0)

  val WIKIS: Seq[String] = (1 to 3).map{ "wiki_" + _ }
  val NUM_PAGES = 1000     // per wiki
  val NUM_REDIRECTS = 300  // per wiki
  val NUM_SESSIONS = 1000

  val MEAN_SESSION_LENGTH = 20
  val SESSION_TIMEOUT_MILLIS: Int = 15 * 60 * 1000 // 15 min

  // No trailing slash: all callers join paths as PATH_DATA + "/file.json",
  // so a trailing slash would yield double-slash paths like
  // "./test-data//wikis.json".
  val PATH_DATA = "./test-data"
}
diff --git
a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestPartitionQueryBuilder.scala
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestPartitionQueryBuilder.scala
new file mode 100644
index 0000000..84c8bea
--- /dev/null
+++
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestPartitionQueryBuilder.scala
@@ -0,0 +1,77 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.sql.Timestamp
+
+import org.scalatest.{FlatSpec, Matchers}
+import PartitionQueryBuilder.formSql
+
class TestPartitionQueryBuilder extends FlatSpec with Matchers {

  /** Joins partition predicates with " or ", parenthesizing each clause. */
  def sqlOr(conds : Seq[String]): String = { conds.map("(" + _ + ")").mkString(" or ")}

  /** Normalizes SQL for comparison: case-insensitive and whitespace-free. */
  def normalize(s : String) : String = { s.toLowerCase.replaceAllLiterally(" ", "")}

  "A PartitionQueryBuilder " should " build partition queries " in {

    // Each case is (begin timestamp, end timestamp, expected predicate).
    val tests: Seq[(String, String, String)] = Seq(
      ( // Simple example
        "2017-01-04 03:10:00",
        "2017-01-05 04:00:00",
        sqlOr(List("year = 2017 and month = 1 and day = 4 and hour >= 3",
          "year = 2017 and month = 1 and day = 5 and hour <= 4"))
      ),
      ( // Edge case with empty interval
        "2017-01-04 03:00:00",
        "2017-01-04 03:00:00",
        sqlOr(List("year = 2017 and month = 1 and day = 4 and hour = 3"))
      ),
      ( // Start and end in same day
        "2017-01-04 03:10:00",
        "2017-01-04 07:00:00",
        sqlOr(List(
          "year = 2017 and month = 1 and day = 4 and hour >= 3 and hour <= 7"))
      ),
      ( // Have a full day in the middle
        "2017-01-04 03:10:00",
        "2017-01-06 04:00:00",
        sqlOr(List(
          "year = 2017 and month = 1 and day = 4 and hour >= 3",
          "year = 2017 and month = 1 and day = 5",
          "year = 2017 and month = 1 and day = 6 and hour <= 4"))
      ),
      ( // Have a full month in the middle
        "2017-01-30 03:10:00",
        "2017-03-02 04:00:00",
        sqlOr(List(
          "year = 2017 and month = 1 and day = 30 and hour >= 3",
          "year = 2017 and month = 1 and day = 31",
          "year = 2017 and month = 2",
          "year = 2017 and month = 3 and day = 1",
          "year = 2017 and month = 3 and day = 2 and hour <= 4"))
      ),
      ( // Have a full year in the middle
        "2014-11-29 03:10:00",
        "2017-02-02 04:00:00",
        sqlOr(List(
          "year = 2014 and month = 11 and day = 29 and hour >= 3",
          "year = 2014 and month = 11 and day = 30",
          "year = 2014 and month = 12",
          "year = 2015",
          "year = 2016",
          "year = 2017 and month = 1",
          "year = 2017 and month = 2 and day = 1",
          "year = 2017 and month = 2 and day = 2 and hour <= 4"))
      )
    )

    tests.foreach { case (begStr, endStr, expected) =>
      val res = formSql(Timestamp.valueOf(begStr), Timestamp.valueOf(endStr))
      // ScalaTest convention is `actual shouldEqual expected` so failure
      // messages label the two sides correctly.  (A leftover debug println
      // of the result was removed here.)
      normalize(res) shouldEqual normalize(expected)
    }
  }

}
diff --git
a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
new file mode 100644
index 0000000..749aab8
--- /dev/null
+++
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
@@ -0,0 +1,45 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+
+/**
+ * @author Shilad Sen
+ */
object TestSessionPagesBuilder {
  import TestParams._

  /**
   * Runs SessionPagesBuilder on a local Spark context against the JSON
   * fixtures produced by TestDataCreator, registering each fixture file as
   * the temp table name the job expects.
   */
  def main(args: Array[String]): Unit = {
    val params = SessionPagesBuilder.Params(
      outputBasePath = PATH_DATA + "/spark_sessions",
      projectNamespaceTable = "wikis",
      pageTable = "pages",
      redirectTable = "redirects",
      wikiList = WIKIS.toList,
      webrequestTable = "requests",
      snapshot = SNAPSHOT,
      begin = new Timestamp(START_TIME.getMillis),
      end = new Timestamp(END_TIME.getMillis)
    )

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("test-session-pages-builder")
    val sc = new SparkContext(conf)
    try {
      val sql = new SQLContext(sc)
      sql.read.json(PATH_DATA + "/views.json")
        .registerTempTable(params.webrequestTable)
      sql.read.json(PATH_DATA + "/redirects.json")
        .registerTempTable(params.redirectTable)
      sql.read.json(PATH_DATA + "/pages.json")
        .registerTempTable(params.pageTable)
      sql.read.json(PATH_DATA + "/wikis.json")
        .registerTempTable(params.projectNamespaceTable)

      SessionPagesBuilder(sql, params)
    } finally {
      // Always release the local Spark context, even if the job fails.
      sc.stop()
    }
  }
}
\ No newline at end of file
--
To view, visit https://gerrit.wikimedia.org/r/381169
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I55395459d80d73f3d065967ce95d6506698d128e
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: nav-vectors
Gerrit-Owner: Shilad Sen <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits