Change subject: Spark job to create session event log appears to be working.

Spark job to create session event log appears to be working.

The session event log has one session per line: the wiki db and the
session start timestamp, followed by the page ids viewed in that
session, ordered by timestamp. For example:

simplewiki      2017-06-08 03:28:15.0   34242   9700
simplewiki      2017-06-08 03:14:49.0   31246
simplewiki      2017-06-08 03:14:22.0   34019   9084    81247   121

* Benchmark on larger datasets
* Optimize if necessary (try broadcasting datastructure to avoid join)
* Compare data cleaning with Elery's scripts

Bug: T174796
Change-Id: I52103578d50d492eaeff2eeffc8629331a1260da
1 file changed, 547 insertions(+), 0 deletions(-)

  git pull ssh:// 

diff --git 
new file mode 100644
index 0000000..d77a96a
--- /dev/null
@@ -0,0 +1,547 @@
+import org.apache.log4j.{Level, LogManager}
+import org.apache.spark.Partitioner
+import scala.collection.mutable.ArrayBuffer
+import scala.util.hashing.MurmurHash3
+/**
+  * Process to create a page view session log.
+  *
+  * This creates output files with one line per session.
+  * The first two tokens of each line are the language edition (wiki db)
+  * and the session start timestamp.
+  * The following tokens are the page ids viewed in the session, in order.
+  *
+  * @author Shilad Sen
+  */
+object SessionPagesBuilder {
+  import org.apache.spark.sql.hive.HiveContext
+  import org.apache.spark.SparkConf
+  import org.apache.spark.sql.SaveMode
+  import org.apache.spark.SparkContext
+  import scopt.OptionParser
+  import org.apache.spark.rdd.RDD
+  import org.apache.spark.sql.SQLContext
+  /**
+    * Sort key for a single page view.
+    *
+    * Keys are ordered by wiki db, then by user hash, then by timestamp.
+    *
+    * @param wikiDb   wiki database name
+    * @param userHash hash identifying the user
+    * @param tstamp   timestamp of the view
+    */
+  case class SessionKey (wikiDb: String, userHash: Long, tstamp: Long)
+    extends Ordered[SessionKey] {
+    override def compare(that: SessionKey): Int = {
+      // Compare field by field; the first field that differs decides.
+      val byWiki = wikiDb.compareTo(that.wikiDb)
+      if (byWiki != 0) {
+        byWiki
+      } else {
+        val byUser = java.lang.Long.compare(userHash, that.userHash)
+        if (byUser != 0) byUser else java.lang.Long.compare(tstamp, that.tstamp)
+      }
+    }
+  }
+  /**
+    * Partitioner that places every key with the same (wikiDb, userHash) in
+    * the same partition, whatever its timestamp.
+    *
+    * @param partitions number of partitions; must not be negative
+    */
+  class SessionPartitioner(partitions: Int) extends Partitioner {
+    require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
+    override def numPartitions: Int = partitions
+    override def getPartition(key: Any): Int = {
+      val k = key.asInstanceOf[SessionKey]
+      // Math.abs(Int.MinValue) is still negative, so normalise the remainder
+      // instead of taking the absolute value of the sum.
+      val remainder = (k.wikiDb.hashCode + k.userHash.toInt) % partitions
+      if (remainder < 0) remainder + partitions else remainder
+    }
+  }
+  object SessionKey {
+    /**
+      * Implicit ordering for SessionKey and its subtypes: by wiki db, then
+      * user hash, then timestamp.
+      */
+    implicit def orderingByTimestamp[A <: SessionKey] : Ordering[A] = {
+      Ordering.by[A, (String, Long, Long)](fk => (fk.wikiDb, fk.userHash, fk.tstamp))
+    }
+  }
+  /**
+    * A page of a wiki.
+    *
+    * @param wikiDb          wiki database name
+    * @param pageId          page id
+    * @param pageTitle       page title
+    * @param pageNamespace   namespace of the page
+    * @param pageIsRedirect  whether the page is a redirect
+    * @param effectivePageId page id or redirected destination page id
+    */
+  case class PageInfo(
+    wikiDb: String,
+    pageId: Long,
+    pageTitle: String,
+    pageNamespace: Long,
+    pageIsRedirect: Boolean,
+    effectivePageId: Long
+  )
+  /**
+    * A redirect between two pages of the same wiki.
+    *
+    * @param wikiDb     wiki database name
+    * @param fromPageId id of the redirecting page
+    * @param toPageId   id of the page redirected to
+    */
+  case class Redirect(
+    wikiDb: String,
+    fromPageId: Long,
+    toPageId: Long
+  )
+  /**
+    * A single view of a page by a user.
+    *
+    * @param wikiDb        wiki database name
+    * @param userHash      hash identifying the user
+    * @param tstamp        timestamp of the view
+    * @param pageNamespace namespace of the viewed page
+    * @param pageId        id of the viewed page
+    */
+  case class PageView(
+    wikiDb: String,
+    userHash: Long,
+    tstamp: Long,
+    pageNamespace: Long,
+    pageId: Long
+  )
+  /**
+    * Tell whether two page views belong to the same session: same wiki,
+    * same user hash, and timestamps at most timeoutMillis apart.
+    *
+    * @param v1            first view
+    * @param v2            second view
+    * @param timeoutMillis largest allowed gap between the two timestamps
+    * @return true when both views are part of one session
+    */
+  def sameSession(v1: PageView, v2: PageView, timeoutMillis: Long) : Boolean = {
+    (v1.wikiDb == v2.wikiDb) &&
+      (v1.userHash == v2.userHash) &&
+      (Math.abs(v1.tstamp - v2.tstamp) <= timeoutMillis)
+  }
+  /**
+    * Given components associated with a distinct user, return a long hash.
+    *
+    * The three components are joined with ':'. The upper 32 bits of the
+    * result are the MurmurHash3 of that string, the lower 32 bits are the
+    * MurmurHash3 of the same string reversed.
+    *
+    * @param client_ip       client ip of the request
+    * @param user_agent      user agent of the request
+    * @param x_forwarded_for x-forwarded-for value of the request
+    * @return 64-bit hash built from the two 32-bit hashes
+    */
+  def userHash(
+               client_ip: String,
+               user_agent: String,
+               x_forwarded_for: String
+             ) : Long = {
+    val s = client_ip + ":" + user_agent + ":" +  x_forwarded_for
+    // Keep all 32 bits of the second hash (eight F digits); the mask also
+    // removes the sign extension that would otherwise overwrite the upper half.
+    (MurmurHash3.stringHash(s).toLong << 32) |
+      (MurmurHash3.stringHash(s.reverse).toLong & 0xFFFFFFFFL)
+  }
+  /**
+    * Steps:
+    *  - prepare raw tables:
+    *    - page: Keep only wiki_db, page_id, page_namespace, page_is_redirect,
+    *            page_title. Insert fake values used later
+    *    - redirect Join with page (from and to) to clean and denormalize
+    */
+  /**
+    * Render a list of strings as the body of a SQL IN (...) condition:
+    * every value wrapped in single quotes, values separated by ", ".
+    * Values are inserted as is, without escaping.
+    *
+    * @param list values to render
+    * @return the quoted, comma-separated values (empty string for an empty list)
+    */
+  def listToSQLInCondition(list: Seq[String]): String =
+    list.map(w => s"'$w'").mkString(", ")
+  /**
+    * Prepare a map linking project hostnames to project dbnames.
+    *
+    * The (hostname, dbname) pairs are read from the project-namespace table
+    * for the given snapshot and wikis, and collected on the driver.
+    *
+    * @param sqlContext            context used to run the Hive query
+    * @param projectNamespaceTable table name interpolated into the FROM clause
+    * @param snapshot              value compared against the snapshot column
+    * @param wikiList              dbnames kept by the IN condition
+    * @return A map[hostname -> dbname]
+    */
+  def prepareProjectToWikiMap(
+                               sqlContext: SQLContext,
+                               projectNamespaceTable: String,
+                               snapshot: String,
+                               wikiList: Seq[String]
+                             ): Map[String, String] = {
+    // DISTINCT keeps the collected result small; duplicates would be merged
+    // by toMap anyway.
+    sqlContext.sql(
+      s"""
+         |SELECT DISTINCT
+         |  hostname,
+         |  dbname
+         |FROM $projectNamespaceTable
+         |WHERE snapshot = '$snapshot'
+         |  AND dbname IN (${listToSQLInCondition(wikiList)})
+        """.stripMargin).
+      rdd.
+      collect.
+      map(r => r.getString(0) -> r.getString(1)).toMap
+  }
+  /**
+    * Prepare the pages dataset to be reused.
+    *
+    * Pages of namespace 0 with a positive id are read for the requested
+    * snapshot and wikis; their effective page id starts as their own id.
+    * Five placeholder pages per wiki (ids -1 to -5, titles "other-*") are
+    * appended so that later steps can work with ids instead of titles.
+    *
+    * @param sqlContext context used to run the Hive query
+    * @param pageTable  table name interpolated into the FROM clause
+    * @param snapshot   value compared against the snapshot column
+    * @param wikiList   wiki dbs kept by the IN condition
+    * @return A RDD of PageInfo data augmented with fake pages for each wiki
+    */
+  def preparePages(
+                    sqlContext: SQLContext,
+                    pageTable: String,
+                    snapshot: String,
+                    wikiList: Seq[String]
+                  ): RDD[PageInfo] = {
+    val pageQuery =
+      s"""
+         |SELECT
+         |  wiki_db,
+         |  page_id,
+         |  page_title,
+         |  page_namespace,
+         |  page_is_redirect
+         |FROM $pageTable
+         |WHERE snapshot = '$snapshot'
+         |  AND wiki_db IN (${listToSQLInCondition(wikiList)})
+         |  AND page_id IS NOT NULL
+         |  AND page_namespace = 0
+         |  AND page_id > 0
+         |GROUP BY
+         |  wiki_db,
+         |  page_id,
+         |  page_title,
+         |  page_namespace,
+         |  page_is_redirect
+        """.stripMargin
+
+    // Pages found in the page table.
+    val tablePages = sqlContext.sql(pageQuery).rdd.map { row =>
+      PageInfo(
+        row.getString(0),
+        row.getLong(1),
+        row.getString(2),
+        row.getLong(3),
+        row.getBoolean(4),
+        row.getLong(1))
+    }
+
+    // Special placeholder pages, one set per wiki.
+    val placeholderTitles = Seq(
+      -1L -> "other-empty",
+      -2L -> "other-internal",
+      -3L -> "other-external",
+      -4L -> "other-search",
+      -5L -> "other-other")
+    val placeholderPages = wikiList.flatMap { wiki =>
+      placeholderTitles.map { case (id, title) =>
+        PageInfo(wiki, id, title, 0L, pageIsRedirect = false, effectivePageId = -1L)
+      }
+    }
+
+    tablePages.union(sqlContext.sparkContext.parallelize(placeholderPages))
+  }
+  /**
+    * Prepare the redirects dataset to be reused.
+    *
+    * Reads (wiki_db, rd_from, rd_title, rd_namespace) rows from the redirect
+    * table for the given snapshot and wikis, drops the rows whose target
+    * title is null, and joins against the page data twice: first on
+    * (wiki, page id), then on (wiki, title, namespace) to obtain the target
+    * page id. Duplicate results are removed at the end.
+    *
+    * NOTE(review): in this copy of the patch several lines of the body are
+    * cut off (the two val definitions at the top, the arrow of the first
+    * case clause, and two wrapped trailing comments). Restore them from the
+    * original change before compiling.
+    *
+    * @param sqlContext    context used to run the Hive query
+    * @param redirectTable table name interpolated into the FROM clause
+    * @param snapshot      value compared against the snapshot column
+    * @param wikiList      wiki dbs kept by the IN condition
+    * @param pages         page data used for both joins
+    * @return A RDD of redirect data
+    */
+  def prepareRedirects(
+                        sqlContext: SQLContext,
+                        redirectTable: String,
+                        snapshot: String,
+                        wikiList: Seq[String],
+                        pages: RDD[PageInfo]
+                      ): RDD[Redirect] = {
+    val pagesPerPageId = => ((p.wikiDb, p.pageId), 
+    val pagesPerTitleAndNamespace = => ((p.wikiDb, p.pageTitle, 
p.pageNamespace), p.pageId)).cache()
+    sqlContext.sql(
+      s"""
+         |SELECT
+         |  wiki_db,
+         |  rd_from,
+         |  rd_title,
+         |  rd_namespace
+         |FROM $redirectTable
+         |WHERE snapshot = '$snapshot'
+         |  AND wiki_db IN (${listToSQLInCondition(wikiList)})
+         |  AND rd_from IS NOT NULL
+         |  AND rd_from > 0
+         |GROUP BY
+         |  wiki_db,
+         |  rd_from,
+         |  rd_title,
+         |  rd_namespace
+          """.stripMargin)
+        .rdd
+        .map(r => {
+          ((r.getString(0), r.getLong(1)), (r.getString(2), r.getLong(3)))
+        })
+        .filter(t => Option(t._2._1).isDefined)   // Remove null toPageTitle
+        .join(pagesPerPageId)                     // Ensure fromPageId exists 
in page table
+        .map { case ((wiki, fromPageId), ((toPageTitle, toPageNamespace), _)) 
+                  ((wiki, toPageTitle, toPageNamespace), fromPageId) }
+        .join(pagesPerTitleAndNamespace)          // Get toPageId from page 
+        .map { case ((wiki, _ , _), (fromPageId, toPageId)) =>
+                  Redirect(wiki, fromPageId, toPageId) }
+        .distinct // prevent any duplicate (data corruption)
+  }
+  /**
+    * Replace page ids for pages that are redirected.
+    *
+    * The single-step function processRedirects is applied twice, so at most
+    * two levels of redirect chains are followed.
+    *
+    * NOTE(review): the body is truncated in this copy of the patch. The
+    * definition of redirectsBySrc is cut off and the join inside
+    * processRedirects is missing, so how a page without a matching redirect
+    * is handled cannot be determined from here. redirectsBySrc is declared
+    * with var but is not reassigned in the visible code.
+    *
+    * @param sqlContext not referenced in the visible body
+    * @param pages      pages whose effective page id is to be resolved
+    * @param redirects  redirects, keyed by (wiki, source page id) in the body
+    * @return PageInfo with redirects applied to relevant page ids
+    */
+  def resolveRedirects(
+                      sqlContext: SQLContext,
+                      pages: RDD[PageInfo],
+                      redirects: RDD[Redirect]
+                    ) : RDD[PageInfo] = {
+    var redirectsBySrc = => ((r.wikiDb, r.fromPageId), 
+    // This function maps redirects once
+    val processRedirects = (pages: RDD[PageInfo]) =>  {
+                                         p => ((p.wikiDb, 
p.effectivePageId), p) )
+                                                    .map { case (_, (p, 
redirectId)) =>
+                                                      PageInfo(p.wikiDb,
+                                                                p.pageId,
+                                                                p.pageTitle, 
+                                                    }
+                                                } : RDD[PageInfo]
+    // We only process two-levels of redirects. Is this okay?
+    processRedirects(
+      processRedirects(pages))
+  }
+  /**
+    * Prepare the view data.
+    *
+    * Queries the webrequest table for the requested period, keeping rows
+    * with webrequest_source = 'text', is_pageview and agent_type = 'user'.
+    * Each row's project is mapped to its wiki db through projectToWikiMap,
+    * the client fields are hashed with userHash, and the result is joined
+    * on (wiki, page title) against the page data to obtain the namespace
+    * and page id of the view.
+    *
+    * Rows whose title has no entry in pages are dropped by the join.
+    * The lookup projectToWikiMap(...) throws when a project is not a key
+    * of the map.
+    *
+    * NOTE(review): in this copy of the patch the day and hour filters, the
+    * project IN condition, the userHash call and one trailing comment are
+    * cut off. Restore them from the original change before compiling.
+    *
+    * @param sqlContext       context used to run the Hive query
+    * @param webrequestTable  table name interpolated into the FROM clause
+    * @param year             year filter
+    * @param month            month filter
+    * @param day              optional day filter
+    * @param hour             optional hour filter
+    * @param projectList      projects to keep (its use is in a truncated line)
+    * @param projectToWikiMap map from the project string built by the query to wiki db
+    * @param pages            page data joined on (wiki db, page title)
+    * @return A RDD of page views
+    */
+  def prepareViews(
+                          sqlContext: SQLContext,
+                          webrequestTable: String,
+                          year: Int,
+                          month: Int,
+                          day: Option[Int],
+                          hour: Option[Int],
+                          projectList: Seq[String],
+                          projectToWikiMap: Map[String, String],
+                          pages: RDD[PageInfo]
+                        ): RDD[PageView] = {
+    val pagesPerTitles = pages
+      .map(p => ((p.wikiDb, p.pageTitle), (p.pageNamespace, p.pageId))) // 
((wiki, pageTitle), (pageNamespace, pageId))
+      .cache()
+    sqlContext.sql(
+      s"""
+         |SELECT
+         |  CONCAT(pageview_info['project'], '.org') AS project,
+         |  pageview_info['page_title'] as title,
+         |  ts,
+         |  client_ip,
+         |  user_agent,
+         |  x_forwarded_for
+         |FROM $webrequestTable
+         |WHERE webrequest_source = 'text'
+         |  AND year = $year AND month = $month
+         |  ${ => s"AND day = $d").getOrElse("")}
+         |  ${ => s"AND hour = $h").getOrElse("")}
+         |  AND pageview_info['project'] IN 
+         |  AND is_pageview
+         |  AND agent_type = 'user'
+          """.stripMargin)
+      .rdd // ((wiki, fromTitle), (ts, ip, user_agent, x_forward_for)) out of 
+      .map(r => {
+                  val wiki = projectToWikiMap(r.getString(0))
+                  val uh = userHash(r.getString(3), r.getString(4), 
+                  ((wiki, r.getString(1)), (r.getTimestamp(2), uh)) //
+            })
+      .join(pagesPerTitles)
+      .map { case ((wiki, title), ((ts, userHash), (pageNs, pageId))) =>
+        PageView(wiki, userHash, ts.getTime, pageNs, pageId)
+      }
+  }
+  /**
+    * Aggregates page views by user / wiki and groups them into sessions.
+    *
+    * Views are keyed by (wiki, user hash, timestamp), partitioned by
+    * (wiki, user hash) and sorted within each partition, so the views of one
+    * user on one wiki arrive consecutively and in timestamp order. A view
+    * joins the current session when sameSession holds between it and the
+    * last view of that session; otherwise it starts the next session.
+    *
+    * Sessions are produced lazily through an iterator because there could be
+    * millions / billions of page views on a partition, so the result set
+    * cannot be loaded in memory. Each session is built with a loop rather
+    * than recursion, so long sessions cannot overflow the stack.
+    *
+    * @param views           page views to group
+    * @param timeoutInMillis largest gap between two consecutive views of a session
+    * @param numPartitions   number of partitions used for the shuffle
+    * @return one sequence of page views per session, in viewing order
+    */
+  def createSessions(
+                      views: RDD[PageView],
+                      timeoutInMillis: Int,
+                      numPartitions: Int = 20
+                    ): RDD[Seq[PageView]] = {
+    views
+      .map { v => (SessionKey(v.wikiDb, v.userHash, v.tstamp), v) }
+      .repartitionAndSortWithinPartitions(new SessionPartitioner(numPartitions))
+      .mapPartitions { iter =>
+        new Iterator[Seq[PageView]] {
+          // First view of the following session, read while closing the
+          // previous one.
+          private var nextSessionHead: Option[PageView] = None
+
+          override def hasNext: Boolean = nextSessionHead.isDefined || iter.hasNext
+
+          override def next(): Seq[PageView] = {
+            if (!hasNext) {
+              throw new NoSuchElementException("no more sessions")
+            }
+            val session = new ArrayBuffer[PageView]()
+            nextSessionHead.foreach(head => session += head)
+            nextSessionHead = None
+            // Consume views until one belongs to a different session.
+            var closed = false
+            while (!closed && iter.hasNext) {
+              val view = iter.next()._2
+              if (session.isEmpty || sameSession(view, session.last, timeoutInMillis)) {
+                session += view
+              } else {
+                nextSessionHead = Some(view)
+                closed = true
+              }
+            }
+            session
+          }
+        }
+      }
+  }
+  /**
+    * Write sessions to output files.
+    *
+    * Each non-empty session becomes one tab-separated line: the wiki db and
+    * timestamp of its first view, followed by the page id of every view.
+    *
+    * TODO: Overwrite existing files.
+    *
+    * @param sessions         sessions to write
+    * @param outputBasePath   path handed to saveAsTextFile
+    * @param outputFilesParts number of partitions, hence of output file parts
+    */
+  def writeSessions(sessions: RDD[Seq[PageView]], outputBasePath: String, outputFilesParts: Int): Unit = {
+    sessions
+      .repartition(outputFilesParts)
+      .filter(views => views.nonEmpty)
+      .map(views => views.head.wikiDb + "\t" +
+                    new java.sql.Timestamp(views.head.tstamp) + "\t" +
+                    views.map(v => v.pageId.toString).mkString("\t"))
+      .saveAsTextFile(outputBasePath)
+  }
+  /**
+    * Config class for CLI argument parser using scopt.
+    *
+    * snapshot, year and month are declared as required by the parser; their
+    * defaults here are placeholders only.
+    */
+  case class Params(
+                     outputBasePath: String = "/user/shiladsen/sessions",
+                     projectNamespaceTable: String = "wmf_raw.mediawiki_project_namespace_map",
+                     pageTable: String = "wmf_raw.mediawiki_page",
+                     redirectTable: String = "wmf_raw.mediawiki_redirect",
+                     webrequestTable: String = "wmf.webrequest",
+                     wikiList: Seq[String] = Seq("simplewiki"),
+                     outputFilesParts: Int = 1,
+                     snapshot: String = "", // Parameter required, never used as is
+                     year: Int = 0, // Parameter required, never used as is
+                     month: Int = 0, // Parameter required, never used as is
+                     day: Option[Int] = None,
+                     hour: Option[Int] = None,
+                     sessionTimeoutMillis: Int = 15*60*1000 // 15 minutes
+                   )
+  /**
+    * Define the command line options parser.
+    *
+    * --snapshot, --year and --month are required; every other option is
+    * optional and falls back to the default declared in Params.
+    */
+  val argsParser = new OptionParser[Params]("Clickstream dataset builder") {
+    head("Clickstream dataset builder", "")
+    note(
+      """
+        |This job computes a clickstream dataset from one or more wiki(s).
+        |It creates a date folder, and per-wiki folders to store the results.
+      """.stripMargin)
+    help("help") text ("Prints this usage text")
+    opt[String]('s', "snapshot") required() valueName ("<snapshot>") action { (x, p) =>
+      p.copy(snapshot = x)
+    } text ("The mediawiki hadoop snapshot to use for page, links and redirects (usually YYYY-MM)")
+    opt[Int]('y', "year") required() valueName ("<year>") action { (x, p) =>
+      p.copy(year = x)
+    } text ("The year to use for webrequest data gathering.")
+    opt[Int]('m', "month") required() valueName ("<month>") action { (x, p) =>
+      p.copy(month = x)
+    } validate { x => if (x > 0 && x <= 12) success else failure("Invalid month")
+    } text ("The month to use for webrequest data gathering.")
+    opt[Int]('d', "day") optional() valueName ("<day>") action { (x, p) =>
+      p.copy(day = Some(x))
+    }  validate { x => if (x > 0 && x <= 31) success else failure("Invalid day")
+    } text ("The day to use for webrequest data gathering (default to empty, for monthly computation).")
+    opt[Int]('h', "hour") optional() valueName ("<hour>") action { (x, p) =>
+      p.copy(hour = Some(x))
+    } validate { x => if (x >= 0 && x < 24 ) success else failure("Invalid hour")
+    } text ("The hour to use for webrequest data gathering (default to empty, for daily or monthly computation).")
+    opt[Int]('t', "timeout") optional() valueName ("<timeout>") action { (x, p) =>
+      // The timeout belongs in sessionTimeoutMillis, not in the hour filter.
+      p.copy(sessionTimeoutMillis = x)
+    } validate { x => if (x >= 0) success else failure("Invalid timeout")
+    } text ("The timeout in milliseconds that distinguishes user sessions.")
+    opt[String]('o', "output-base-path") optional() valueName ("<path>") action { (x, p) =>
+      p.copy(outputBasePath = if (x.endsWith("/")) x.dropRight(1) else x)
+    } text ("Where on HDFS to store the computed dataset. Defaults to /user/shiladsen/sessions")
+    opt[String]('w', "wikis") optional() valueName "<wiki_db_1>,<wiki_db_2>..." action { (x, p) =>
+      p.copy(wikiList = x.split(",").map(_.toLowerCase))
+    } validate { x =>
+      val dbs = x.split(",").map(_.toLowerCase)
+      if (dbs.exists(db => db.isEmpty || (! db.contains("wik"))))
+        failure("Invalid wikis list")
+      else
+        success
+    } text "wiki dbs to compute. Defaults to simplewiki"
+    opt[Int]('p', "output-files-parts") optional() valueName ("<partitions>") action { (x, p) =>
+      p.copy(outputFilesParts = x)
+    } text ("Number of file parts to output in hdfs. Defaults to 1")
+    opt[String]("project-namespace-table") optional() valueName ("<table>") action { (x, p) =>
+      p.copy(projectNamespaceTable = x)
+    } text ("Fully qualified name of the project-namespace table on Hive. Default to wmf_raw.mediawiki_project_namespace_map")
+    opt[String]("page-table") optional() valueName ("<table>") action { (x, p) =>
+      p.copy(pageTable = x)
+    } text ("Fully qualified name of the page table on Hive. Default to wmf_raw.mediawiki_page")
+    opt[String]("redirect-table") optional() valueName ("<table>") action { (x, p) =>
+      p.copy(redirectTable = x)
+    } text ("Fully qualified name of the redirect table on Hive. Default to wmf_raw.mediawiki_redirect")
+    opt[String]("webrequest-table") optional() valueName ("<table>") action { (x, p) =>
+      p.copy(webrequestTable = x)
+    } text ("Fully qualified name of the webrequest table on Hive. Default to wmf.webrequest")
+  }
+  /**
+    * Entry point: parse the command line and run the job.
+    *
+    * The job options may be given either as regular CLI arguments, or as a
+    * single whitespace-separated string following "--options".
+    * The JVM exits with status 1 when the options cannot be parsed.
+    *
+    * @param args command line arguments
+    */
+  def main(args: Array[String]): Unit = {
+    val jobArgs: Array[String] = args match {
+      // Case when our job options are given as a single string.  Split them
+      // before passing them to argsParser.
+      case Array("--options", packed, _*) => packed.trim.split("\\s+")
+      // Else the normal usage, each CLI opts can be parsed as a job option.
+      // A lone "--options" without a value also ends up here and is
+      // rejected by the parser instead of failing on a missing array element.
+      case _ => args
+    }
+    val params = argsParser.parse(jobArgs, Params()).getOrElse(sys.exit(1))
+    apply(params)
+  }
+  def apply(params: Params): Unit = { // Runs the job: set up Spark, prepare pages / redirects / views, build sessions and write them
+    val conf = new SparkConf()
+      .setAppName(s"ClickStreamBuilder")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .registerKryoClasses(Array(
+        classOf[PageInfo],
+        classOf[Redirect],
+        classOf[PageView]))
+      .set("spark.hadoop.mapred.output.compress", "true")
+      .set("spark.hadoop.mapred.output.compression.codec", "true") // NOTE(review): "true" is not a codec name; the same key is set again on the next line
+      .set("spark.hadoop.mapred.output.compression.codec", // NOTE(review): value cut off in this copy of the patch
+      .set("spark.hadoop.mapred.output.compression.type", "BLOCK")
+    val sqlContext = new HiveContext(new SparkContext(conf))
+    LogManager.getRootLogger.setLevel(Level.WARN) // lower log verbosity in this JVM
+    sqlContext.sparkContext.parallelize(Seq("")).foreachPartition(x => { // and again from inside a Spark task
+      import org.apache.log4j.{LogManager, Level}
+      LogManager.getRootLogger.setLevel(Level.WARN)
+    })
+    import sqlContext.implicits._
+    val projectToWikiMap = prepareProjectToWikiMap(sqlContext, 
params.projectNamespaceTable, params.snapshot, params.wikiList)
+    val domainList = projectToWikiMap.keys.toList
+    val projectList =".org")) // NOTE(review): expression cut off in this copy of the patch
+    val outputFolder = "/user/shilad/wmf/data/archive/clickstream/" + // NOTE(review): outputFolder is not read in the visible code; writeSessions gets params.outputBasePath
+      f"${params.year}%04d-${params.month}%02d${ => 
f"-$d%02d").getOrElse("")}${ => f"-$h%02d").getOrElse("")}"
+    // Reused RDDs
+    val pages = preparePages(sqlContext, params.pageTable, params.snapshot, 
+    val redirects = prepareRedirects(sqlContext, params.redirectTable, 
params.snapshot, params.wikiList, pages)
+    val pages2 = resolveRedirects(sqlContext, pages, redirects) // NOTE(review): pages2 is never read below; prepareViews is given pages
+    val views = prepareViews(
+      sqlContext, params.webrequestTable,
+      params.year, params.month,, params.hour, // NOTE(review): the day argument is cut off in this copy of the patch
+      projectList, projectToWikiMap,
+      pages)
+    val sessions = createSessions(views, params.sessionTimeoutMillis)
+    writeSessions(sessions, params.outputBasePath, params.outputFilesParts)
+  }

