Shilad Sen has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/381169 )

Change subject: WIP: Spark job to create page ids viewed in each session
......................................................................

WIP: Spark job to create page ids viewed in each session

This job will create a summary of viewer sessions, with one line per
user, and each line consisting of a wiki project code, timestamp, and
all the page ids viewed in the session.

The job now runs on the cluster in a reasonable amount of time (22 min
for a day's worth of views).

I have written a testing harness, but not quite automated it yet. It is
failing and I have not started debugging yet, but this shouldn't be too
tricky.

TODO:
* Polish and debug tests
* Oozify job (may require switching to Spark 2)

Bug: T174796
Change-Id: I55395459d80d73f3d065967ce95d6506698d128e
---
A 
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala
A 
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
A 
refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
A 
refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestParams.scala
A 
refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestPartitionQueryBuilder.scala
A 
refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
6 files changed, 1,064 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source 
refs/changes/69/381169/1

diff --git 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala
 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala
new file mode 100644
index 0000000..cd43cea
--- /dev/null
+++ 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala
@@ -0,0 +1,96 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.sql.Timestamp
+
+import org.joda.time.{DateTime, DateTimeConstants, Hours, Interval}
+
+/**
+  * @author Shilad Sen
+  */
+object PartitionQueryBuilder {
+
+  /**
+    * Creations a SQL predicate that includes the time period from beginTs to 
endTs
+    *
+    * The SQL predicate is somewhat compressed, but there is still room for 
improvement.
+    *
+    * @return "(year = 2016) or (year = 2017 and month = 1 and day = 1 and 
hour < 3)"
+    */
+  def formSql(beginTs: Timestamp, endTs: Timestamp) : String = {
+    val begin = new DateTime(beginTs.getTime).hourOfDay().roundFloorCopy()
+    val end = new DateTime(endTs.getTime).hourOfDay().roundCeilingCopy()
+    if (begin == end) {
+        s"(year = ${begin.getYear} " +
+        s"and month = ${begin.getMonthOfYear} " +
+        s"and day = ${begin.getDayOfMonth} " +
+        s"and hour = ${begin.getHourOfDay})"
+    } else {
+      formSqlConditions(begin, end).map("(" + _ + ")").mkString(" OR ")
+    }
+  }
+
+  def formSqlConditions(begin: DateTime, end: DateTime) : Seq[String] = {
+    if (begin == end) {
+      return List()
+    }
+
+    // Try to take a year out of the middle
+    var startYear = begin.year().roundCeilingCopy()
+    var endYear = startYear.plusYears(1)
+    if (!startYear.isBefore(begin) && !endYear.isAfter(end)) {
+      return  formSqlConditions(begin, startYear) ++
+              List(s"year = ${startYear.getYear}") ++
+              formSqlConditions(endYear, end)
+    }
+
+    // Try to take a month out of the middle
+    var startMonth = begin.monthOfYear().roundCeilingCopy()
+    var endMonth = startMonth.plusMonths(1)
+    if (!startMonth.isBefore(begin) && !endMonth.isAfter(end)) {
+      return  formSqlConditions(begin, startMonth) ++
+        List(s"year = ${startMonth.getYear} " +
+             s"and month = ${startMonth.getMonthOfYear}") ++
+        formSqlConditions(endMonth, end)
+    }
+
+    // Try to take a day out of the middle
+    var startDay = begin.dayOfMonth().roundCeilingCopy()
+    var endDay = startDay.plusDays(1)
+    if (!startDay.isBefore(begin) && !endDay.isAfter(end)) {
+      return  formSqlConditions(begin, startDay) ++
+        List(s"year = ${startDay.getYear} " +
+             s"and month = ${startDay.getMonthOfYear} " +
+             s"and day = ${startDay.getDayOfMonth}") ++
+        formSqlConditions(endDay, end)
+    }
+
+    // Do we have a collection of hours that run up to the end of the starting 
day?
+    var startOfNextDay = begin.withTimeAtStartOfDay().plusDays(1)
+    if (!startOfNextDay.isAfter(end)) {
+     return  List(s"year = ${begin.getYear} " +
+                  s"and month = ${begin.getMonthOfYear} " +
+                  s"and day = ${begin.getDayOfMonth} " +
+                  s"and hour >= ${begin.getHourOfDay}") ++
+            formSqlConditions(startOfNextDay, end)
+    }
+
+    // Do we have a collection of hours that start at the beginning of the 
last day?
+    var startOfLastDay = end.withTimeAtStartOfDay()
+    if (!startOfLastDay.isBefore(begin)) {
+      return formSqlConditions(begin, startOfLastDay) ++
+             List(s"year = ${end.getYear} " +
+                  s"and month = ${end.getMonthOfYear} " +
+                  s"and day = ${end.getDayOfMonth} " +
+                  s"and hour <= ${end.getHourOfDay}")
+    }
+
+    // We must have an hour range within the same day.
+    assert(begin.withTimeAtStartOfDay() == end.withTimeAtStartOfDay())
+    List(s"year = ${begin.getYear} " +
+         s" and month = ${begin.getMonthOfYear}" +
+         s" and day = ${begin.getDayOfMonth}" +
+         s" and hour >= ${begin.getHourOfDay}" +
+         s" and hour <= ${end.getHourOfDay}")
+
+  }
+}
diff --git 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
new file mode 100644
index 0000000..1d03705
--- /dev/null
+++ 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
@@ -0,0 +1,605 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.sql.Timestamp
+
+import org.joda.time.Hours
+import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
+
+import scala.util.Try
+
+
+/**
+  * Process to create a page view session log.
+  *
+  * This create output files with one line per session.
+  * The first two tokens of each line are the session start timestamp and 
language edition.
+  * The following tokens are the page ids viewed in the session, in order.
+  **
+ *
+  * @author Shilad Sen
+  */
+object SessionPagesBuilder {
+
+  import org.apache.spark.sql.hive.HiveContext
+  import org.apache.spark.SparkConf
+  import org.apache.spark.sql.SaveMode
+  import org.apache.spark.SparkContext
+  import scopt.OptionParser
+  import org.apache.spark.rdd.RDD
+  import org.apache.spark.sql.SQLContext
+  import org.joda.time.DateTime
+  import org.apache.log4j.{Level, LogManager}
+  import org.apache.spark.Partitioner
+  import java.sql.Timestamp
+
+  import scala.collection.mutable.ArrayBuffer
+  import scala.util.hashing.MurmurHash3
+
+  /**
+    * Number of seconds of later data to gather beyond the
+    * time interval that is requested in the params to try to
+    * look for the end of sessions that spill over the interval.
+    * These will be included
+    */
+  val LOOK_FUTURE_SECS: Int = 60 * 60 * 3
+
+  /**
+    * How far in the past to look for sessions with start times BEFORE
+    * the requested interval that spill into the requested interval.
+    * These will be excluded from the output.
+    */
+  val LOOK_PAST_SECS: Int = 60 * 59
+
+
+  case class SessionKey (wikiDb: String, userHash: Long, tstamp: Long)
+    extends Ordered[SessionKey] {
+    override def compare(that: SessionKey): Int = {
+      import scala.math.Ordered.orderingToOrdered
+
+      Ordering.Tuple3(Ordering.String, Ordering.Long, Ordering.Long).compare(
+        (wikiDb, userHash, tstamp),
+        (that.wikiDb, that.userHash, that.tstamp)
+      )
+    }
+  }
+
+
+  class SessionPartitioner(partitions: Int) extends Partitioner {
+    require(partitions >= 0, s"Number of partitions ($partitions) cannot be 
negative.")
+
+    override def numPartitions: Int = partitions
+
+    override def getPartition(key: Any): Int = {
+      val k = key.asInstanceOf[SessionKey]
+      Math.abs(k.wikiDb.hashCode() + k.userHash.asInstanceOf[Int]) % 
numPartitions
+    }
+  }
+
+  object SessionKey {
+    implicit def orderingByTimestamp[A <: SessionKey] : Ordering[A] = {
+      Ordering.by(fk => (fk.wikiDb, fk.userHash, fk.tstamp))
+    }
+  }
+
+  def getTimestamp(ts: Any) : Timestamp = {
+    ts match {
+      case _: Timestamp =>
+        ts.asInstanceOf[Timestamp]
+      case _: String =>
+        Timestamp.valueOf(ts.asInstanceOf[String])
+      case _ => throw new IllegalArgumentException(ts.toString)
+    }
+  }
+
+  /**
+    * A single page.
+    * Effective page id is the page id after resolving redirects.
+    */
+  case class PageInfo(
+                       wikiDb: String,
+                       pageId: Long,
+                       pageTitle: String,
+                       pageNamespace: Long,
+                       pageIsRedirect: Boolean,
+                       effectivePageId : Long
+                     )
+
+  /**
+    * A view of a page by a user.
+    * tstamp is milliseconds since the epoch.
+    * userHash is a 64-bit hash of user ip and other unique information.
+    */
+  case class PageView(
+                       wikiDb: String,
+                       userHash: Long,
+                       tstamp: Long,
+                       pageNamespace: Long,
+                       pageId: Long
+                     )
+
+  case class Redirect(
+                       wikiDb: String,
+                       fromPageId: Long,
+                       toPageId: Long
+                     )
+
+
+  /**
+    * Returns true iff the two view are part of the same session.
+    * User hashes must match and the pageviews must fall within a specified 
window.
+    */
+  def sameSession(v1: PageView, v2: PageView, timeoutMillis: Long) : Boolean = 
{
+    (v1.wikiDb == v2.wikiDb) &&
+      (v1.userHash == v2.userHash) &&
+      (Math.abs(v1.tstamp - v2.tstamp) <= timeoutMillis)
+  }
+
+
+  /**
+    * Given components associated with a distinct user , return a long hash
+    * @param client_ip
+    * @param user_agent
+    * @param x_forwarded_for
+    * @return
+    */
+  def userHash(
+               client_ip: String,
+               user_agent: String,
+               x_forwarded_for: String
+             ) : Long = {
+    var s = client_ip + ":" + user_agent + ":" +  x_forwarded_for
+    (MurmurHash3.stringHash(s).asInstanceOf[Long] << 32) |
+      (MurmurHash3.stringHash(s.reverse).asInstanceOf[Long] & 0xFFFFFFFL)
+  }
+
+  def listToSQLInCondition(list: Seq[String]): String = list.map(w => 
s"'$w'").mkString(", ")
+
+  /**
+    * Prepare a map linking project hostnames to project dbnames.
+    *
+    * @return A map[hostname -> dbname]
+    */
+  def prepareProjectToWikiMap(
+                               sqlContext: SQLContext,
+                               projectNamespaceTable: String,
+                               snapshot: String,
+                               wikiList: Seq[String]
+                             ): Map[String, String] = {
+    sqlContext.sql(
+      s"""
+         |SELECT DISTINCT
+         |  hostname,
+         |  dbname
+         |FROM $projectNamespaceTable
+         |WHERE snapshot = '$snapshot'
+         |  AND dbname IN (${listToSQLInCondition(wikiList)})
+        """.stripMargin).
+      rdd.
+      collect.
+      map(r => r.getString(0) -> r.getString(1)).toMap
+  }
+
+  def hoursInRange(begin: org.joda.time.DateTime, end: org.joda.time.DateTime) 
: Seq[(Int, Int, Int, Int)]= {
+    val beginHour = begin.hourOfDay().roundFloorCopy()
+    val endHour = end.hourOfDay().roundCeilingCopy()
+    val numHours = Hours.hoursBetween(beginHour, endHour).getHours
+    val dates = (0 to numHours).map(beginHour.plusHours)
+    dates.map(d => (d.getYear, d.getMonthOfYear, d.getDayOfMonth, 
d.getHourOfDay))
+  }
+
+  /**
+    * Prepare the pages dataset to be reused
+    *
+    * @return A RDD of PageInfo data augmented with fake pages for each wiki
+    */
+  def preparePages(
+                    sqlContext: SQLContext,
+                    pageTable: String,
+                    snapshot: String,
+                    wikiList: Seq[String]
+                  ): RDD[PageInfo] = {
+    sqlContext.sql(
+      s"""
+                      |SELECT
+                      |  wiki_db,
+                      |  page_id,
+                      |  page_title,
+                      |  page_namespace,
+                      |  page_is_redirect
+                      |FROM $pageTable
+                      |WHERE snapshot = '$snapshot'
+                      |  AND wiki_db IN (${listToSQLInCondition(wikiList)})
+                      |  AND page_id IS NOT NULL
+                      |  AND page_namespace = 0
+                      |  AND page_id > 0
+                      |GROUP BY
+                      |  wiki_db,
+                      |  page_id,
+                      |  page_title,
+                      |  page_namespace,
+                      |  page_is_redirect
+        """.stripMargin).rdd.
+      map(r => {
+        PageInfo(r.getString(0), r.getLong(1), r.getString(2),
+                 r.getLong(3), r.getBoolean(4), r.getLong(1))
+      })
+  }
+
+  /**
+    * Prepare the redirects dataset to be reused
+    *
+    * @return A RDD of redirect data
+    */
+  def prepareRedirects(
+                        sqlContext: SQLContext,
+                        redirectTable: String,
+                        snapshot: String,
+                        wikiList: Seq[String],
+                        pages: RDD[PageInfo]
+                      ): RDD[Redirect] = {
+
+    val pagesPerPageId = pages.map(p => ((p.wikiDb, p.pageId), 
p.pageTitle)).cache()
+    val pagesPerTitleAndNamespace = pages.map(p => ((p.wikiDb, p.pageTitle, 
p.pageNamespace), p.pageId)).cache()
+
+    sqlContext.sql(
+      s"""
+         |SELECT
+         |  wiki_db,
+         |  rd_from,
+         |  rd_title,
+         |  rd_namespace
+         |FROM $redirectTable
+         |WHERE snapshot = '$snapshot'
+         |  AND wiki_db IN (${listToSQLInCondition(wikiList)})
+         |  AND rd_from IS NOT NULL
+         |  AND rd_from > 0
+         |GROUP BY
+         |  wiki_db,
+         |  rd_from,
+         |  rd_title,
+         |  rd_namespace
+          """.stripMargin)
+        .rdd
+        .map(r => {
+          ((r.getString(0), r.getLong(1)), (r.getString(2), r.getLong(3)))
+        })
+        .filter(t => Option(t._2._1).isDefined)   // Remove null toPageTitle
+        .join(pagesPerPageId)                     // Ensure fromPageId exists 
in page table
+        .map { case ((wiki, fromPageId), ((toPageTitle, toPageNamespace), _)) 
=>
+                  ((wiki, toPageTitle, toPageNamespace), fromPageId) }
+        .join(pagesPerTitleAndNamespace)          // Get toPageId from page 
table
+        .map { case ((wiki, _ , _), (fromPageId, toPageId)) =>
+                  Redirect(wiki, fromPageId, toPageId) }
+        .distinct // prevent any duplicate (data corruption)
+  }
+
+  /**
+    * Replace page ids for pages that are redirected.
+    *
+    * @return PageInfo with redirects applied to relevant page ids
+    */
+  def resolveRedirects(
+                      sqlContext: SQLContext,
+                      pages: RDD[PageInfo],
+                      redirects: RDD[Redirect]
+                    ) : RDD[PageInfo] = {
+
+    var redirectsBySrc = redirects.map(r => ((r.wikiDb, r.fromPageId), 
r.toPageId)).cache()
+
+    // This function maps redirects once
+    val processRedirects = (pages: RDD[PageInfo]) =>  {
+                              pages.map( p => ((p.wikiDb, p.effectivePageId), 
p) )
+                                .leftOuterJoin(redirectsBySrc)
+                                .map { case (_, (p, redirectId)) =>
+                                  PageInfo(p.wikiDb,
+                                            p.pageId,
+                                            p.pageTitle, p.pageNamespace,
+                                            p.pageIsRedirect,
+                                            
redirectId.getOrElse(p.effectivePageId))
+                                }
+                            } : RDD[PageInfo]
+
+    // We only process two-levels of redirects. Is this okay?
+    processRedirects(
+      processRedirects(pages))
+  }
+
+
+  /**
+    * Prepare the view data
+    * @return A RDD of page views
+    */
+  def prepareViews(
+                          sqlContext: SQLContext,
+                          webrequestTable: String,
+                          begin: Timestamp,
+                          end: Timestamp,
+                          projectList: Seq[String],
+                          projectToWikiMap: Map[String, String],
+                          pages: RDD[PageInfo]
+                        ): RDD[PageView] = {
+    val pagesPerTitles = pages
+      .map(p => ((p.wikiDb, p.pageTitle), (p.pageNamespace, p.pageId))) // 
((wiki, pageTitle), (pageNamespace, pageId))
+      .cache()
+
+    sqlContext.sql(
+      s"""
+         |SELECT
+         |  CONCAT(pageview_info['project'], '.org') AS project,
+         |  pageview_info['page_title'] as title,
+         |  ts,
+         |  client_ip,
+         |  user_agent,
+         |  x_forwarded_for
+         |FROM $webrequestTable
+         |WHERE webrequest_source = 'text'
+         |  AND(${PartitionQueryBuilder.formSql(begin, end)})
+         |  AND pageview_info['project'] IN 
(${listToSQLInCondition(projectList)})
+         |  AND is_pageview
+         |  AND agent_type = 'user'
+          """.stripMargin)
+      .rdd
+      .map(r => {
+                  val wiki = projectToWikiMap(r.getString(0))
+                  val uh = userHash(r.getString(3), r.getString(4), 
r.getString(5))
+                  ((wiki, r.getString(1)), (getTimestamp(r.get(2)), uh))
+                })
+      .join(pagesPerTitles)
+      .map { case ((wiki, title), ((ts, userHash), (pageNs, pageId))) =>
+             PageView(wiki, userHash, ts.getTime, pageNs, pageId)
+           }
+  }
+
+  /**
+    * Aggregates page views by user / wiki and groups them into sessions.
+    *
+    * 1. Create session key composed of wiki, userhash, tstamp that delineates 
sessions.
+    * 2. Partition by wiki, userhash with secondary sort on tstamp.
+    * 3. Reduce page views into sessions on each partitition. The iterator
+    *    is necessary for the final step because there could be millions / 
billions
+    *    of page views on a partition so we cannot load the result set in 
memory.
+    *
+    * @return
+    */
+  def createSessions(views: RDD[PageView], timeoutInMillis: Int): 
RDD[Seq[PageView]] = {
+    views
+      .map { v => (SessionKey(v.wikiDb, v.userHash, v.tstamp), v) }
+      .repartitionAndSortWithinPartitions(new 
SessionPartitioner(views.getNumPartitions))
+      .mapPartitions {
+        iter =>
+          new Iterator[Seq[PageView]] {
+            var eos: Boolean = !iter.hasNext
+            var session = new ArrayBuffer[PageView]()
+            var nextSessionHead : Option[PageView] = None
+
+            override def hasNext: Boolean = {
+              tryToFillSession()
+              session.nonEmpty
+            }
+
+            override def next: Seq[PageView] = {
+              tryToFillSession()
+              assert(session.nonEmpty)
+
+              // Create a new empty session and place the next session head 
into it
+              var s = session
+              session = new ArrayBuffer[PageView]()
+              if (nextSessionHead.isDefined) {
+                session += nextSessionHead.get
+                nextSessionHead = None
+              }
+              s
+            }
+
+            /**
+              * Recursively adds the next page view onto the current session 
until
+              * we are certain it belongs to the next session. The sign this 
has occurred
+              * is nextSessionHead is non-empty
+              */
+            def tryToFillSession(): Unit = {
+              if (!eos && nextSessionHead.isEmpty && iter.hasNext) {
+                var p = iter.next._2
+                if (session.isEmpty || sameSession(p, session.last, 
timeoutInMillis)) {
+                  session += p
+                  tryToFillSession()
+                } else {
+                  nextSessionHead = Some(p)
+                }
+              }
+            }
+          }
+      }
+  }
+
+
+  /**
+    * Write sessions to output files.
+    * TODO: Overwrite existing files.
+    */
+  def writeSessions(sqlContext: SQLContext, sessions: RDD[Seq[PageView]], 
outputBasePath: String, outputFilesParts: Int): Unit = {
+    import sqlContext.implicits._
+
+    sessions
+      .filter(views => views.nonEmpty)
+      .map(views => (views.head.wikiDb,
+                     new Timestamp(views.head.tstamp),
+                     views.map(v => v.pageId.toString).mkString(" ")))
+      .repartition(outputFilesParts)
+      .toDF
+      .withColumnRenamed("_1", "wiki_db")
+      .withColumnRenamed("_2", "tstamp")
+      .withColumnRenamed("_3", "pages")
+      .write
+      .format("com.databricks.spark.csv")
+      .option("delimiter", "\t")
+      .option("header", "true")
+      .mode(SaveMode.Overwrite)
+      .partitionBy("wiki_db")
+      .save(outputBasePath)
+  }
+
+  /**
+    * Config class for CLI argument parser using scopt
+    */
+  case class Params(
+                     outputBasePath: String = "/user/shiladsen/sessions",
+                     projectNamespaceTable: String = 
"wmf_raw.mediawiki_project_namespace_map",
+                     pageTable: String = "wmf_raw.mediawiki_page",
+                     redirectTable: String = "wmf_raw.mediawiki_redirect",
+                     webrequestTable: String = "wmf.webrequest",
+                     wikiList: Seq[String] = Seq("simplewiki"),
+                     outputFilesParts: Int = 1,
+                     snapshot: String = "", // Parameter required, never used 
as is
+                     begin: Timestamp = new Timestamp(0), // Parameter 
required, never used as is
+                     end: Timestamp = new Timestamp(0), // Parameter required, 
never used as is
+                     sessionTimeoutMillis: Int = 15*60*1000
+                   )
+
+  val YMD: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd")
+  val YMD_HMS: DateTimeFormatter = 
DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss")
+  val TS_FORMATS: String = "yyyy-MM-dd or yyyy-MM-ddTHH:mm:ss"
+
+  def parseTimestamp(s: String) : Option[Timestamp] = {
+    var ts = Try[java.sql.Timestamp](new Timestamp(YMD.parseMillis(s)))
+    if (ts.isFailure) {
+      ts = Try[java.sql.Timestamp](new Timestamp(YMD_HMS.parseMillis(s)))
+    }
+    if (ts.isFailure) None else Some(ts.get)
+  }
+
+  /**
+    * Define the command line options parser
+    */
+  val argsParser = new OptionParser[Params]("Clickstream dataset builder") {
+    head("Clickstream dataset builder", "")
+    note(
+      """
+        |This job computes a clickstream dataset from one or more wiki(s).
+        |It creates a date folder, and per-wiki folders to store the results.
+      """.stripMargin)
+    help("help") text ("Prints this usage text")
+
+    opt[String]('s', "snapshot") required() valueName ("<snapshot>") action { 
(x, p) =>
+      p.copy(snapshot = x)
+    } text ("The mediawiki hadoop snapshot to use for page, links and 
redirects (usually YYYY-MM)")
+
+    opt[String]('b', "begin") required() valueName ("<begin>") action { (x, p) 
=>
+      p.copy(begin = parseTimestamp(x).get)
+    } validate { x =>
+      if (parseTimestamp(x).isDefined) success
+      else failure(s"Invalid timestamp format. Must be one of {$TS_FORMATS}")
+    } text ("The beginning time for webrequest data gathering.")
+
+    opt[String]('e', "end") required() valueName ("<end>") action { (x, p) =>
+      p.copy(end = parseTimestamp(x).get)
+    } validate { x =>
+      if (parseTimestamp(x).isDefined) success
+      else failure(s"Invalid timestamp format. Must be one of {$TS_FORMATS}")
+    } text ("The ending time for webrequest data gathering.")
+
+    opt[Int]('t', "timeout") optional() valueName ("<timeout>") action { (x, 
p) =>
+      p.copy(sessionTimeoutMillis = x)
+    } validate { x => if (x >= 0) success else failure("Invalid timeout")
+    } text ("The timeout in milliseconds that distinguishes user sessions.")
+
+
+    opt[String]('o', "output-base-path") optional() valueName ("<path>") 
action { (x, p) =>
+      p.copy(outputBasePath = if (x.endsWith("/")) x.dropRight(1) else x)
+    } text ("Where on HDFS to store the computed dataset (date folder created 
for you). Defaults to hdfs://analytics-hadoop/wmf/data/archive/clickstream")
+
+    opt[String]('w', "wikis") optional() valueName 
"<wiki_db_1>,<wiki_db_2>..." action { (x, p) =>
+      p.copy(wikiList = x.split(",").map(_.toLowerCase))
+    } validate { x =>
+      val dbs = x.split(",").map(_.toLowerCase)
+      if (dbs.filter(db => db.isEmpty || (! db.contains("wik"))).length > 0)
+        failure("Invalid wikis list")
+      else
+        success
+    } text "wiki dbs to compute. Defaults to enwiki"
+
+    opt[Int]('p', "output-files-parts") optional() valueName ("<partitions>") 
action { (x, p) =>
+      p.copy(outputFilesParts = x)
+    } text ("Number of file parts to output in hdfs. Defaults to 1")
+
+    opt[String]("project-namespace-table") optional() valueName ("<table>") 
action { (x, p) =>
+      p.copy(projectNamespaceTable = x)
+    } text ("Fully qualified name of the project-namespace table on Hive. 
Default to wmf_raw.mediawiki_project_namespace_map")
+
+    opt[String]("page-table") optional() valueName ("<table>") action { (x, p) 
=>
+      p.copy(pageTable = x)
+    } text ("Fully qualified name of the page table on Hive. Default to 
wmf_raw.mediawiki_page")
+
+    opt[String]("redirect-table") optional() valueName ("<table>") action { 
(x, p) =>
+      p.copy(redirectTable = x)
+    } text ("Fully qualified name of the redirect table on Hive. Default to 
wmf_raw.mediawiki_redirect")
+
+    opt[String]("webrequest-table") optional() valueName ("<table>") action { 
(x, p) =>
+      p.copy(webrequestTable = x)
+    } text ("Fully qualified name of the webrequest table on Hive. Default to 
wmf.webrequest")
+
+  }
+
+
+  def main(args: Array[String]): Unit = {
+
+    val params = args.headOption match {
+      // Case when our job options are given as a single string.  Split them
+      // and pass them to argsParser.
+      case Some("--options") =>
+        argsParser.parse(args(1).split("\\s+"), 
Params()).getOrElse(sys.exit(1))
+      // Else the normal usage, each CLI opts can be parsed as a job option.
+      case _ =>
+        argsParser.parse(args, Params()).getOrElse(sys.exit(1))
+    }
+
+
+    val conf = new SparkConf()
+      .setAppName(s"SessionPagesBuilder")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.hadoop.mapred.output.compress", "true")
+      .set("spark.hadoop.mapred.output.compression.codec", "true")
+      .set("spark.hadoop.mapred.output.compression.codec", 
"org.apache.hadoop.io.compress.GzipCodec")
+      .set("spark.hadoop.mapred.output.compression.type", "BLOCK")
+
+    val sqlContext = new HiveContext(new SparkContext(conf))
+
+    // Exit non-zero if if any refinements failed.
+    apply(sqlContext, params)
+  }
+
+  def apply(sql: SQLContext, params: Params): Unit = {
+    sql.sparkContext
+      .getConf
+      .registerKryoClasses(Array(
+        classOf[PageInfo],
+        classOf[Redirect],
+        classOf[PageView]))
+
+    import sql.implicits._
+
+    val projectToWikiMap = prepareProjectToWikiMap(sql, 
params.projectNamespaceTable, params.snapshot, params.wikiList)
+    val domainList = projectToWikiMap.keys.toList
+    val projectList = domainList.map(_.stripSuffix(".org"))
+
+    val pages = preparePages(sql, params.pageTable, params.snapshot, 
params.wikiList).cache()
+    val redirects = prepareRedirects(sql, params.redirectTable, 
params.snapshot, params.wikiList, pages)
+    val pages2 = resolveRedirects(sql, pages, redirects)
+
+    val viewBeg = new Timestamp(new DateTime(params.begin.getTime)
+                                      .minusSeconds(LOOK_PAST_SECS)
+                                      .getMillis)
+    val viewEnd = new Timestamp(new DateTime(params.end.getTime)
+                                      .minusSeconds(LOOK_FUTURE_SECS)
+                                      .getMillis)
+    val views = prepareViews( sql, params.webrequestTable,
+                              viewBeg, viewEnd,
+                              projectList,
+                              projectToWikiMap,
+                              pages)
+
+    val beginMillis = params.begin.getTime
+    val sessions = createSessions(views, params.sessionTimeoutMillis)
+                        .filter(_.head.tstamp >= beginMillis)
+    writeSessions(sql, sessions, params.outputBasePath, 
params.outputFilesParts)
+  }
+}
diff --git 
a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
 
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
new file mode 100644
index 0000000..c1641b0
--- /dev/null
+++ 
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
@@ -0,0 +1,216 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.io.{FileWriter, PrintWriter}
+import java.sql.Timestamp
+
+import org.joda.time.DateTime
+import org.json.simple.JSONObject
+import 
org.wikimedia.analytics.refinery.job.vectors.SessionPagesBuilder.{PageInfo, 
PageView, Redirect, userHash}
+
+import scala.collection.{immutable, mutable}
+import scala.reflect.io.Path
+import scala.util.Random
+
+
+/**
+  * Creates fake data that can be processed by SessionPagesBuilder.
+  * Along with expected outcomes.
+  *
+  * @author Shilad Sen
+  */
+object TestDataCreator {
+
+  case class UserInfo(x: String, y: String, z: String, hash: Long)
+
+  import TestParams._
+
+  def randNamespace() : Int = {
+    (Random.nextInt(10) - 7).max(0)  // In [0,1,2] but 0 has probability 0.8
+  }
+
+  def choice[T](seq : Seq[T]) : T = {
+    seq(Random.nextInt(seq.length))
+  }
+
+  val usedUserHashes: mutable.Set[Long] = scala.collection.mutable.Set[Long]()
+
+  def randUser() : UserInfo = {
+    val randStr = () => { (1 to 10).map(_ => Random.nextPrintableChar() + 
"").mkString("") }
+    val u = (randStr(), randStr(), randStr())
+    val hash : Long = userHash(u._1, u._2, u._3)
+    if (usedUserHashes.contains(hash)) {
+      randUser()
+    } else {
+      usedUserHashes += hash
+      UserInfo(u._1, u._2, u._3, hash)
+    }
+  }
+
+  def generatePages(wikis: Seq[String], numPagesPerWiki: Int): Seq[PageInfo] = 
{
+    wikis
+      .flatMap(w => (1 to numPagesPerWiki).map((w, _))) // seq of (wikiId, 
pageId)
+      .map { case (w, p) =>
+        PageInfo(w, p, "Article " + p, randNamespace(), pageIsRedirect = 
false, -1)
+      }
+  }
+
+  def usedRedirects: mutable.Set[(String, String)] = 
scala.collection.mutable.Set[(String, String)]()
+
+  def generateRedirects(pages: Seq[PageInfo], redirectsPerWiki: Int) : 
Seq[Redirect]= {
+    val maxId = pages.map(_.pageId).max
+    val wikis = pages.map(_.wikiDb).distinct.toList
+    val byWiki = pages.groupBy(_.wikiDb)
+
+    byWiki.keys.flatMap { w =>
+      val shuffled = Random.shuffle(byWiki(w).toList).toArray
+      val srcs = shuffled.slice(0, redirectsPerWiki)
+      val dests = shuffled.slice(redirectsPerWiki, redirectsPerWiki * 2)
+      srcs.zip(dests).map { pair => Redirect(w, pair._1.pageId, 
pair._2.pageId) }
+    }.toSeq
+  }
+
+  def generateSessions(start: Long, end: Long, pages: Seq[PageInfo], 
numSessions: Int, meanLength: Int): Seq[(UserInfo, Seq[(Long, PageInfo)])] = {
+    val byWiki = pages.groupBy(_.wikiDb)
+    val wikis = byWiki.keys.toList
+    (1 to numSessions)
+      .map { _ =>
+        val n = 1 + Random.nextInt(meanLength * 2)
+        val w = choice(wikis)
+        val tstamps =  (1 to n)
+                          .foldLeft(List(start + 
Random.nextInt((end-start).asInstanceOf[Int])))(
+                            (seq, _) => (seq.head + 
Random.nextInt(SESSION_TIMEOUT_MILLIS-1))::seq)
+                          .reverse
+        val session = tstamps.map { (_, choice(byWiki(w))) }
+        (randUser(), session.toList)
+      }
+  }
+
+  def writeSessions(path: String,
+                    pages: Seq[PageInfo],
+                    redirects: Seq[Redirect],
+                    sessions: Seq[(UserInfo, Seq[(Long, PageInfo)])]) : Unit = 
{
+    val pageIdMap = pages
+                      .map(p => ((p.wikiDb, p.pageId), p.pageId.toString))
+                      .union(redirects.map(r => ((r.wikiDb, r.fromPageId), 
r.toPageId.toString)))
+                      .toMap
+
+    val out = new PrintWriter(new FileWriter(path))
+    sessions.foreach {
+      s =>
+        val ts = s._2.head._1.toString
+        val wiki = s._2.head._2.wikiDb
+        val pageIds = s._2.map(tsAndPage => pageIdMap((tsAndPage._2.wikiDb, 
tsAndPage._2.pageId))).mkString(" ")
+        out.write(ts + "\t" + wiki + "\t" + pageIds + "\n")
+    }
+    out.close()
+  }
+
+  import scala.collection.JavaConverters._
+
+
+  def writeWikis(path: String, pages: Seq[PageInfo]): Unit = {
+    val out = new PrintWriter(new FileWriter(path))
+    pages.map(_.wikiDb).distinct.foreach {
+      w =>
+        val js = Map(
+          "hostname" -> (w + ".org"),
+          "snapshot" -> SNAPSHOT,
+          "dbname" -> w
+        )
+        JSONObject.writeJSONString(js.asJava, out)
+        out.write("\n")
+    }
+    out.close()
+  }
+
+  def writePages(path: String, pages: Seq[PageInfo]): Unit = {
+    val out = new PrintWriter(new FileWriter(path))
+    pages.foreach { p =>
+      val js = Map(
+        "wiki_db" -> p.wikiDb,
+        "page_id" -> p.pageId,
+        "page_title" -> p.pageTitle,
+        "snapshot" -> SNAPSHOT,
+        "page_namespace" -> p.pageNamespace,
+        "page_is_redirect" -> p.pageIsRedirect
+      )
+      JSONObject.writeJSONString(js.asJava, out)
+      out.write("\n")
+    }
+    out.close()
+  }
+
+  def writeRedirects(path: String, pages: Seq[PageInfo], redirects: 
Seq[Redirect]): Unit = {
+    val titles = pages.map(p => ((p.wikiDb, p.pageId), p.pageTitle)).toMap
+
+    val out = new PrintWriter(new FileWriter(path))
+    redirects.foreach { r =>
+      val js = Map(
+        "wiki_db" -> r.wikiDb,
+        "rd_from" -> titles((r.wikiDb, r.fromPageId)),
+        "snapshot" -> SNAPSHOT,
+        "rd_title" -> titles((r.wikiDb, r.toPageId)),
+        "rd_namespace" -> 0
+      )
+      JSONObject.writeJSONString(js.asJava, out)
+      out.write("\n")
+    }
+    out.close()
+  }
+
+
+  def writeRequests(path: String, sessions: Seq[(UserInfo, Seq[(Long, 
PageInfo)])]): Unit = {
+    val allViews = sessions.flatMap { s =>
+      val userInfo = s._1
+      s._2.map( tsAndPage => (userInfo, tsAndPage._1, tsAndPage._2))
+    }
+
+    val out = new PrintWriter(new FileWriter(path))
+    Random.shuffle(allViews).foreach { v =>
+      val user = v._1
+      val dt = new DateTime(v._2)
+      val p = v._3
+      val pvi = Map(
+        "project" -> p.wikiDb,
+        "page_title" -> p.pageTitle
+      ).asJava
+
+      val js = Map(
+        "pageview_info" -> pvi,
+        "ts" -> new Timestamp(dt.getMillis).toString,
+        "user_agent" -> user.x,
+        "x_forwarded_for" -> user.y,
+        "client_ip" -> user.z,
+        "is_pageview" -> true,
+        "agent_type" -> "user",
+        "webrequest_source" -> "text",
+        "year" -> dt.getYear,
+        "month" -> dt.getMonthOfYear,
+        "day" -> dt.getDayOfMonth,
+        "hour" -> dt.getHourOfDay
+      )
+      JSONObject.writeJSONString(js.asJava, out)
+      out.write("\n")
+    }
+    out.close()
+  }
+
+  def main(args: Array[String]): Unit = {
+    val pages = generatePages(WIKIS, NUM_PAGES)
+    val redirects = generateRedirects(pages, NUM_REDIRECTS)
+
+    // Allow some sessions to start earlier than the requested range.
+    val minStart = START_TIME.getMillis - (END_TIME.getMillis - 
START_TIME.getMillis) / 4
+    val sessions = generateSessions(minStart, END_TIME.getMillis,
+                                    pages, NUM_SESSIONS, MEAN_SESSION_LENGTH)
+
+    Path(PATH_DATA).createDirectory(force = true, failIfExists = false)
+
+    writeWikis(PATH_DATA + "/wikis.json", pages)
+    writePages(PATH_DATA + "/pages.json", pages)
+    writeRedirects(PATH_DATA + "/redirects.json", pages, redirects)
+    writeRequests(PATH_DATA + "/views.json", sessions)
+    writeSessions(PATH_DATA + "/sessions.txt", pages, redirects,
+                  sessions.filter(_._2.head._1 >= START_TIME.getMillis))
+  }
+}
diff --git 
a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestParams.scala
 
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestParams.scala
new file mode 100644
index 0000000..85e5222
--- /dev/null
+++ 
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestParams.scala
@@ -0,0 +1,25 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import org.joda.time.DateTime
+
+import scala.collection.immutable
+
+/**
+  * @author Shilad Sen
+  */
+object TestParams {
+  val SNAPSHOT: String = "2016-10"
+
+  val START_TIME = new DateTime(2017, 1, 1, 21, 0, 0)
+  val END_TIME = new DateTime(2017, 1, 2, 2, 0, 0)
+
+  val WIKIS: Seq[String] = (1 to 3).map{ "wiki_" + _ }
+  val NUM_PAGES = 1000        // per wiki
+  val NUM_REDIRECTS = 300     // per wiki
+  val NUM_SESSIONS = 1000
+
+  val MEAN_SESSION_LENGTH = 20
+  val SESSION_TIMEOUT_MILLIS: Int = 60 * 15 * 1000 // 15 min
+
+  val PATH_DATA = "./test-data/"
+}
diff --git 
a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestPartitionQueryBuilder.scala
 
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestPartitionQueryBuilder.scala
new file mode 100644
index 0000000..84c8bea
--- /dev/null
+++ 
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestPartitionQueryBuilder.scala
@@ -0,0 +1,77 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.sql.Timestamp
+
+import org.scalatest.{FlatSpec, Matchers}
+import PartitionQueryBuilder.formSql
+
+class TestPartitionQueryBuilder extends FlatSpec with Matchers {
+
+  def sqlOr(conds : Seq[String]): String =  { conds.map("(" + _ + 
")").mkString(" or ")}
+  def normalize(s : String) : String = { s.toLowerCase.replaceAllLiterally(" 
", "")}
+
+  "A PartitionQueryBuilder " should " build partition queries " in {
+
+    val tests: Seq[(String, String, String)] = Seq(
+      ( // Simple example
+        "2017-01-04 03:10:00",
+        "2017-01-05 04:00:00",
+        sqlOr(List("year = 2017 and month = 1 and day = 4 and hour >= 3",
+                   "year = 2017 and month = 1 and day = 5 and hour <= 4"))
+      ),
+      ( // Edge case with empty interval
+        "2017-01-04 03:00:00",
+        "2017-01-04 03:00:00",
+        sqlOr(List("year = 2017 and month = 1 and day = 4 and hour = 3"))
+      ),
+      ( // Start and end in same day
+        "2017-01-04 03:10:00",
+        "2017-01-04 07:00:00",
+        sqlOr(List(
+          "year = 2017 and month = 1 and day = 4 and hour >= 3 and hour <= 7"))
+      ),
+      ( // Have a full day in the middle
+        "2017-01-04 03:10:00",
+        "2017-01-06 04:00:00",
+        sqlOr(List(
+          "year = 2017 and month = 1 and day = 4 and hour >= 3",
+          "year = 2017 and month = 1 and day = 5",
+          "year = 2017 and month = 1 and day = 6 and hour <= 4"))
+      ),
+      ( // Have a full month in the middle
+        "2017-01-30 03:10:00",
+        "2017-03-02 04:00:00",
+        sqlOr(List(
+          "year = 2017 and month = 1 and day = 30 and hour >= 3",
+          "year = 2017 and month = 1 and day = 31",
+          "year = 2017 and month = 2",
+          "year = 2017 and month = 3 and day = 1",
+          "year = 2017 and month = 3 and day = 2 and hour <= 4"))
+      ),
+      ( // Have a full year in the middle
+        "2014-11-29 03:10:00",
+        "2017-02-02 04:00:00",
+        sqlOr(List(
+          "year = 2014 and month = 11 and day = 29 and hour >= 3",
+          "year = 2014 and month = 11 and day = 30",
+          "year = 2014 and month = 12",
+          "year = 2015",
+          "year = 2016",
+          "year = 2017 and month = 1",
+          "year = 2017 and month = 2 and day = 1",
+          "year = 2017 and month = 2 and day = 2 and hour <= 4"))
+      )
+    )
+
+    tests.foreach( t => {
+      val beg = Timestamp.valueOf(t._1)
+      val end = Timestamp.valueOf(t._2)
+      val sql = t._3
+      val res = formSql(beg, end)
+      System.out.println(res)
+
+      normalize(sql) shouldEqual normalize(res)
+    })
+  }
+
+}
diff --git 
a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
 
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
new file mode 100644
index 0000000..749aab8
--- /dev/null
+++ 
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
@@ -0,0 +1,45 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+
+/**
+  * @author Shilad Sen
+  */
+object TestSessionPagesBuilder {
+  import TestParams._
+
+  def main(args: Array[String]): Unit = {
+    val params = SessionPagesBuilder.Params(
+      outputBasePath = PATH_DATA + "/spark_sessions",
+      projectNamespaceTable = "wikis",
+      pageTable = "pages",
+      redirectTable = "redirects",
+      wikiList = WIKIS.toList,
+      webrequestTable = "requests",
+      snapshot = SNAPSHOT,
+      begin = new Timestamp(START_TIME.getMillis),
+      end = new Timestamp(END_TIME.getMillis)
+    )
+
+    val conf = new SparkConf()
+      .setMaster("local[*]")
+      .setAppName("test-session-pages-builder")
+    val sc = new SparkContext(conf)
+
+
+    val sql = new SQLContext(sc)
+    sql.read.json(PATH_DATA + "/views.json")
+      .registerTempTable(params.webrequestTable)
+    sql.read.json(PATH_DATA + "/redirects.json")
+      .registerTempTable(params.redirectTable)
+    sql.read.json(PATH_DATA + "/pages.json")
+      .registerTempTable(params.pageTable)
+    sql.read.json(PATH_DATA + "/wikis.json")
+      .registerTempTable(params.projectNamespaceTable)
+
+    SessionPagesBuilder(sql, params)
+  }
+}
\ No newline at end of file

-- 
To view, visit https://gerrit.wikimedia.org/r/381169
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I55395459d80d73f3d065967ce95d6506698d128e
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: nav-vectors
Gerrit-Owner: Shilad Sen <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to