Shilad Sen has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/381517 )

Change subject: Simplified job to create session ids and finished debugging it.
......................................................................

Simplified job to create session ids and finished debugging it.

Changes:

* Instead of calculating page ids from titles, take page ids and
namespace ids directly from the webrequest table. I think this works
in the scenarios I care about (i.e. desktop page views); see the
sketch after this list.

* Builder now passes unit tests.
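
A condensed sketch of the new extraction path (the full version is the
prepareViews hunk below; PageView, userHash, and getTimestamp are existing
members of SessionPagesBuilder, while the method name here is illustrative):

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.SQLContext
  import org.wikimedia.analytics.refinery.job.vectors.SessionPagesBuilder.{PageView, getTimestamp, userHash}

  // Illustrative sketch only; mirrors the prepareViews change below.
  def viewsFromWebrequest(sql: SQLContext,
                          projectToWikiMap: Map[String, String]): RDD[PageView] =
    sql.sql(
      """SELECT CONCAT(pageview_info['project'], '.org') AS project,
        |       namespace_id, page_id, ts, client_ip, user_agent, x_forwarded_for
        |FROM wmf.webrequest
        |WHERE webrequest_source = 'text'
        |  AND access_method = 'desktop'
        |  AND is_pageview""".stripMargin)
      .rdd
      .map { r =>
        val wiki = projectToWikiMap(r.getString(0))
        val uh = userHash(r.getString(4), r.getString(5), r.getString(6))
        // Page id and namespace id come straight off the request row.
        PageView(wiki, uh, getTimestamp(r.get(3)).getTime, r.getLong(1), r.getLong(2))
      }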

TODO:

* Add sessions separated by timeout to the testing data.

* Benchmark new version on production cluster.

* Turn the test into a proper unit test (one possible shape is sketched below).
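
A hedged sketch of that conversion (the spec class name and the use of
ScalaTest's FlatSpec are assumptions, not part of this change; it presumes
the job output already exists, e.g. written by TestSessionPagesBuilder):

  import org.scalatest.{FlatSpec, Matchers}
  import scala.io.Source

  class SessionPagesBuilderSpec extends FlatSpec with Matchers {
    import TestParams._

    "SessionPagesBuilder" should "reproduce the generated sessions exactly" in {
      val computed = Source.fromFile(PATH_DATA + "/spark_sessions/part-00000").getLines.toSet
      val correct = Source.fromFile(PATH_DATA + "/sessions.txt").getLines.toSet
      (correct -- computed) shouldBe empty  // no sessions missing from job output
      (computed -- correct) shouldBe empty  // no spurious sessions in job output
    }
  }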

Bug: T174796
Change-Id: Iaec6ee811dd3da30152c3dbb206bf3a505370f90
---
M refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
M refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
M refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestParams.scala
M refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
4 files changed, 146 insertions(+), 317 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/17/381517/1

diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
index 1d03705..6e593b4 100644
--- a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
@@ -14,8 +14,7 @@
   * This creates output files with one line per session.
   * The first two tokens of each line are the session start timestamp and language edition.
   * The following tokens are the page ids viewed in the session, in order.
-  **
- *
+  *
   * @author Shilad Sen
   */
 object SessionPagesBuilder {
@@ -51,6 +50,11 @@
   val LOOK_PAST_SECS: Int = 60 * 59
 
 
+  /**
+    * A user session is defined by a wiki, a hash of unique user information,
+    * and the timestamp for a view. This is used to stream through pageviews
+    * that may have been part of the same user session.
+    */
   case class SessionKey (wikiDb: String, userHash: Long, tstamp: Long)
     extends Ordered[SessionKey] {
     override def compare(that: SessionKey): Int = {
@@ -63,7 +67,9 @@
     }
   }
 
-
+  /**
+    * Partitions all views for the same wiki from the same user onto a single partition.
+    */
   class SessionPartitioner(partitions: Int) extends Partitioner {
     require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
 
@@ -81,6 +87,11 @@
     }
   }
 
+  /**
+    * Wrapper that allows us to handle production hive timestamps (which
+    * are already timestamps), and testing data from JSON files (which are
+    * strings) uniformly.
+    */
   def getTimestamp(ts: Any) : Timestamp = {
     ts match {
       case _: Timestamp =>
@@ -90,19 +101,6 @@
       case _ => throw new IllegalArgumentException(ts.toString)
     }
   }
-
-  /**
-    * A single page.
-    * Effective page id is the page id after resolving redirects.
-    */
-  case class PageInfo(
-                       wikiDb: String,
-                       pageId: Long,
-                       pageTitle: String,
-                       pageNamespace: Long,
-                       pageIsRedirect: Boolean,
-                       effectivePageId : Long
-                     )
 
   /**
     * A view of a page by a user.
@@ -117,21 +115,14 @@
                        pageId: Long
                      )
 
-  case class Redirect(
-                       wikiDb: String,
-                       fromPageId: Long,
-                       toPageId: Long
-                     )
-
-
   /**
    * Returns true iff the two views are part of the same session.
    * User hashes must match and the pageviews must fall within a specified window.
     */
-  def sameSession(v1: PageView, v2: PageView, timeoutMillis: Long) : Boolean = {
+  def sameSession(v1: PageView, v2: PageView, timeoutSecs: Long) : Boolean = {
     (v1.wikiDb == v2.wikiDb) &&
       (v1.userHash == v2.userHash) &&
-      (Math.abs(v1.tstamp - v2.tstamp) <= timeoutMillis)
+      (Math.abs(v1.tstamp - v2.tstamp) <= timeoutSecs * 1000)
   }
 
 
@@ -179,131 +170,6 @@
       map(r => r.getString(0) -> r.getString(1)).toMap
   }
 
-  def hoursInRange(begin: org.joda.time.DateTime, end: org.joda.time.DateTime) : Seq[(Int, Int, Int, Int)]= {
-    val beginHour = begin.hourOfDay().roundFloorCopy()
-    val endHour = end.hourOfDay().roundCeilingCopy()
-    val numHours = Hours.hoursBetween(beginHour, endHour).getHours
-    val dates = (0 to numHours).map(beginHour.plusHours)
-    dates.map(d => (d.getYear, d.getMonthOfYear, d.getDayOfMonth, d.getHourOfDay))
-  }
-
-  /**
-    * Prepare the pages dataset to be reused
-    *
-    * @return A RDD of PageInfo data augmented with fake pages for each wiki
-    */
-  def preparePages(
-                    sqlContext: SQLContext,
-                    pageTable: String,
-                    snapshot: String,
-                    wikiList: Seq[String]
-                  ): RDD[PageInfo] = {
-    sqlContext.sql(
-      s"""
-                      |SELECT
-                      |  wiki_db,
-                      |  page_id,
-                      |  page_title,
-                      |  page_namespace,
-                      |  page_is_redirect
-                      |FROM $pageTable
-                      |WHERE snapshot = '$snapshot'
-                      |  AND wiki_db IN (${listToSQLInCondition(wikiList)})
-                      |  AND page_id IS NOT NULL
-                      |  AND page_namespace = 0
-                      |  AND page_id > 0
-                      |GROUP BY
-                      |  wiki_db,
-                      |  page_id,
-                      |  page_title,
-                      |  page_namespace,
-                      |  page_is_redirect
-        """.stripMargin).rdd.
-      map(r => {
-        PageInfo(r.getString(0), r.getLong(1), r.getString(2),
-                 r.getLong(3), r.getBoolean(4), r.getLong(1))
-      })
-  }
-
-  /**
-    * Prepare the redirects dataset to be reused
-    *
-    * @return A RDD of redirect data
-    */
-  def prepareRedirects(
-                        sqlContext: SQLContext,
-                        redirectTable: String,
-                        snapshot: String,
-                        wikiList: Seq[String],
-                        pages: RDD[PageInfo]
-                      ): RDD[Redirect] = {
-
-    val pagesPerPageId = pages.map(p => ((p.wikiDb, p.pageId), p.pageTitle)).cache()
-    val pagesPerTitleAndNamespace = pages.map(p => ((p.wikiDb, p.pageTitle, p.pageNamespace), p.pageId)).cache()
-
-    sqlContext.sql(
-      s"""
-         |SELECT
-         |  wiki_db,
-         |  rd_from,
-         |  rd_title,
-         |  rd_namespace
-         |FROM $redirectTable
-         |WHERE snapshot = '$snapshot'
-         |  AND wiki_db IN (${listToSQLInCondition(wikiList)})
-         |  AND rd_from IS NOT NULL
-         |  AND rd_from > 0
-         |GROUP BY
-         |  wiki_db,
-         |  rd_from,
-         |  rd_title,
-         |  rd_namespace
-          """.stripMargin)
-        .rdd
-        .map(r => {
-          ((r.getString(0), r.getLong(1)), (r.getString(2), r.getLong(3)))
-        })
-        .filter(t => Option(t._2._1).isDefined)   // Remove null toPageTitle
-        .join(pagesPerPageId)                     // Ensure fromPageId exists in page table
-        .map { case ((wiki, fromPageId), ((toPageTitle, toPageNamespace), _)) =>
-                  ((wiki, toPageTitle, toPageNamespace), fromPageId) }
-        .join(pagesPerTitleAndNamespace)          // Get toPageId from page table
-        .map { case ((wiki, _ , _), (fromPageId, toPageId)) =>
-                  Redirect(wiki, fromPageId, toPageId) }
-        .distinct // prevent any duplicate (data corruption)
-  }
-
-  /**
-    * Replace page ids for pages that are redirected.
-    *
-    * @return PageInfo with redirects applied to relevant page ids
-    */
-  def resolveRedirects(
-                      sqlContext: SQLContext,
-                      pages: RDD[PageInfo],
-                      redirects: RDD[Redirect]
-                    ) : RDD[PageInfo] = {
-
-    var redirectsBySrc = redirects.map(r => ((r.wikiDb, r.fromPageId), r.toPageId)).cache()
-
-    // This function maps redirects once
-    val processRedirects = (pages: RDD[PageInfo]) =>  {
-                              pages.map( p => ((p.wikiDb, p.effectivePageId), p) )
-                                .leftOuterJoin(redirectsBySrc)
-                                .map { case (_, (p, redirectId)) =>
-                                  PageInfo(p.wikiDb,
-                                            p.pageId,
-                                            p.pageTitle, p.pageNamespace,
-                                            p.pageIsRedirect,
-                                            redirectId.getOrElse(p.effectivePageId))
-                                }
-                            } : RDD[PageInfo]
-
-    // We only process two-levels of redirects. Is this okay?
-    processRedirects(
-      processRedirects(pages))
-  }
-
 
   /**
     * Prepare the view data
@@ -315,24 +181,23 @@
                           begin: Timestamp,
                           end: Timestamp,
                           projectList: Seq[String],
-                          projectToWikiMap: Map[String, String],
-                          pages: RDD[PageInfo]
+                          projectToWikiMap: Map[String, String]
                         ): RDD[PageView] = {
-    val pagesPerTitles = pages
-      .map(p => ((p.wikiDb, p.pageTitle), (p.pageNamespace, p.pageId))) // ((wiki, pageTitle), (pageNamespace, pageId))
-      .cache()
 
     sqlContext.sql(
       s"""
          |SELECT
          |  CONCAT(pageview_info['project'], '.org') AS project,
-         |  pageview_info['page_title'] as title,
+         |  namespace_id,
+         |  page_id,
          |  ts,
          |  client_ip,
          |  user_agent,
          |  x_forwarded_for
          |FROM $webrequestTable
          |WHERE webrequest_source = 'text'
+         |  AND access_method = 'desktop'
+         |  AND normalized_host.project_class IN ('wikipedia', 'wikimedia')
          |  AND(${PartitionQueryBuilder.formSql(begin, end)})
         |  AND pageview_info['project'] IN (${listToSQLInCondition(projectList)})
          |  AND is_pageview
@@ -340,19 +205,15 @@
           """.stripMargin)
       .rdd
       .map(r => {
-                  val wiki = projectToWikiMap(r.getString(0))
-                  val uh = userHash(r.getString(3), r.getString(4), r.getString(5))
-                  ((wiki, r.getString(1)), (getTimestamp(r.get(2)), uh))
-                })
-      .join(pagesPerTitles)
-      .map { case ((wiki, title), ((ts, userHash), (pageNs, pageId))) =>
-             PageView(wiki, userHash, ts.getTime, pageNs, pageId)
-           }
+        val wiki = projectToWikiMap(r.getString(0))
+        val uh = userHash(r.getString(4), r.getString(5), r.getString(6))
+        PageView(wiki, uh, getTimestamp(r.get(3)).getTime, r.getLong(1), r.getLong(2))
+      })
   }
 
   /**
     * Aggregates page views by user / wiki and groups them into sessions.
-    *
+    *
     * 1. Create session key composed of wiki, userhash, tstamp that delineates sessions.
     * 2. Partition by wiki, userhash with secondary sort on tstamp.
     * 3. Reduce page views into sessions on each partition. The iterator
@@ -361,7 +222,7 @@
     *
     * @return
     */
-  def createSessions(views: RDD[PageView], timeoutInMillis: Int): RDD[Seq[PageView]] = {
+  def createSessions(views: RDD[PageView], timeoutInSecs: Int): RDD[Seq[PageView]] = {
     views
       .map { v => (SessionKey(v.wikiDb, v.userHash, v.tstamp), v) }
       .repartitionAndSortWithinPartitions(new SessionPartitioner(views.getNumPartitions))
@@ -399,7 +260,7 @@
             def tryToFillSession(): Unit = {
               if (!eos && nextSessionHead.isEmpty && iter.hasNext) {
                 var p = iter.next._2
-                if (session.isEmpty || sameSession(p, session.last, timeoutInMillis)) {
+                if (session.isEmpty || sameSession(p, session.last, timeoutInSecs)) {
                   session += p
                   tryToFillSession()
                 } else {
@@ -420,10 +281,18 @@
     import sqlContext.implicits._
 
     sessions
-      .filter(views => views.nonEmpty)
-      .map(views => (views.head.wikiDb,
-                     new Timestamp(views.head.tstamp),
-                     views.map(v => v.pageId.toString).mkString(" ")))
+      .map { views =>
+        // Make concatenated ids with namespace 0
+        // Concatenate the page ids of namespace-0 views
+            .filter(_.pageNamespace == 0)
+            .map(_.pageId.toString)
+            .mkString(" ")
+
+        // Timestamp for session is timestamp for first view, regardless of
+        // whether it is ultimately filtered out
+        (views.head.wikiDb, new Timestamp(views.head.tstamp).toString, ids)
+      }
+      .filter(_._3.nonEmpty)  // Remove sessions with zero pageviews
       .repartition(outputFilesParts)
       .toDF
       .withColumnRenamed("_1", "wiki_db")
@@ -444,15 +313,13 @@
   case class Params(
                      outputBasePath: String = "/user/shiladsen/sessions",
                      projectNamespaceTable: String = "wmf_raw.mediawiki_project_namespace_map",
-                     pageTable: String = "wmf_raw.mediawiki_page",
-                     redirectTable: String = "wmf_raw.mediawiki_redirect",
                      webrequestTable: String = "wmf.webrequest",
                      wikiList: Seq[String] = Seq("simplewiki"),
                      outputFilesParts: Int = 1,
                      snapshot: String = "", // Parameter required, never used as is
                      begin: Timestamp = new Timestamp(0), // Parameter required, never used as is
                      end: Timestamp = new Timestamp(0), // Parameter required, never used as is
-                     sessionTimeoutMillis: Int = 15*60*1000
+                     sessionTimeoutSecs: Int = 15*60 // 15 minutes, in seconds
                    )
 
   val YMD: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd")
@@ -498,9 +365,9 @@
     } text ("The ending time for webrequest data gathering.")
 
     opt[Int]('t', "timeout") optional() valueName ("<timeout>") action { (x, p) =>
-      p.copy(sessionTimeoutMillis = x)
+      p.copy(sessionTimeoutSecs = x)
     } validate { x => if (x >= 0) success else failure("Invalid timeout")
-    } text ("The timeout in milliseconds that distinguishes user sessions.")
+    } text ("The timeout in seconds that delineates user sessions.")
 
 
     opt[String]('o', "output-base-path") optional() valueName ("<path>") action { (x, p) =>
@@ -524,14 +391,6 @@
     opt[String]("project-namespace-table") optional() valueName ("<table>") action { (x, p) =>
       p.copy(projectNamespaceTable = x)
     } text ("Fully qualified name of the project-namespace table on Hive. Defaults to wmf_raw.mediawiki_project_namespace_map")
-
-    opt[String]("page-table") optional() valueName ("<table>") action { (x, p) =>
-      p.copy(pageTable = x)
-    } text ("Fully qualified name of the page table on Hive. Default to wmf_raw.mediawiki_page")
-
-    opt[String]("redirect-table") optional() valueName ("<table>") action { (x, p) =>
-      p.copy(redirectTable = x)
-    } text ("Fully qualified name of the redirect table on Hive. Default to wmf_raw.mediawiki_redirect")
 
     opt[String]("webrequest-table") optional() valueName ("<table>") action { (x, p) =>
       p.copy(webrequestTable = x)
@@ -571,35 +430,35 @@
     sql.sparkContext
       .getConf
       .registerKryoClasses(Array(
-        classOf[PageInfo],
-        classOf[Redirect],
         classOf[PageView]))
 
     import sql.implicits._
 
-    val projectToWikiMap = prepareProjectToWikiMap(sql, params.projectNamespaceTable, params.snapshot, params.wikiList)
+    val projectToWikiMap = prepareProjectToWikiMap(
+      sql,
+      params.projectNamespaceTable,
+      params.snapshot,
+      params.wikiList)
     val domainList = projectToWikiMap.keys.toList
     val projectList = domainList.map(_.stripSuffix(".org"))
 
-    val pages = preparePages(sql, params.pageTable, params.snapshot, params.wikiList).cache()
-    val redirects = prepareRedirects(sql, params.redirectTable, params.snapshot, params.wikiList, pages)
-    val pages2 = resolveRedirects(sql, pages, redirects)
-
     val viewBeg = new Timestamp(new DateTime(params.begin.getTime)
-                                      .minusSeconds(LOOK_PAST_SECS)
+                                      .minusSeconds(2 * params.sessionTimeoutSecs)
                                       .getMillis)
     val viewEnd = new Timestamp(new DateTime(params.end.getTime)
-                                      .minusSeconds(LOOK_FUTURE_SECS)
+                                      .plusSeconds(LOOK_FUTURE_SECS)
                                       .getMillis)
+
     val views = prepareViews( sql, params.webrequestTable,
                               viewBeg, viewEnd,
-                              projectList,
-                              projectToWikiMap,
-                              pages)
+                              projectList, projectToWikiMap)
 
-    val beginMillis = params.begin.getTime
-    val sessions = createSessions(views, params.sessionTimeoutMillis)
-                        .filter(_.head.tstamp >= beginMillis)
+    val sessions = createSessions(views, params.sessionTimeoutSecs)
+      .filter { ts =>
+        ts.head.tstamp >= params.begin.getTime &&
+        ts.head.tstamp <= params.end.getTime
+      }
+
     writeSessions(sql, sessions, params.outputBasePath, params.outputFilesParts)
   }
 }
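
For readers skimming the hunks above, a minimal standalone sketch of the
per-partition grouping that createSessions performs, assuming the views
arrive sorted by (wiki, user hash, timestamp) as the secondary sort
guarantees (illustrative only; the real code streams lazily through an
iterator instead of folding a materialized Seq):

  import org.wikimedia.analytics.refinery.job.vectors.SessionPagesBuilder.{PageView, sameSession}

  def groupIntoSessions(sorted: Seq[PageView], timeoutSecs: Long): Seq[Seq[PageView]] =
    sorted.foldLeft(List.empty[List[PageView]]) {
      case (Nil, v) => List(List(v))
      case (cur :: done, v) =>
        // Same wiki + same user + gap under the timeout => same session.
        if (sameSession(v, cur.head, timeoutSecs)) (v :: cur) :: done
        else List(v) :: cur :: done
    }.map(_.reverse).reverse

With the 15-minute default, two views by the same user 20 minutes apart
start separate sessions, while a chain of views each under 15 minutes
apart stays in one session even if the chain as a whole spans hours.
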
diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
index c1641b0..3b2bcb8 100644
--- a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
+++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
@@ -5,7 +5,7 @@
 
 import org.joda.time.DateTime
 import org.json.simple.JSONObject
-import org.wikimedia.analytics.refinery.job.vectors.SessionPagesBuilder.{PageInfo, PageView, Redirect, userHash}
+import org.wikimedia.analytics.refinery.job.vectors.SessionPagesBuilder.{PageView, userHash}
 
 import scala.collection.{immutable, mutable}
 import scala.reflect.io.Path
@@ -21,6 +21,7 @@
 object TestDataCreator {
 
   case class UserInfo(x: String, y: String, z: String, hash: Long)
+  case class Session(user: UserInfo, tstamp: Long, views: Seq[PageView])
 
   import TestParams._
 
@@ -46,61 +47,53 @@
     }
   }
 
-  def generatePages(wikis: Seq[String], numPagesPerWiki: Int): Seq[PageInfo] = {
-    wikis
-      .flatMap(w => (1 to numPagesPerWiki).map((w, _))) // seq of (wikiId, pageId)
-      .map { case (w, p) =>
-        PageInfo(w, p, "Article " + p, randNamespace(), pageIsRedirect = false, -1)
-      }
+  def randPageView(user : UserInfo, wiki: String, tstamp: Long, numArticles : Int) : PageView = {
+    val pageId = Random.nextInt(numArticles)
+    val pageNs = (pageId.hashCode() % 10 - 8).max(0)
+    PageView(wiki, user.hash, tstamp, pageNs, pageId)
   }
 
-  def usedRedirects: mutable.Set[(String, String)] = scala.collection.mutable.Set[(String, String)]()
-
-  def generateRedirects(pages: Seq[PageInfo], redirectsPerWiki: Int) : Seq[Redirect]= {
-    val maxId = pages.map(_.pageId).max
-    val wikis = pages.map(_.wikiDb).distinct.toList
-    val byWiki = pages.groupBy(_.wikiDb)
-
-    byWiki.keys.flatMap { w =>
-      val shuffled = Random.shuffle(byWiki(w).toList).toArray
-      val srcs = shuffled.slice(0, redirectsPerWiki)
-      val dests = shuffled.slice(redirectsPerWiki, redirectsPerWiki * 2)
-      srcs.zip(dests).map { pair => Redirect(w, pair._1.pageId, pair._2.pageId) }
-    }.toSeq
-  }
-
-  def generateSessions(start: Long, end: Long, pages: Seq[PageInfo], numSessions: Int, meanLength: Int): Seq[(UserInfo, Seq[(Long, PageInfo)])] = {
-    val byWiki = pages.groupBy(_.wikiDb)
-    val wikis = byWiki.keys.toList
+  def generateSessions(wikis : Seq[String], articlesPerWiki: Int, start: Long, end: Long, numSessions: Int, meanLength: Int): Seq[Session] = {
     (1 to numSessions)
       .map { _ =>
         val n = 1 + Random.nextInt(meanLength * 2)
         val w = choice(wikis)
-        val tstamps =  (1 to n)
-                          .foldLeft(List(start + Random.nextInt((end-start).asInstanceOf[Int])))(
-                            (seq, _) => (seq.head + Random.nextInt(SESSION_TIMEOUT_MILLIS-1))::seq)
-                          .reverse
-        val session = tstamps.map { (_, choice(byWiki(w))) }
-        (randUser(), session.toList)
+        val u = randUser()
+        val startTstamp = start + Random.nextInt((end-start).asInstanceOf[Int])
+        val tstamps =
+          (1 to n)
+            .foldLeft(List(startTstamp)) { (seq, _) =>
+              (seq.head + Random.nextInt(SESSION_TIMEOUT_SECS*1000-1))::seq
+            }
+            .reverse
+        val views = tstamps.map { randPageView(u, w, _, articlesPerWiki) }.toList
+        Session(u, views.head.tstamp, views.toList)
       }
   }
 
-  def writeSessions(path: String,
-                    pages: Seq[PageInfo],
-                    redirects: Seq[Redirect],
-                    sessions: Seq[(UserInfo, Seq[(Long, PageInfo)])]) : Unit = {
-    val pageIdMap = pages
-                      .map(p => ((p.wikiDb, p.pageId), p.pageId.toString))
-                      .union(redirects.map(r => ((r.wikiDb, r.fromPageId), r.toPageId.toString)))
-                      .toMap
+  /**
+    * Prunes sessions:
+    * - Removes views for namespaces that are not 0
+    * - Removes empty sessions
+    * - Removes sessions whose start times fall outside the requested range
+    */
+  def pruneInvalidSessions(sessions: Seq[Session], begin: DateTime, end: DateTime) : Seq[Session] = {
+    sessions
+      .map { s => Session(s.user, s.tstamp, s.views.filter(_.pageNamespace == 0).toList) }
+      .filter { s =>
+        s.views.nonEmpty && s.tstamp >= begin.getMillis && s.tstamp <= end.getMillis
+      }
+  }
 
+  def writeSessions(path: String, sessions: Seq[Session]) : Unit = {
     val out = new PrintWriter(new FileWriter(path))
+    out.write("wiki_db\ttstamp\tpages\n")
     sessions.foreach {
       s =>
-        val ts = s._2.head._1.toString
-        val wiki = s._2.head._2.wikiDb
-        val pageIds = s._2.map(tsAndPage => pageIdMap((tsAndPage._2.wikiDb, tsAndPage._2.pageId))).mkString(" ")
-        out.write(ts + "\t" + wiki + "\t" + pageIds + "\n")
+        val ts = new Timestamp(s.tstamp).toString
+        val wiki = s.views.head.wikiDb
+        val pageIds = s.views.map { _.pageId.toString }.mkString(" ")
+        out.write(wiki + "\t" + ts + "\t" + pageIds + "\n")
     }
     out.close()
   }
@@ -108,9 +101,9 @@
   import scala.collection.JavaConverters._
 
 
-  def writeWikis(path: String, pages: Seq[PageInfo]): Unit = {
+  def writeWikis(path: String, wikis: Seq[String]): Unit = {
     val out = new PrintWriter(new FileWriter(path))
-    pages.map(_.wikiDb).distinct.foreach {
+    wikis.distinct.foreach {
       w =>
         val js = Map(
           "hostname" -> (w + ".org"),
@@ -123,67 +116,26 @@
     out.close()
   }
 
-  def writePages(path: String, pages: Seq[PageInfo]): Unit = {
-    val out = new PrintWriter(new FileWriter(path))
-    pages.foreach { p =>
-      val js = Map(
-        "wiki_db" -> p.wikiDb,
-        "page_id" -> p.pageId,
-        "page_title" -> p.pageTitle,
-        "snapshot" -> SNAPSHOT,
-        "page_namespace" -> p.pageNamespace,
-        "page_is_redirect" -> p.pageIsRedirect
-      )
-      JSONObject.writeJSONString(js.asJava, out)
-      out.write("\n")
-    }
-    out.close()
-  }
-
-  def writeRedirects(path: String, pages: Seq[PageInfo], redirects: Seq[Redirect]): Unit = {
-    val titles = pages.map(p => ((p.wikiDb, p.pageId), p.pageTitle)).toMap
+  def writeRequests(path: String, sessions: Seq[Session]): Unit = {
+    val allViews = sessions.flatMap { s =>  s.views.map((s.user, _)) }
 
     val out = new PrintWriter(new FileWriter(path))
-    redirects.foreach { r =>
-      val js = Map(
-        "wiki_db" -> r.wikiDb,
-        "rd_from" -> titles((r.wikiDb, r.fromPageId)),
-        "snapshot" -> SNAPSHOT,
-        "rd_title" -> titles((r.wikiDb, r.toPageId)),
-        "rd_namespace" -> 0
-      )
-      JSONObject.writeJSONString(js.asJava, out)
-      out.write("\n")
-    }
-    out.close()
-  }
-
-
-  def writeRequests(path: String, sessions: Seq[(UserInfo, Seq[(Long, PageInfo)])]): Unit = {
-    val allViews = sessions.flatMap { s =>
-      val userInfo = s._1
-      s._2.map( tsAndPage => (userInfo, tsAndPage._1, tsAndPage._2))
-    }
-
-    val out = new PrintWriter(new FileWriter(path))
-    Random.shuffle(allViews).foreach { v =>
-      val user = v._1
-      val dt = new DateTime(v._2)
-      val p = v._3
-      val pvi = Map(
-        "project" -> p.wikiDb,
-        "page_title" -> p.pageTitle
-      ).asJava
+    Random.shuffle(allViews).foreach { case (user, view) =>
+      val dt = new DateTime(view.tstamp)
 
       val js = Map(
-        "pageview_info" -> pvi,
+        "pageview_info" -> Map( "project" -> view.wikiDb ).asJava,
         "ts" -> new Timestamp(dt.getMillis).toString,
         "user_agent" -> user.x,
+        "namespace_id" -> view.pageNamespace,
+        "page_id" -> view.pageId,
         "x_forwarded_for" -> user.y,
         "client_ip" -> user.z,
         "is_pageview" -> true,
         "agent_type" -> "user",
         "webrequest_source" -> "text",
+        "access_method" -> "desktop",
+        "normalized_host" -> Map("project_class" -> "wikimedia").asJava,
         "year" -> dt.getYear,
         "month" -> dt.getMonthOfYear,
         "day" -> dt.getDayOfMonth,
@@ -196,21 +148,19 @@
   }
 
   def main(args: Array[String]): Unit = {
-    val pages = generatePages(WIKIS, NUM_PAGES)
-    val redirects = generateRedirects(pages, NUM_REDIRECTS)
+    Random.setSeed(1L)
 
     // Allow some sessions to start earlier than the requested range.
     val minStart = START_TIME.getMillis - (END_TIME.getMillis - START_TIME.getMillis) / 4
-    val sessions = generateSessions(minStart, END_TIME.getMillis,
-                                    pages, NUM_SESSIONS, MEAN_SESSION_LENGTH)
+    val sessions = generateSessions(
+      WIKIS, NUM_PAGES,
+      minStart, END_TIME.getMillis,
+      NUM_SESSIONS, MEAN_SESSION_LENGTH)
 
     Path(PATH_DATA).createDirectory(force = true, failIfExists = false)
 
-    writeWikis(PATH_DATA + "/wikis.json", pages)
-    writePages(PATH_DATA + "/pages.json", pages)
-    writeRedirects(PATH_DATA + "/redirects.json", pages, redirects)
-    writeRequests(PATH_DATA + "/views.json", sessions)
-    writeSessions(PATH_DATA + "/sessions.txt", pages, redirects,
-                  sessions.filter(_._2.head._1 >= START_TIME.getMillis))
+    writeWikis(PATH_DATA + "/wikis.json", WIKIS)
+    writeRequests(PATH_DATA + "/requests.json", sessions)
+    writeSessions(PATH_DATA + "/sessions.txt", pruneInvalidSessions(sessions, START_TIME, END_TIME))
   }
 }
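
For orientation, the generated sessions.txt now looks like this (header
from writeSessions above; the wiki, timestamp, and page-id values here
are illustrative, since the real ones come from the seeded RNG):

  wiki_db	tstamp	pages
  wiki_1	2017-01-01 21:12:03.0	17 4 92
  wiki_3	2017-01-01 23:40:51.0	58
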
diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestParams.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestParams.scala
index 85e5222..6a27d72 100644
--- a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestParams.scala
+++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestParams.scala
@@ -13,13 +13,15 @@
   val START_TIME = new DateTime(2017, 1, 1, 21, 0, 0)
   val END_TIME = new DateTime(2017, 1, 2, 2, 0, 0)
 
-  val WIKIS: Seq[String] = (1 to 3).map{ "wiki_" + _ }
-  val NUM_PAGES = 1000        // per wiki
-  val NUM_REDIRECTS = 300     // per wiki
-  val NUM_SESSIONS = 1000
 
-  val MEAN_SESSION_LENGTH = 20
-  val SESSION_TIMEOUT_MILLIS: Int = 60 * 15 * 1000 // 15 min
+  val WIKIS: Seq[String] = (1 to 3).map{ "wiki_" + _ }
+  val NUM_PAGES = 100        // per wiki
+  val NUM_REDIRECTS = 3     // per wiki
+  val NUM_SESSIONS = 1000
+  val MEAN_SESSION_LENGTH = 10
+
+  // TODO: Change millis to seconds
+  val SESSION_TIMEOUT_SECS: Int = 60 * 15 // 15 min
 
   val PATH_DATA = "./test-data/"
 }
diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
index 749aab8..b1453f2 100644
--- a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
+++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
@@ -5,8 +5,12 @@
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.{SparkConf, SparkContext}
 
+import scala.io.Source
+
 /**
   * @author Shilad Sen
+  *
+  * TODO: Turn this into a regular unit test.
   */
 object TestSessionPagesBuilder {
   import TestParams._
@@ -15,8 +19,6 @@
     val params = SessionPagesBuilder.Params(
       outputBasePath = PATH_DATA + "/spark_sessions",
       projectNamespaceTable = "wikis",
-      pageTable = "pages",
-      redirectTable = "redirects",
       wikiList = WIKIS.toList,
       webrequestTable = "requests",
       snapshot = SNAPSHOT,
@@ -31,15 +33,31 @@
 
 
     val sql = new SQLContext(sc)
-    sql.read.json(PATH_DATA + "/views.json")
+    sql.read.json(PATH_DATA + "/requests.json")
       .registerTempTable(params.webrequestTable)
-    sql.read.json(PATH_DATA + "/redirects.json")
-      .registerTempTable(params.redirectTable)
-    sql.read.json(PATH_DATA + "/pages.json")
-      .registerTempTable(params.pageTable)
     sql.read.json(PATH_DATA + "/wikis.json")
       .registerTempTable(params.projectNamespaceTable)
 
     SessionPagesBuilder(sql, params)
+
+    val computed = Source.fromFile(PATH_DATA + "/spark_sessions/part-00000").getLines.toSet
+    val correct = Source.fromFile(PATH_DATA + "/sessions.txt").getLines.toSet
+
+    val nMissing = correct
+      .filter { !computed.contains(_) }
+      .toList
+      .sorted
+      .map { line => System.out.println("Missing correct session: " + line); line }
+      .length
+
+    val nAdded = computed
+      .filter { !correct.contains(_) }
+      .toList
+      .sorted
+      .map { line => System.out.println("Additional incorrect session: " + line); line }
+      .length
+
+    assert(nMissing == 0)
+    assert(nAdded == 0)
   }
 }
\ No newline at end of file

-- 
To view, visit https://gerrit.wikimedia.org/r/381517
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Iaec6ee811dd3da30152c3dbb206bf3a505370f90
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: nav-vectors
Gerrit-Owner: Shilad Sen <[email protected]>
