Shilad Sen has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/383761 )

Change subject: Spark job to create page ids viewed in each session
......................................................................

Spark job to create page ids viewed in each session

SessionPagesBuilder creates a table of viewed pages grouped by browser
session. The output is a table containing columns for wiki, date,
timestamp, and a space-separated list of all the page ids viewed in the
session, in order.
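
For example, a single row of the output table might look like this
(illustrative values only):

  simplewiki  2017-01-01  2017-01-01 21:03:12.0  8123 44011 8123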

The job now runs on the cluster in a reasonable amount of time (10 min for
a day's worth of views).

SessionPruner filters the session table, removing views of pages whose
total view count falls below a threshold. As a side effect it creates a
frequency table.

The testing harness creates fake test data and compares Spark-computed
results against independently computed in-memory results.

TODO:
* Oozify job (may require switching to Spark 2)

Bug: T174796
Change-Id: I55395459d80d73f3d065967ce95d6506698d128e

  Complete pass at session creation pipeline

    Added session pruner
    Switched to use tables instead of files
    Cleaned up tests

Change-Id: I19160e16d8140d03d81a4226e3974f42ec1e3602
---
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPruner.scala
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/VectorUtils.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestPartitionQueryBuilder.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPruner.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestUtils.scala
9 files changed, 1,425 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/61/383761/1

diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala
new file mode 100644
index 0000000..cd43cea
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala
@@ -0,0 +1,96 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.sql.Timestamp
+
+import org.joda.time.{DateTime, DateTimeConstants, Hours, Interval}
+
+/**
+  * @author Shilad Sen
+  */
+object PartitionQueryBuilder {
+
+  /**
+    * Creates a SQL predicate that covers the time period from beginTs to endTs.
+    *
+    * The SQL predicate is somewhat compressed, but there is still room for improvement.
+    *
+    * @return e.g. "(year = 2016) or (year = 2017 and month = 1 and day = 1 and hour < 3)"
+    */
+  def formSql(beginTs: Timestamp, endTs: Timestamp) : String = {
+    val begin = new DateTime(beginTs.getTime).hourOfDay().roundFloorCopy()
+    val end = new DateTime(endTs.getTime).hourOfDay().roundCeilingCopy()
+    if (begin == end) {
+        s"(year = ${begin.getYear} " +
+        s"and month = ${begin.getMonthOfYear} " +
+        s"and day = ${begin.getDayOfMonth} " +
+        s"and hour = ${begin.getHourOfDay})"
+    } else {
+      formSqlConditions(begin, end).map("(" + _ + ")").mkString(" OR ")
+    }
+  }
+
+  def formSqlConditions(begin: DateTime, end: DateTime) : Seq[String] = {
+    if (begin == end) {
+      return List()
+    }
+
+    // Try to take a year out of the middle
+    val startYear = begin.year().roundCeilingCopy()
+    val endYear = startYear.plusYears(1)
+    if (!startYear.isBefore(begin) && !endYear.isAfter(end)) {
+      return  formSqlConditions(begin, startYear) ++
+              List(s"year = ${startYear.getYear}") ++
+              formSqlConditions(endYear, end)
+    }
+
+    // Try to take a month out of the middle
+    val startMonth = begin.monthOfYear().roundCeilingCopy()
+    val endMonth = startMonth.plusMonths(1)
+    if (!startMonth.isBefore(begin) && !endMonth.isAfter(end)) {
+      return  formSqlConditions(begin, startMonth) ++
+        List(s"year = ${startMonth.getYear} " +
+             s"and month = ${startMonth.getMonthOfYear}") ++
+        formSqlConditions(endMonth, end)
+    }
+
+    // Try to take a day out of the middle
+    val startDay = begin.dayOfMonth().roundCeilingCopy()
+    val endDay = startDay.plusDays(1)
+    if (!startDay.isBefore(begin) && !endDay.isAfter(end)) {
+      return  formSqlConditions(begin, startDay) ++
+        List(s"year = ${startDay.getYear} " +
+             s"and month = ${startDay.getMonthOfYear} " +
+             s"and day = ${startDay.getDayOfMonth}") ++
+        formSqlConditions(endDay, end)
+    }
+
+    // Do we have a collection of hours that run up to the end of the starting day?
+    val startOfNextDay = begin.withTimeAtStartOfDay().plusDays(1)
+    if (!startOfNextDay.isAfter(end)) {
+     return  List(s"year = ${begin.getYear} " +
+                  s"and month = ${begin.getMonthOfYear} " +
+                  s"and day = ${begin.getDayOfMonth} " +
+                  s"and hour >= ${begin.getHourOfDay}") ++
+            formSqlConditions(startOfNextDay, end)
+    }
+
+    // Do we have a collection of hours that start at the beginning of the last day?
+    val startOfLastDay = end.withTimeAtStartOfDay()
+    if (!startOfLastDay.isBefore(begin)) {
+      return formSqlConditions(begin, startOfLastDay) ++
+             List(s"year = ${end.getYear} " +
+                  s"and month = ${end.getMonthOfYear} " +
+                  s"and day = ${end.getDayOfMonth} " +
+                  s"and hour <= ${end.getHourOfDay}")
+    }
+
+    // We must have an hour range within the same day.
+    assert(begin.withTimeAtStartOfDay() == end.withTimeAtStartOfDay())
+    List(s"year = ${begin.getYear} " +
+         s" and month = ${begin.getMonthOfYear}" +
+         s" and day = ${begin.getDayOfMonth}" +
+         s" and hour >= ${begin.getHourOfDay}" +
+         s" and hour <= ${end.getHourOfDay}")
+
+  }
+}
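
As a quick, hypothetical illustration (not part of the patch itself), formSql
produces a compressed predicate like the one below for an interval that spans
a day boundary; the expected output matches the first case in
TestPartitionQueryBuilder further down:

  import java.sql.Timestamp
  import org.wikimedia.analytics.refinery.job.vectors.PartitionQueryBuilder.formSql

  object FormSqlExample {
    def main(args: Array[String]): Unit = {
      val begin = Timestamp.valueOf("2017-01-04 03:10:00")
      val end   = Timestamp.valueOf("2017-01-05 04:00:00")
      // Prints, modulo whitespace and case:
      // (year = 2017 and month = 1 and day = 4 and hour >= 3) OR
      // (year = 2017 and month = 1 and day = 5 and hour <= 4)
      println(formSql(begin, end))
    }
  }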
diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
new file mode 100644
index 0000000..fe0b839
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
@@ -0,0 +1,453 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.sql.Timestamp
+
+import org.joda.time.Hours
+import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
+
+import scala.util.Try
+
+
+/**
+  * Process to create a page view session log using data in a webrequest table.
+  *
+  * The created table has one row per session.
+  * The first three columns are wiki, date, and timestamp.
+  * The last column contains the page ids viewed in the session, in order, joined by spaces.
+  *
+  * @author Shilad Sen
+  */
+object SessionPagesBuilder {
+
+  import org.apache.spark.sql.hive.HiveContext
+  import org.apache.spark.SparkConf
+  import org.apache.spark.sql.SaveMode
+  import org.apache.spark.SparkContext
+  import scopt.OptionParser
+  import org.apache.spark.rdd.RDD
+  import org.apache.spark.sql.SQLContext
+  import org.joda.time.DateTime
+  import org.apache.log4j.{Level, LogManager}
+  import org.apache.spark.Partitioner
+  import java.sql.Timestamp
+
+  import scala.collection.mutable.ArrayBuffer
+  import scala.util.hashing.MurmurHash3
+
+  import VectorUtils.{toTimestamp, toInt, parseTimestamp, TS_FORMATS}
+
+  /**
+    * Number of seconds of data to gather beyond the requested time
+    * interval, in order to find the tail ends of sessions that spill
+    * past the interval. These views will be included in the output.
+    */
+  val LOOK_FUTURE_SECS: Int = 60 * 60 * 3
+
+  /**
+    * How far in the past to look for sessions with start times BEFORE
+    * the requested interval that spill into the requested interval.
+    * These will be excluded from the output.
+    */
+  val LOOK_PAST_SECS: Int = 60 * 59
+
+  /**
+    * A user session is defined by: A wiki, a hash of unique user information,
+    * and the timestamp for a view. This is used to stream through pageviews
+    * that may have been part of the same user session.
+    */
+  case class SessionKey (wikiDb: String, userHash: Long, tstamp: Long)
+    extends Ordered[SessionKey] {
+    override def compare(that: SessionKey): Int = {
+      import scala.math.Ordered.orderingToOrdered
+
+      Ordering.Tuple3(Ordering.String, Ordering.Long, Ordering.Long).compare(
+        (wikiDb, userHash, tstamp),
+        (that.wikiDb, that.userHash, that.tstamp)
+      )
+    }
+  }
+
+  /**
+    * Partitions all views for the same wiki from the same user onto a single partition.
+    */
+  class SessionPartitioner(partitions: Int) extends Partitioner {
+    require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
+
+    override def numPartitions: Int = partitions
+
+    override def getPartition(key: Any): Int = {
+      val k = key.asInstanceOf[SessionKey]
+      // Mask to a non-negative value to avoid the Math.abs(Int.MinValue) edge case.
+      ((k.wikiDb.hashCode() + k.userHash.toInt) & Int.MaxValue) % numPartitions
+    }
+  }
+
+  object SessionKey {
+    implicit def orderingByTimestamp[A <: SessionKey] : Ordering[A] = {
+      Ordering.by(fk => (fk.wikiDb, fk.userHash, fk.tstamp))
+    }
+  }
+
+
+  /**
+    * A view of a page by a user.
+    * tstamp is milliseconds since the epoch.
+    * userHash is a 64-bit hash of user ip and other unique information.
+    */
+  case class PageView(
+                       wikiDb: String,
+                       userHash: Long,
+                       tstamp: Long,
+                       pageNamespace: Long,
+                       pageId: Long
+                     )
+
+  /**
+    * Returns true iff the two views are part of the same session.
+    * The wikis and user hashes must match and the pageviews must fall within the timeout window.
+    */
+  def sameSession(v1: PageView, v2: PageView, timeoutSecs: Long) : Boolean = {
+    (v1.wikiDb == v2.wikiDb) &&
+      (v1.userHash == v2.userHash) &&
+      (Math.abs(v1.tstamp - v2.tstamp) <= timeoutSecs * 1000)
+  }
+
+
+  /**
+    * Given the components that identify a distinct user, return a 64-bit hash.
+    *
+    * @return a long combining the hashes of the joined string and its reverse
+    */
+  def userHash(
+               client_ip: String,
+               user_agent: String,
+               x_forwarded_for: String
+             ) : Long = {
+    val s = client_ip + ":" + user_agent + ":" + x_forwarded_for
+    // Use the full 32-bit mask so low-half bits are kept and not sign-extended into the high half.
+    (MurmurHash3.stringHash(s).toLong << 32) |
+      (MurmurHash3.stringHash(s.reverse).toLong & 0xFFFFFFFFL)
+  }
+
+  def listToSQLInCondition(list: Seq[String]): String = list.map(w => s"'$w'").mkString(", ")
+
+  /**
+    * Prepare a map linking project hostnames to project dbnames.
+    *
+    * @return A map[hostname -> dbname]
+    */
+  def prepareProjectToWikiMap(
+                               sqlContext: SQLContext,
+                               projectNamespaceTable: String,
+                               snapshot: String,
+                               wikiList: Seq[String]
+                             ): Map[String, String] = {
+    sqlContext.sql(
+      s"""
+         |SELECT DISTINCT
+         |  hostname,
+         |  dbname
+         |FROM $projectNamespaceTable
+         |WHERE snapshot = '$snapshot'
+         |  AND dbname IN (${listToSQLInCondition(wikiList)})
+        """.stripMargin).
+      rdd.
+      collect.
+      map(r => r.getString(0) -> r.getString(1)).toMap
+  }
+
+
+  /**
+    * Prepare the view data
+    * @return A RDD of page views
+    */
+  def prepareViews(
+                          sqlContext: SQLContext,
+                          webrequestTable: String,
+                          begin: Timestamp,
+                          end: Timestamp,
+                          projectList: Seq[String],
+                          projectToWikiMap: Map[String, String]
+                        ): RDD[PageView] = {
+
+    sqlContext.sql(
+      s"""
+         |SELECT
+         |  CONCAT(pageview_info['project'], '.org') AS project,
+         |  namespace_id,
+         |  page_id,
+         |  ts,
+         |  client_ip,
+         |  user_agent,
+         |  x_forwarded_for
+         |FROM $webrequestTable
+         |WHERE webrequest_source = 'text'
+         |  AND access_method = 'desktop'
+         |  AND normalized_host.project_class IN ('wikipedia', 'wikimedia')
+         |  AND(${PartitionQueryBuilder.formSql(begin, end)})
+         |  AND pageview_info['project'] IN (${listToSQLInCondition(projectList)})
+         |  AND is_pageview
+         |  AND namespace_id IS NOT NULL
+         |  AND page_id IS NOT NULL
+         |  AND agent_type = 'user'
+          """.stripMargin)
+      .rdd
+      .map(r => {
+        val wiki = projectToWikiMap(r.getString(0))
+        val uh = userHash(r.getString(4), r.getString(5), r.getString(6))
+        PageView(wiki, uh, toTimestamp(r.get(3)).getTime, toInt(r.get(1)), toInt(r.get(2)))
+      })
+  }
+
+  /**
+    * Aggregates page views by user / wiki and groups them into sessions.
+    *
+    * 1. Create a session key composed of wiki, userhash, and tstamp that delineates sessions.
+    * 2. Partition by wiki and userhash with a secondary sort on tstamp.
+    * 3. Reduce page views into sessions on each partition. The iterator
+    *    is necessary for the final step because there could be millions or billions
+    *    of page views on a partition, so we cannot load the result set in memory.
+    *
+    * @return an RDD whose elements are the page views of a single session, in order
+    */
+  def createSessions(views: RDD[PageView], timeoutInSecs: Int): RDD[Seq[PageView]] = {
+    views
+      .map { v => (SessionKey(v.wikiDb, v.userHash, v.tstamp), v) }
+      .repartitionAndSortWithinPartitions(new SessionPartitioner(views.getNumPartitions))
+      .mapPartitions {
+        iter =>
+          new Iterator[Seq[PageView]] {
+            var eos: Boolean = !iter.hasNext
+            var session = new ArrayBuffer[PageView]()
+            var nextSessionHead : Option[PageView] = None
+
+            override def hasNext: Boolean = {
+              tryToFillSession()
+              session.nonEmpty
+            }
+
+            override def next: Seq[PageView] = {
+              // If hasNext returns true we should always have a session
+              tryToFillSession()
+              assert(session.nonEmpty)
+
+              // Create a new empty session and place the next session head into it
+              val s = session
+              session = new ArrayBuffer[PageView]()
+              if (nextSessionHead.isDefined) {
+                session += nextSessionHead.get
+                nextSessionHead = None
+              }
+              s
+            }
+
+            /**
+              * Recursively adds the next page view onto the current session until
+              * we are certain it belongs to the next session. The signal that this
+              * has occurred is a non-empty nextSessionHead.
+              */
+            def tryToFillSession(): Unit = {
+              if (!eos && nextSessionHead.isEmpty && iter.hasNext) {
+                val p = iter.next._2
+                if (session.isEmpty || sameSession(p, session.last, timeoutInSecs)) {
+                  session += p
+                  tryToFillSession()
+                } else {
+                  nextSessionHead = Some(p)
+                }
+              }
+            }
+          }
+      }
+  }
+
+
+  /**
+    * Write sessions to the output table.
+    * TODO: Overwrite existing files.
+    */
+  def writeSessions(sqlContext: SQLContext, sessions: RDD[Seq[PageView]], outputTable: String): Unit = {
+    import sqlContext.implicits._
+
+    val df =
+      sessions
+        .map { views =>
+          // Make concatenated ids with namespace 0
+          val ids = views
+            .filter(_.pageNamespace == 0)
+            .map(_.pageId.toString)
+            .mkString(" ")
+
+          // java.sql.Timestamp.toString: "yyyy-MM-dd HH:mm:ss.fffffffff"
+          val ts = new Timestamp(views.head.tstamp).toString
+          val date = ts.substring(0, 10)
+
+          // Timestamp for session is timestamp for first view, regardless of
+          // whether it is ultimately filtered out
+          (views.head.wikiDb, date, ts, ids)
+        }
+        .filter(_._4.nonEmpty) // Remove sessions with zero pageviews
+        .toDF("wiki_db", "date", "tstamp", "pages")
+
+    if (sqlContext.isInstanceOf[HiveContext]) {
+      df.write
+        .format("parquet")
+        .mode(SaveMode.Overwrite)
+        .partitionBy("wiki_db", "date")
+        .saveAsTable(outputTable)
+    } else {
+      // We are in unit testing mode so we can't write a real table
+      df.registerTempTable(outputTable)
+    }
+  }
+
+  /**
+    * Config class for CLI argument parser using scopt
+    */
+  case class Params(
+   outputTable: String = "shilad.sessions",
+   projectNamespaceTable: String = "wmf_raw.mediawiki_project_namespace_map",
+   webrequestTable: String = "wmf.webrequest",
+   wikiList: Seq[String] = Seq("simplewiki"),
+   outputFilesParts: Int = 1,
+   snapshot: String = "", // Parameter required, never used as is
+   begin: Timestamp = new Timestamp(0), // Parameter required, never used as is
+   end: Timestamp = new Timestamp(0), // Parameter required, never used as is
+   sessionTimeoutSecs: Int = 15*60
+ )
+
+
+  /**
+    * Define the command line options parser
+    */
+  val argsParser = new OptionParser[Params]("Pageview session builder") {
+    head("Pageview session builder", "")
+    note(
+      """
+        |This job computes a pageview session dataset for one or more wikis.
+        |Results are written to a Hive table partitioned by wiki and date.
+      """.stripMargin)
+    help("help") text ("Prints this usage text")
+
+    opt[String]('s', "snapshot") required() valueName ("<snapshot>") action { (x, p) =>
+      p.copy(snapshot = x)
+    } text ("The mediawiki hadoop snapshot to use for the project-namespace map (usually YYYY-MM)")
+
+    opt[String]('b', "begin") required() valueName ("<begin>") action { (x, p) =>
+      p.copy(begin = parseTimestamp(x).get)
+    } validate { x =>
+      if (parseTimestamp(x).isDefined) success
+      else failure(s"Invalid timestamp format. Must be one of {$TS_FORMATS}")
+    } text ("The beginning time for webrequest data gathering.")
+
+    opt[String]('e', "end") required() valueName ("<end>") action { (x, p) =>
+      p.copy(end = parseTimestamp(x).get)
+    } validate { x =>
+      if (parseTimestamp(x).isDefined) success
+      else failure(s"Invalid timestamp format. Must be one of {$TS_FORMATS}")
+    } text ("The ending time for webrequest data gathering.")
+
+    opt[Int]('t', "timeout") optional() valueName ("<timeout>") action { (x, p) =>
+      p.copy(sessionTimeoutSecs = x)
+    } validate { x => if (x >= 0) success else failure("Invalid timeout")
+    } text ("The timeout in seconds that delineates user sessions.")
+
+
+    opt[String]('o', "output-table") optional() valueName ("<table>") action { (x, p) =>
+      p.copy(outputTable = x)
+    } text ("Hive table to store the computed user sessions")
+
+    opt[String]('w', "wikis") optional() valueName "<wiki_db_1>,<wiki_db_2>..." action { (x, p) =>
+      p.copy(wikiList = x.split(",").map(_.toLowerCase))
+    } validate { x =>
+      val dbs = x.split(",").map(_.toLowerCase)
+      if (dbs.exists(db => db.isEmpty || (! db.contains("wik"))))
+        failure("Invalid wikis list")
+      else
+        success
+    } text "wiki dbs to compute. Defaults to simplewiki"
+
+    opt[Int]('p', "output-files-parts") optional() valueName ("<partitions>") action { (x, p) =>
+      p.copy(outputFilesParts = x)
+    } text ("Number of file parts to output in hdfs. Defaults to 1")
+
+    opt[String]("project-namespace-table") optional() valueName ("<table>") 
action { (x, p) =>
+      p.copy(projectNamespaceTable = x)
+    } text ("Fully qualified name of the project-namespace table on Hive. 
Default to wmf_raw.mediawiki_project_namespace_map")
+
+    opt[String]("webrequest-table") optional() valueName ("<table>") action { 
(x, p) =>
+      p.copy(webrequestTable = x)
+    } text ("Fully qualified name of the webrequest table on Hive. Default to 
wmf.webrequest")
+
+  }
+
+
+  def main(args: Array[String]): Unit = {
+
+    val params = args.headOption match {
+      // Case when our job options are given as a single string.  Split them
+      // and pass them to argsParser.
+      case Some("--options") =>
+        argsParser.parse(args(1).split("\\s+"), Params()).getOrElse(sys.exit(1))
+      // Else the normal usage, each CLI opts can be parsed as a job option.
+      case _ =>
+        argsParser.parse(args, Params()).getOrElse(sys.exit(1))
+    }
+
+
+    val conf = new SparkConf()
+      .setAppName("SessionPagesBuilder")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.hadoop.mapred.output.compress", "true")
+      .set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
+      .set("spark.hadoop.mapred.output.compression.type", "BLOCK")
+
+    val sqlContext = new HiveContext(new SparkContext(conf))
+
+    // Run the job.
+    apply(sqlContext, params)
+  }
+
+  def apply(sql: SQLContext, params: Params): Unit = {
+    sql.sparkContext
+      .getConf
+      .registerKryoClasses(Array(
+        classOf[PageView]))
+
+    import sql.implicits._
+
+    val projectToWikiMap = prepareProjectToWikiMap(
+      sql,
+      params.projectNamespaceTable,
+      params.snapshot,
+      params.wikiList)
+    val domainList = projectToWikiMap.keys.toList
+    val projectList = domainList.map(_.stripSuffix(".org"))
+
+    // Look into the past to make sure we don't jump in during the middle
+    // of a session that has a start time before the requested begin
+    val viewBeg = new Timestamp(new DateTime(params.begin.getTime)
+                                      .minusSeconds(2 * params.sessionTimeoutSecs)
+                                      .getMillis)
+
+    // Look into the future to wrap up any sessions that started in the time
+    // interval but spill over.
+    val viewEnd = new Timestamp(new DateTime(params.end.getTime)
+                                      .plusSeconds(LOOK_FUTURE_SECS)
+                                      .getMillis)
+
+    val views = prepareViews( sql, params.webrequestTable,
+                              viewBeg, viewEnd,
+                              projectList, projectToWikiMap)
+
+    val sessions = createSessions(views, params.sessionTimeoutSecs)
+      .filter { ts =>
+        ts.head.tstamp >= params.begin.getTime &&
+        ts.head.tstamp <= params.end.getTime
+      }
+
+    writeSessions(sql, sessions, params.outputTable)
+  }
+}
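
As a hypothetical sketch (not part of the patch itself), the snippet below
illustrates the sessionization rule that sameSession implements, using the
default 15-minute timeout and made-up page ids:

  import org.wikimedia.analytics.refinery.job.vectors.SessionPagesBuilder.{PageView, sameSession}

  object SameSessionExample {
    def main(args: Array[String]): Unit = {
      val t0 = 1483304400000L  // arbitrary epoch millis
      val a = PageView("simplewiki", 42L, t0,                  0L, 100L)
      val b = PageView("simplewiki", 42L, t0 + 10 * 60 * 1000, 0L, 200L) // 10 minutes later
      val c = PageView("simplewiki", 42L, t0 + 40 * 60 * 1000, 0L, 300L) // 30 minutes after b

      val timeoutSecs = 15 * 60  // default sessionTimeoutSecs
      println(sameSession(a, b, timeoutSecs)) // true: same wiki and user, within the timeout
      println(sameSession(b, c, timeoutSecs)) // false: the gap exceeds the timeout
    }
  }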
diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPruner.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPruner.scala
new file mode 100644
index 0000000..1b270fb
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPruner.scala
@@ -0,0 +1,243 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.sql.Timestamp
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
+import scopt.OptionParser
+
+import scala.util.Try
+
+import VectorUtils.{toTimestamp, toInt, parseTimestamp, TS_FORMATS}
+
+/**
+  * @author Shilad Sen
+  */
+object SessionPruner {
+
+  /**
+    * A session of pageviews.
+    */
+  case class Session(wiki: String, begin: Long, views: Seq[Long])
+
+  def listToSQLInCondition(list: Seq[String]): String = list.map(w => s"'$w'").mkString(", ")
+
+  /**
+    * Calculate the frequency table for pages within a session data frame
+    */
+  def calculateFreq(df: DataFrame) : RDD[(String, String, Int)] = {
+    df
+      .select("wiki_db", "pages")
+      .rdd
+      .flatMap {
+        case Row(w: String, pages: String) =>
+          pages.split(" ").map((w, _))     // (wiki, page_id)
+      }
+      .map((_, 1))                              // ((wiki, page_id), 1)
+      .reduceByKey((x : Int, y: Int) => x + y)  // ((wiki, page_id), freq)
+      .map { case ((w, p), f) => (w, p, f) }    // (wiki, page_id, freq)
+  }
+
+  /**
+    * Prunes page views, keeping only views of pages in keepers; sessions left with no views are dropped.
+    *
+    * @param sessions Data frame for unpruned sessions table
+    * @param keepers (wiki, pageid) pairs that should be retained post-pruning.
+    */
+  def prune(
+      sessions: DataFrame,
+      keepers: RDD[(String, String)])
+    : RDD[(String, String, Timestamp, String)] = {
+
+    // Create RDD of ((wiki, page), (sessionId, i, date, tstamp))
+    // sessionId is a unique id and i is the index of the page
+    // view within the session.  This will be joined with the keepers RDD
+    val pageViews =
+      sessions
+        .select("wiki_db", "date", "tstamp", "pages")
+        .rdd
+        .zipWithUniqueId()
+        .flatMap { case (row, sessionId) =>
+          val w = row.getString(0)
+          val d = row.getString(1)
+          val ts = toTimestamp(row.get(2))
+          val pages = row.getString(3).split(" ")
+
+          pages
+            .zipWithIndex
+            .map { case (p, i) => ((w, p), (sessionId, i, d, ts)) }
+        }
+
+    // Add a garbage column to the keepers to make it joinable
+    val keepersToJoin = keepers.map((_, 1))
+
+    // Join the two datasets to filter out unused pageviews
+    // Recreate the original session data, and return it
+    pageViews
+      .join(keepersToJoin)
+      .map { case ((wiki, page), ((sessionId, i, date, tstamp), garbage)) =>
+        (sessionId, (i, date, tstamp, wiki, page))
+      }
+      .groupByKey()
+      .map { case (sessionId, views) =>
+        val date = views.head._2
+        val tstamp = views.head._3
+        val wiki = views.head._4
+        val pages = views.toList.sortBy{_._1}.map(_._5) // sort by i, extract page
+
+        (wiki, date, tstamp, pages.mkString(" "))
+      }
+  }
+
+
+  /**
+    * Config class for CLI argument parser using scopt
+    */
+  case class Params(
+     inputTable: String = "sessions",
+     outputTable: String = "sessions_pruned",
+     freqTable: Option[String] = None,
+     wikiList: Seq[String] = Seq("simplewiki"),
+     begin: Timestamp = new Timestamp(0), // Parameter required, never used as is
+     end: Timestamp = new Timestamp(0), // Parameter required, never used as is
+     minFreq: Int = 50
+   )
+
+
+  /**
+    * Define the command line options parser
+    */
+  val argsParser = new OptionParser[Params]("Session pruner") {
+    head("Session pruner", "")
+    note(
+      """
+        |This job prunes a session pageview dataset for one or more wikis.
+        |Only pages above some frequency threshold are retained.
+        |Results are written to a Hive table partitioned by wiki and date.
+      """.stripMargin)
+    help("help") text ("Prints this usage text")
+
+    opt[String]('b', "begin") required() valueName ("<begin>") action { (x, p) =>
+      p.copy(begin = parseTimestamp(x).get)
+    } validate { x =>
+      if (parseTimestamp(x).isDefined) success
+      else failure(s"Invalid timestamp format. Must be one of {$TS_FORMATS}")
+    } text ("The beginning time of the sessions to prune.")
+
+    opt[String]('e', "end") required() valueName ("<end>") action { (x, p) =>
+      p.copy(end = parseTimestamp(x).get)
+    } validate { x =>
+      if (parseTimestamp(x).isDefined) success
+      else failure(s"Invalid timestamp format. Must be one of {$TS_FORMATS}")
+    } text ("The ending time for webrequest data gathering.")
+
+    opt[String]('f', "freq-table") optional() valueName ("<table>") action { (x, p) =>
+      p.copy(freqTable = Some(if (x.endsWith("/")) x.dropRight(1) else x))
+    } text ("SQL table name for the output frequency table.")
+
+    opt[String]('i', "input-table") optional() valueName ("<table>") action { (x, p) =>
+      p.copy(inputTable = x)
+    } text ("SQL table name for input sessions.")
+
+    opt[String]('o', "outputTable") optional() valueName ("<table>") action { (x, p) =>
+      p.copy(outputTable = x)
+    } text ("SQL table name for pruned sessions.")
+
+    opt[String]('w', "wikis") optional() valueName "<wiki_db_1>,<wiki_db_2>..." action { (x, p) =>
+      p.copy(wikiList = x.split(",").map(_.toLowerCase))
+    } validate { x =>
+      val dbs = x.split(",").map(_.toLowerCase)
+      if (dbs.exists(db => db.isEmpty || (! db.contains("wik"))))
+        failure("Invalid wikis list")
+      else
+        success
+    } text "wiki dbs to compute. Defaults to simplewiki"
+
+  }
+
+  def main(args: Array[String]): Unit = {
+
+    val params = args.headOption match {
+      // Case when our job options are given as a single string.  Split them
+      // and pass them to argsParser.
+      case Some("--options") =>
+        argsParser.parse(args(1).split("\\s+"), Params()).getOrElse(sys.exit(1))
+      // Else the normal usage, each CLI opts can be parsed as a job option.
+      case _ =>
+        argsParser.parse(args, Params()).getOrElse(sys.exit(1))
+    }
+
+    val conf = new SparkConf()
+      .setAppName("SessionPruner")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.hadoop.mapred.output.compress", "true")
+      .set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
+      .set("spark.hadoop.mapred.output.compression.type", "BLOCK")
+
+    val sqlContext = new HiveContext(new SparkContext(conf))
+
+    // Run the job.
+    apply(sqlContext, params)
+  }
+
+  def apply(sql: SQLContext, params: Params): Unit = {
+    sql.sparkContext
+      .getConf
+      .registerKryoClasses(Array(classOf[Session]))
+
+    val begDate = params.begin.toString.substring(0, 10) // YYYY-mm-dd
+    val endDate = params.end.toString.substring(0, 10) // YYYY-mm-dd
+
+    val sessions =
+      sql.sql(
+      s"""
+         |SELECT wiki_db, date, tstamp, pages
+         |FROM ${params.inputTable}
+         |WHERE date >= '$begDate'
+         |  AND date <= '$endDate'
+         |  AND wiki_db IN (${listToSQLInCondition(params.wikiList)})
+         |  AND tstamp >= '${params.begin}'
+         |  AND tstamp <= '${params.end}'
+          """.stripMargin)
+
+    import sql.implicits._
+
+    val freq = calculateFreq(sessions)
+
+    if (params.freqTable.isDefined) {
+      val df = freq.toDF("wiki_db", "page", "frequency")
+      if (sql.isInstanceOf[HiveContext]) {
+        df.write
+          .format("parquet")
+          .mode(SaveMode.Overwrite)
+          .partitionBy("wiki_db")
+          .saveAsTable(params.freqTable.get)
+      } else {
+        // We are in unit testing mode so we can't write a real table
+        df.registerTempTable(params.freqTable.get)
+      }
+    }
+
+    val keepers = freq
+        .filter(_._3 >= params.minFreq)
+        .map{ triple => (triple._1, triple._2) }  // (wiki, page)
+
+    val prunedDf = prune(sessions, keepers)
+      .toDF("wiki_db", "date", "tstamp", "pages")
+
+    if (sql.isInstanceOf[HiveContext]) {
+      prunedDf.write
+        .format("parquet")
+        .mode(SaveMode.Overwrite)
+        .partitionBy("wiki_db", "date")
+        .saveAsTable(params.outputTable)
+    } else {
+      // We are in unit testing mode so we can't write a real table
+      prunedDf.registerTempTable(params.outputTable)
+    }
+  }
+}
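
As a hypothetical local-mode sketch (not part of the patch itself), this is
roughly what calculateFreq yields for a toy sessions table; page 7 appears in
three views across two sessions:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext
  import org.wikimedia.analytics.refinery.job.vectors.SessionPruner.calculateFreq

  object CalculateFreqExample {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("freq-example"))
      val sql = new SQLContext(sc)
      import sql.implicits._

      // Two sessions on the same wiki, in the same shape as the sessions table.
      val sessions = Seq(
        ("simplewiki", "2017-01-01", "2017-01-01 21:00:00.0", "7 12 7"),
        ("simplewiki", "2017-01-01", "2017-01-01 22:30:00.0", "7 99")
      ).toDF("wiki_db", "date", "tstamp", "pages")

      // Prints, in some order: (simplewiki,7,3), (simplewiki,12,1), (simplewiki,99,1)
      calculateFreq(sessions).collect().foreach(println)
      sc.stop()
    }
  }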
diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/VectorUtils.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/VectorUtils.scala
new file mode 100644
index 0000000..0ab8bbb
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/VectorUtils.scala
@@ -0,0 +1,53 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.sql.Timestamp
+
+import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
+
+import scala.util.Try
+
+/**
+  * @author Shilad Sen
+  */
+object VectorUtils {
+
+
+  /**
+    * Wrapper that allows us to handle production hive timestamps (which
+    * are already Timestamps) and testing data from JSON files (which are
+    * strings) uniformly.
+    */
+  def toTimestamp(ts: Any) : Timestamp = {
+    ts match {
+      case t: Timestamp => t
+      case s: String => Timestamp.valueOf(s)
+      case _ => throw new IllegalArgumentException(ts.toString)
+    }
+  }
+
+  /**
+    * Wrapper that allows us to handle production hive data and JSON testing
+    * data uniformly
+    */
+  def toInt(x : Any) : Int = {
+    x match {
+      case y: Long => y.toInt
+      case _ => x.asInstanceOf[Int]
+    }
+  }
+
+
+  val YMD: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd")
+  val YMD_HMS: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss")
+  val TS_FORMATS: String = "yyyy-MM-dd or yyyy-MM-ddTHH:mm:ss"
+
+  def parseTimestamp(s: String) : Option[Timestamp] = {
+    Try(new Timestamp(YMD.parseMillis(s)))
+      .orElse(Try(new Timestamp(YMD_HMS.parseMillis(s))))
+      .toOption
+  }
+}
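
As a hypothetical sketch (not part of the patch itself), parseTimestamp accepts
both supported formats and returns None for anything else:

  import org.wikimedia.analytics.refinery.job.vectors.VectorUtils.parseTimestamp

  object ParseTimestampExample {
    def main(args: Array[String]): Unit = {
      println(parseTimestamp("2017-01-04"))           // Some(2017-01-04 00:00:00.0)
      println(parseTimestamp("2017-01-04T03:10:00"))  // Some(2017-01-04 03:10:00.0)
      println(parseTimestamp("not-a-timestamp"))      // None
    }
  }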
diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
new file mode 100644
index 0000000..c9bf31e
--- /dev/null
+++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
@@ -0,0 +1,279 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.io.{File, FileWriter, PrintWriter}
+import java.sql.Timestamp
+
+import org.joda.time.DateTime
+import org.json.simple.JSONObject
+import org.wikimedia.analytics.refinery.job.vectors.SessionPagesBuilder.{PageView, userHash}
+
+import scala.collection.mutable
+import scala.reflect.io.Path
+import scala.util.Random
+
+
+/**
+  * Creates fake data that can be processed by SessionPagesBuilder along with
+  * expected generated files. Files created are:
+  *
+  * 1) wikis.json: Input table mapping between domain names and wikis
+  * 2) requests.json: Input table containing page views.
+  * 3) sessions.txt: Output table created by SessionPagesBuilder
+  * 4) frequency.txt: Number of views per page in sessions.txt
+  * 5) sessions_pruned.txt: Sessions with pages that are rarely viewed removed.
+  *
+  * @author Shilad Sen
+  */
+object TestDataCreator {
+  val RAND = new Random(1)
+
+  case class UserInfo(x: String, y: String, z: String, hash: Long)
+  case class Session(user: UserInfo, tstamp: Long, views: Seq[PageView])
+
+
+  case class Params(
+     snapshot: String = "2016-10",
+     startTime: DateTime = new DateTime(2017, 1, 1, 21, 0, 0),
+     endTime: DateTime = new DateTime(2017, 1, 2, 2, 0, 0),
+     wikis: Seq[String] = (1 to 3).map{ "wiki_" + _ },
+     numPages: Int = 100,
+     numSessions: Int = 100,
+     meanSessionLength : Int = 10,
+     sessionTimeoutSecs : Int = 60 * 15,
+     outputDir : File = new File("path-data"),
+     minFreq: Int = 5
+  )
+
+
+  /**
+    * Chooses a random element in a sequence.
+    */
+  def choice[T](seq : Seq[T]) : T = {
+    seq(RAND.nextInt(seq.length))
+  }
+
+  val usedUserHashes: mutable.Set[Long] = mutable.Set[Long]()
+
+  def randUser() : UserInfo = {
+    val randStr = () => { (1 to 10).map(_ => RAND.nextPrintableChar() + "").mkString("") }
+    val u = (randStr(), randStr(), randStr())
+    val hash : Long = userHash(u._1, u._2, u._3)
+    if (usedUserHashes.contains(hash)) {
+      randUser()
+    } else {
+      usedUserHashes += hash
+      UserInfo(u._1, u._2, u._3, hash)
+    }
+  }
+
+  def randPageView(user : UserInfo, wiki: String, tstamp: Long, numArticles : Int) : PageView = {
+    val pageId = RAND.nextInt(numArticles)
+    // Pseudo-random namespace; it is deterministic given the page id.
+    val pageNs = (pageId.hashCode() % 10 - 8).max(0)
+    PageView(wiki, user.hash, tstamp, pageNs, pageId)
+  }
+
+  /**
+    * Splits a session into two sessions by the same user if possible. This is done
+    * by incrementing the timestamps of pageviews beyond some point in the session
+    * by a random offset that is bigger than the session timeout.
+    */
+  def splitSessions(session : Session, sessionTimeoutSecs: Int, start: Long, end: Long) : List[Session] = {
+    val views = session.views
+
+    val beg2 = views.last.tstamp + sessionTimeoutSecs * 1000 + 1 // Earliest start of a new session
+    if (views.length > 2 && beg2 < end) {
+      val offset = RAND.nextInt((end - beg2).asInstanceOf[Int]) + sessionTimeoutSecs * 1000 + 1
+      val i = 1 + RAND.nextInt(views.length - 1)
+      val views1 = views.slice(0, i)
+      val views2 = views
+        .slice(i, views.length)
+        .map { v =>
+          PageView(v.wikiDb, v.userHash, v.tstamp + offset, v.pageNamespace, v.pageId)
+        }
+      List(
+        Session(session.user, views1.head.tstamp, views1.toList),
+        Session(session.user, views2.head.tstamp, views2.toList)
+      )
+    } else {
+      List(session) // Not possible to split.
+    }
+  }
+
+  /**
+    * Generates random sessions with starting timestamps in the specified range.
+    */
+  def generateSessions(params: Params): Seq[Session] = {
+    val start = params.startTime.getMillis
+    val end = params.endTime.getMillis
+    (1 to params.numSessions)
+      .flatMap { _ =>
+        val n = 1 + RAND.nextInt(params.meanSessionLength * 2)
+        val w = choice(params.wikis)
+        val u = randUser()
+        val startTstamp = start + RAND.nextInt((end-start).asInstanceOf[Int])
+        val tstamps =
+          (1 to n)
+            .foldLeft(List(startTstamp)) { (seq, _) =>
+              (seq.head + RAND.nextInt(params.sessionTimeoutSecs*1000-1))::seq
+            }
+            .reverse
+        val views = tstamps.map { randPageView(u, w, _, params.numPages) }.toList
+        val session = Session(u, views.head.tstamp, views.toList)
+
+        if (RAND.nextDouble() < 0.7)
+          List(session)
+        else
+          splitSessions(session, params.sessionTimeoutSecs, start, end)
+      }
+  }
+
+  /**
+    * Prunes sessions:
+    * - Removes views for namespaces other than 0
+    * - Removes empty sessions
+    * - Removes sessions whose start times fall outside the requested range
+    */
+  def pruneInvalidSessions(sessions: Seq[Session], begin: DateTime, end: DateTime) : Seq[Session] = {
+    sessions
+      .map { s => Session(s.user, s.tstamp, s.views.filter(_.pageNamespace == 0).toList) }
+      .filter { s =>
+        s.views.nonEmpty && s.tstamp >= begin.getMillis && s.tstamp <= end.getMillis
+      }
+  }
+
+  def writeSessions(path: String, sessions: Seq[Session]) : Unit = {
+    val out = new PrintWriter(new FileWriter(path))
+    out.write("wiki_db\tdate\ttstamp\tpages\n")
+    sessions.foreach {
+      s =>
+        val ts = new Timestamp(s.tstamp).toString
+        val date = ts.substring(0, 10)
+        val wiki = s.views.head.wikiDb
+        val pageIds = s.views.map { _.pageId.toString }.mkString(" ")
+        out.write(wiki + "\t" + date + "\t" + ts + "\t" + pageIds + "\n")
+    }
+    out.close()
+  }
+
+  import scala.collection.JavaConverters._
+
+
+  def writeWikis(path: String, wikis: Seq[String], snapshot: String): Unit = {
+    val out = new PrintWriter(new FileWriter(path))
+    wikis.distinct.foreach {
+      w =>
+        val js = Map(
+          "hostname" -> (w + ".org"),
+          "snapshot" -> snapshot,
+          "dbname" -> w
+        )
+        JSONObject.writeJSONString(js.asJava, out)
+        out.write("\n")
+    }
+    out.close()
+  }
+
+  def writeRequests(path: String, sessions: Seq[Session]): Unit = {
+    val allViews = sessions.flatMap { s =>  s.views.map((s.user, _)) }
+
+    val out = new PrintWriter(new FileWriter(path))
+    RAND.shuffle(allViews).foreach { case (user, view) =>
+      val dt = new DateTime(view.tstamp)
+
+      val js = Map(
+        "pageview_info" -> Map( "project" -> view.wikiDb ).asJava,
+        "ts" -> new Timestamp(dt.getMillis).toString,
+        "user_agent" -> user.x,
+        "namespace_id" -> view.pageNamespace,
+        "page_id" -> view.pageId,
+        "x_forwarded_for" -> user.y,
+        "client_ip" -> user.z,
+        "is_pageview" -> true,
+        "agent_type" -> "user",
+        "webrequest_source" -> "text",
+        "access_method" -> "desktop",
+        "normalized_host" -> Map("project_class" -> "wikimedia").asJava,
+        "year" -> dt.getYear,
+        "month" -> dt.getMonthOfYear,
+        "day" -> dt.getDayOfMonth,
+        "hour" -> dt.getHourOfDay
+      )
+      JSONObject.writeJSONString(js.asJava, out)
+      out.write("\n")
+    }
+    out.close()
+  }
+
+  def writeFreq(path: String, sessions: Seq[TestDataCreator.Session]) : Seq[(String, Long, Int)] = {
+
+    val freqs =
+      sessions
+      .flatMap(_.views)
+      .map { v => (v.wikiDb, v.pageId) }
+      .groupBy(identity)
+      .mapValues(_.length)
+      .map { x => (x._1._1, x._1._2, x._2)}
+
+    TestUtils.deleteRecursively(new File(path))
+
+    val out = new PrintWriter(new FileWriter(path))
+
+    out.write("wiki_db\tpage\tfrequency\n")
+    freqs.foreach { case (wiki, word, freq) =>
+      out.write(wiki + "\t" + word + "\t" + freq + "\n")
+    }
+
+    out.close()
+
+    freqs.toList
+  }
+
+  def writePruned(path: String, valid: Seq[Session], freq: Seq[(String, Long, Int)], minFreq: Int): Unit = {
+    val keepers = freq
+      .filter(_._3 >= minFreq)
+      .map { case (wiki, page, n) => (wiki, page) }
+      .toSet
+
+    val out = new PrintWriter(new FileWriter(path))
+    out.write("wiki_db\tdate\ttstamp\tpages\n")
+    valid.foreach { s =>
+      val ts = new Timestamp(s.tstamp).toString
+      val date = ts.substring(0, 10)
+      val wiki = s.views.head.wikiDb
+
+      val pageIds = s
+        .views
+        .filter { v => keepers.contains((v.wikiDb, v.pageId)) }
+        .map { _.pageId.toString }
+        .mkString(" ")
+
+      if (pageIds.nonEmpty) {
+        out.write(wiki + "\t" + date + "\t" + ts + "\t" + pageIds + "\n")
+      }
+    }
+
+    out.close()
+  }
+
+  def generate(params: Params) : Unit = {
+    // Allow some sessions to start earlier than the requested range.
+    val minStart = params.startTime.getMillis - (params.endTime.getMillis - params.startTime.getMillis) / 4
+    val sessions = generateSessions(params)
+    val valid = pruneInvalidSessions(sessions, params.startTime, params.endTime)
+
+    val path = params.outputDir.getAbsolutePath
+    Path(path).createDirectory(force = true, failIfExists = false)
+
+    writeWikis(path + "/wikis.json", params.wikis, params.snapshot)
+    writeRequests(path + "/requests.json", sessions)
+    writeSessions(path + "/sessions.txt",  valid)
+    val freq = writeFreq(path + "/frequency.txt", valid)
+    writePruned(path + "/sessions_pruned.txt", valid, freq, params.minFreq)
+  }
+
+  def main(args: Array[String]): Unit = {
+    RAND.setSeed(1L)
+    generate(Params())
+  }
+}
diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestPartitionQueryBuilder.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestPartitionQueryBuilder.scala
new file mode 100644
index 0000000..84c8bea
--- /dev/null
+++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestPartitionQueryBuilder.scala
@@ -0,0 +1,77 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.sql.Timestamp
+
+import org.scalatest.{FlatSpec, Matchers}
+import PartitionQueryBuilder.formSql
+
+class TestPartitionQueryBuilder extends FlatSpec with Matchers {
+
+  def sqlOr(conds : Seq[String]): String = { conds.map("(" + _ + ")").mkString(" or ") }
+  def normalize(s : String) : String = { s.toLowerCase.replaceAllLiterally(" ", "") }
+
+  "A PartitionQueryBuilder " should " build partition queries " in {
+
+    val tests: Seq[(String, String, String)] = Seq(
+      ( // Simple example
+        "2017-01-04 03:10:00",
+        "2017-01-05 04:00:00",
+        sqlOr(List("year = 2017 and month = 1 and day = 4 and hour >= 3",
+                   "year = 2017 and month = 1 and day = 5 and hour <= 4"))
+      ),
+      ( // Edge case with empty interval
+        "2017-01-04 03:00:00",
+        "2017-01-04 03:00:00",
+        sqlOr(List("year = 2017 and month = 1 and day = 4 and hour = 3"))
+      ),
+      ( // Start and end in same day
+        "2017-01-04 03:10:00",
+        "2017-01-04 07:00:00",
+        sqlOr(List(
+          "year = 2017 and month = 1 and day = 4 and hour >= 3 and hour <= 7"))
+      ),
+      ( // Have a full day in the middle
+        "2017-01-04 03:10:00",
+        "2017-01-06 04:00:00",
+        sqlOr(List(
+          "year = 2017 and month = 1 and day = 4 and hour >= 3",
+          "year = 2017 and month = 1 and day = 5",
+          "year = 2017 and month = 1 and day = 6 and hour <= 4"))
+      ),
+      ( // Have a full month in the middle
+        "2017-01-30 03:10:00",
+        "2017-03-02 04:00:00",
+        sqlOr(List(
+          "year = 2017 and month = 1 and day = 30 and hour >= 3",
+          "year = 2017 and month = 1 and day = 31",
+          "year = 2017 and month = 2",
+          "year = 2017 and month = 3 and day = 1",
+          "year = 2017 and month = 3 and day = 2 and hour <= 4"))
+      ),
+      ( // Have a full year in the middle
+        "2014-11-29 03:10:00",
+        "2017-02-02 04:00:00",
+        sqlOr(List(
+          "year = 2014 and month = 11 and day = 29 and hour >= 3",
+          "year = 2014 and month = 11 and day = 30",
+          "year = 2014 and month = 12",
+          "year = 2015",
+          "year = 2016",
+          "year = 2017 and month = 1",
+          "year = 2017 and month = 2 and day = 1",
+          "year = 2017 and month = 2 and day = 2 and hour <= 4"))
+      )
+    )
+
+    tests.foreach( t => {
+      val beg = Timestamp.valueOf(t._1)
+      val end = Timestamp.valueOf(t._2)
+      val sql = t._3
+      val res = formSql(beg, end)
+      System.out.println(res)
+
+      normalize(sql) shouldEqual normalize(res)
+    })
+  }
+
+}
diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
new file mode 100644
index 0000000..9d33c9e
--- /dev/null
+++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
@@ -0,0 +1,72 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.io.File
+import java.sql.Timestamp
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
+
+case class Session(wiki_db: String, date: String, tstamp: Timestamp, pages: String)
+
+/**
+  * @author Shilad Sen
+  *
+  */
+class TestSessionPagesBuilder extends FlatSpec with Matchers with BeforeAndAfter {
+
+  var DEBUG_MODE = true
+  var PARAMS : TestDataCreator.Params = null
+
+  before {
+
+    PARAMS = TestDataCreator.Params(
+      outputDir =
+        if (DEBUG_MODE) new File("testdata")
+        else File.createTempFile("test", "")
+    )
+
+    TestUtils.deleteRecursively(PARAMS.outputDir)
+    PARAMS.outputDir.mkdirs()
+
+    TestDataCreator.generate(PARAMS)
+  }
+
+  after {
+    if (!DEBUG_MODE) TestUtils.deleteRecursively(PARAMS.outputDir)
+  }
+
+  "SessionPagesBuilder" should "extract sessions" in {
+
+    val params = SessionPagesBuilder.Params(
+      outputTable = "sessions",
+      projectNamespaceTable = "wikis",
+      wikiList = PARAMS.wikis,
+      webrequestTable = "requests",
+      snapshot = PARAMS.snapshot,
+      begin = new Timestamp(PARAMS.startTime.getMillis),
+      end = new Timestamp(PARAMS.endTime.getMillis)
+    )
+
+    val conf = new SparkConf()
+      .setMaster("local[*]")
+      .setAppName("test-session-pages-builder")
+    val sc = new SparkContext(conf)
+
+    val path = PARAMS.outputDir.getAbsoluteFile
+    val sql = new SQLContext(sc)
+
+    sql.read.json(path + "/requests.json")
+      .registerTempTable(params.webrequestTable)
+    sql.read.json(path + "/wikis.json")
+      .registerTempTable(params.projectNamespaceTable)
+
+    SessionPagesBuilder(sql, params)
+
+    TestUtils.tableToCSV(sql, "sessions", path + "/spark_sessions.txt")
+
+    assert(TestUtils.filesAreSame(
+      path + "/spark_sessions.txt",
+      path + "/sessions.txt"))
+  }
+}
\ No newline at end of file
diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPruner.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPruner.scala
new file mode 100644
index 0000000..ce406da
--- /dev/null
+++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPruner.scala
@@ -0,0 +1,88 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.io.File
+import java.sql.Timestamp
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
+
+import scala.io.Source
+
+/**
+  */
+class TestSessionPruner extends FlatSpec with Matchers with BeforeAndAfter {
+
+  var DEBUG_MODE = true
+  var PARAMS : TestDataCreator.Params = null
+
+  before {
+
+    PARAMS = TestDataCreator.Params(
+      outputDir =
+        if (DEBUG_MODE) new File("testdata")
+        else File.createTempFile("test", "")
+    )
+
+    TestUtils.deleteRecursively(PARAMS.outputDir)
+    PARAMS.outputDir.mkdirs()
+
+    TestDataCreator.generate(PARAMS)
+  }
+
+  after {
+    if (!DEBUG_MODE) TestUtils.deleteRecursively(PARAMS.outputDir)
+  }
+
+
+  "SessionPruner" should "count and prune by frequency" in {
+    val path = PARAMS.outputDir.getAbsolutePath
+
+    val params = SessionPruner.Params(
+      inputTable = "sessions",
+      outputTable = "pruned_sessions",
+      freqTable = Some("frequency"),
+      wikiList = PARAMS.wikis,
+      begin = new Timestamp(PARAMS.startTime.getMillis),
+      end = new Timestamp(PARAMS.endTime.getMillis),
+      minFreq = 5
+    )
+
+    val conf = new SparkConf()
+      .setMaster("local[*]")
+      .setAppName("test-session-pages-builder")
+    val sc = new SparkContext(conf)
+
+    val sql = new SQLContext(sc)
+
+    sql
+      .read
+      .format("com.databricks.spark.csv")
+      .option("delimiter", "\t")
+      .option("header", "true")
+      .load(path + "/sessions.txt")
+      .registerTempTable("sessions")
+
+    SessionPruner(sql, params)
+
+    TestUtils.tableToCSV(sql, "frequency", path + "/spark_frequency.txt")
+
+    assert(
+      TestUtils.filesAreSame(
+        path + "/frequency.txt",
+        path + "/spark_frequency.txt"
+      )
+    )
+
+    TestUtils.tableToCSV(sql, "pruned_sessions", path + 
"/spark_pruned_sessions.txt")
+
+    assert(
+      TestUtils.filesAreSame(
+        path + "/sessions_pruned.txt",
+        path + "/spark_pruned_sessions.txt"
+      )
+    )
+  }
+}
\ No newline at end of file
diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestUtils.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestUtils.scala
new file mode 100644
index 0000000..afb2479
--- /dev/null
+++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestUtils.scala
@@ -0,0 +1,64 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.io.{BufferedWriter, File, FileWriter}
+import java.sql.Timestamp
+
+import org.apache.spark.sql.SQLContext
+
+import scala.io.Source
+
+/**
+  * @author Shilad Sen
+  */
+object TestUtils {
+
+  /**
+    * Dumps a sql table as a CSV file
+    */
+  def tableToCSV(sql: SQLContext, table: String, outputPath: String) : Unit = {
+    val tb = sql.table(table)
+
+    val out = new BufferedWriter(new FileWriter(outputPath))
+    out.write(tb.columns.mkString("\t") + "\n")
+
+    tb.collect().foreach { row =>
+      out.write(row.mkString("\t") + "\n")
+    }
+
+    out.close()
+  }
+
+  /**
+    * Checks whether the files are identical on a line-by-line basis.
+    * Lines are allowed to be reordered.
+    */
+  def filesAreSame(computedPath : String, correctPath: String) : Boolean = {
+    val computed = Source.fromFile(computedPath).getLines.toSet
+    val correct = Source.fromFile(correctPath).getLines.toSet
+
+    val missing = (correct -- computed).toList.sorted
+    missing.foreach { line => System.out.println("Missing correct line: " + line) }
+
+    val added = (computed -- correct).toList.sorted
+    added.foreach { line => System.out.println("Additional incorrect line: " + line) }
+
+    missing.isEmpty && added.isEmpty
+  }
+
+
+  // From https://stackoverflow.com/a/26769030/141245
+  def deleteRecursively(file: File): Unit = {
+    if (file.isDirectory)
+      file.listFiles.foreach(deleteRecursively)
+    if (file.exists && !file.delete)
+      throw new Exception(s"Unable to delete ${file.getAbsolutePath}")
+  }
+}

-- 
To view, visit https://gerrit.wikimedia.org/r/383761
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I19160e16d8140d03d81a4226e3974f42ec1e3602
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: nav-vectors
Gerrit-Owner: Shilad Sen <s...@macalester.edu>
