Shilad Sen has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/377706 )

Change subject: Spark job to create session event log appears to be working.
......................................................................

Spark job to create session event log appears to be working.

The session event log has one session per line: the wiki db, the session start
timestamp, and the page ids viewed in that session, ordered by tstamp.

simplewiki      2017-06-08 03:28:15.0   34242   9700
simplewiki      2017-06-08 03:14:49.0   31246
simplewiki      2017-06-08 03:14:22.0   34019   9084    81247   121

TODO:
* Benchmark on larger datasets
* Optimize if necessary (try broadcasting the page data structure to avoid the
  join; see the sketch below)
* Compare data cleaning with Ellery's scripts
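
Rough sketch of the broadcast idea (hypothetical and untested; "rawViews"
stands for the pre-join RDD of ((wiki, title), (ts, userHash)) pairs built in
prepareViews):

  val titleToPage = pages
    .map(p => ((p.wikiDb, p.pageTitle), (p.pageNamespace, p.pageId)))
    .collectAsMap()
  val bc = sqlContext.sparkContext.broadcast(titleToPage)
  rawViews.flatMap { case ((wiki, title), (ts, uh)) =>
    // Drop views whose title is not in the page table, mirroring the join
    bc.value.get((wiki, title)).map { case (ns, pageId) =>
      PageView(wiki, uh, ts.getTime, ns, pageId)
    }
  }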

Bug: T174796
Change-Id: I52103578d50d492eaeff2eeffc8629331a1260da
---
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
1 file changed, 547 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/06/377706/1

diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
new file mode 100644
index 0000000..d77a96a
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
@@ -0,0 +1,547 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import org.apache.log4j.{Level, LogManager}
+import org.apache.spark.Partitioner
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.hashing.MurmurHash3
+
+/**
+  * Process to create a page view session log.
+  *
+  * This creates output files with one line per session.
+  * The first two tokens of each line are the language edition and the session
+  * start timestamp.
+  * The following tokens are the page ids viewed in the session, in order.
+  *
+  * @author Shilad Sen
+  */
+object SessionPagesBuilder {
+
+
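+  // Example invocation (hypothetical jar name and output path; the options are
+  // defined in argsParser below):
+  //
+  //   spark-submit --class org.wikimedia.analytics.refinery.job.vectors.SessionPagesBuilder \
+  //     refinery-job.jar -s 2017-06 -y 2017 -m 6 -d 8 -w simplewiki \
+  //     -o /user/example/sessions
+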
+  import org.apache.spark.sql.hive.HiveContext
+  import org.apache.spark.SparkConf
+  import org.apache.spark.sql.SaveMode
+  import org.apache.spark.SparkContext
+  import scopt.OptionParser
+  import org.apache.spark.rdd.RDD
+  import org.apache.spark.sql.SQLContext
+
+
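+  // Key used to group page views into per-user, per-wiki sessions; ordered so
+  // that views sort chronologically (see SessionPartitioner and the companion
+  // ordering below).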
+  case class SessionKey (wikiDb: String, userHash: Long, tstamp: Long)
+    extends Ordered[SessionKey] {
+    override def compare(that: SessionKey): Int = {
+      import scala.math.Ordered.orderingToOrdered
+
+      Ordering.Tuple3(Ordering.String, Ordering.Long, Ordering.Long).compare(
+        (wikiDb, userHash, tstamp),
+        (that.wikiDb, that.userHash, that.tstamp)
+      )
+    }
+  }
+
+
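+  // Partitions only by (wikiDb, userHash) so that all page views for a given
+  // user on a given wiki land in the same partition; tstamp is left to the
+  // within-partition sort.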
+  class SessionPartitioner(partitions: Int) extends Partitioner {
+    require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
+
+    override def numPartitions: Int = partitions
+
+    override def getPartition(key: Any): Int = {
+      val k = key.asInstanceOf[SessionKey]
+      Math.abs(k.wikiDb.hashCode() + k.userHash.asInstanceOf[Int]) % numPartitions
+    }
+  }
+
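+  // Implicit ordering picked up by repartitionAndSortWithinPartitions in
+  // createSessions.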
+  object SessionKey {
+    implicit def orderingByTimestamp[A <: SessionKey] : Ordering[A] = {
+      Ordering.by(fk => (fk.wikiDb, fk.userHash, fk.tstamp))
+    }
+  }
+
+
+
+  case class PageInfo(
+                       wikiDb: String,
+                       pageId: Long,
+                       pageTitle: String,
+                       pageNamespace: Long,
+                       pageIsRedirect: Boolean,
+                       effectivePageId : Long    // page id or redirected destination page id
+                     )
+
+  case class Redirect(
+                       wikiDb: String,
+                       fromPageId: Long,
+                       toPageId: Long
+                     )
+
+  case class PageView(
+                       wikiDb: String,
+                       userHash: Long,
+                       tstamp: Long,
+                       pageNamespace: Long,
+                       pageId: Long
+                     )
+
+  def sameSession(v1: PageView, v2: PageView, timeoutMillis: Long) : Boolean = {
+    (v1.wikiDb == v2.wikiDb) &&
+      (v1.userHash == v2.userHash) &&
+      (Math.abs(v1.tstamp - v2.tstamp) <= timeoutMillis)
+  }
+
+
+  /**
+    * Given the components associated with a distinct user, return a long hash.
+    * @param client_ip
+    * @param user_agent
+    * @param x_forwarded_for
+    * @return a 64-bit hash identifying the user
+    */
+  def userHash(
+               client_ip: String,
+               user_agent: String,
+               x_forwarded_for: String
+             ) : Long = {
+    val s = client_ip + ":" + user_agent + ":" + x_forwarded_for
+    // Pack two 32-bit murmur hashes (of the string and its reverse) into one long.
+    (MurmurHash3.stringHash(s).toLong << 32) |
+      (MurmurHash3.stringHash(s.reverse).toLong & 0xFFFFFFFFL)
+  }
+
+  /**
+    * Steps:
+    *  - prepare raw tables:
+    *    - page: Keep only wiki_db, page_id, page_namespace, page_is_redirect,
+    *            page_title. Insert fake values used later.
+    *    - redirect: Join with page (from and to) to clean and denormalize.
+    */
+
+
+  def listToSQLInCondition(list: Seq[String]): String = list.map(w => s"'$w'").mkString(", ")
+
+  /**
+    * Prepare a map linking project hostnames to project dbnames.
+    *
+    * @return A map[hostname -> dbname]
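+    *         (e.g. "simple.wikipedia.org" -> "simplewiki")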
+    */
+  def prepareProjectToWikiMap(
+                               sqlContext: SQLContext,
+                               projectNamespaceTable: String,
+                               snapshot: String,
+                               wikiList: Seq[String]
+                             ): Map[String, String] = {
+    sqlContext.sql(
+      s"""
+         |SELECT DISTINCT
+         |  hostname,
+         |  dbname
+         |FROM $projectNamespaceTable
+         |WHERE snapshot = '$snapshot'
+         |  AND dbname IN (${listToSQLInCondition(wikiList)})
+        """.stripMargin).
+      rdd.
+      collect.
+      map(r => r.getString(0) -> r.getString(1)).toMap
+  }
+
+  /**
+    * Prepare the pages dataset to be reused
+    *
+    * @return A RDD of PageInfo data augmented with fake pages for each wiki
+    */
+  def preparePages(
+                    sqlContext: SQLContext,
+                    pageTable: String,
+                    snapshot: String,
+                    wikiList: Seq[String]
+                  ): RDD[PageInfo] = {
+    sqlContext.sql(
+      s"""
+                      |SELECT
+                      |  wiki_db,
+                      |  page_id,
+                      |  page_title,
+                      |  page_namespace,
+                      |  page_is_redirect
+                      |FROM $pageTable
+                      |WHERE snapshot = '$snapshot'
+                      |  AND wiki_db IN (${listToSQLInCondition(wikiList)})
+                      |  AND page_id IS NOT NULL
+                      |  AND page_namespace = 0
+                      |  AND page_id > 0
+                      |GROUP BY
+                      |  wiki_db,
+                      |  page_id,
+                      |  page_title,
+                      |  page_namespace,
+                      |  page_is_redirect
+        """.stripMargin).rdd.
+      map(r => {
+        PageInfo(r.getString(0), r.getLong(1), r.getString(2), r.getLong(3), r.getBoolean(4), r.getLong(1))
+      }).
+      // Insert rows for our special prev pages. This lets us work with ids
+      // instead of titles later, which is much less error prone.
+      union(
+      sqlContext.sparkContext.parallelize(wikiList.flatMap(wiki => Seq(
+        PageInfo(wiki, -1L, "other-empty", 0, pageIsRedirect = false, effectivePageId = -1),
+        PageInfo(wiki, -2L, "other-internal", 0, pageIsRedirect = false, effectivePageId = -1),
+        PageInfo(wiki, -3L, "other-external", 0, pageIsRedirect = false, effectivePageId = -1),
+        PageInfo(wiki, -4L, "other-search", 0, pageIsRedirect = false, effectivePageId = -1),
+        PageInfo(wiki, -5L, "other-other", 0, pageIsRedirect = false, effectivePageId = -1))))
+    )
+  }
+
+  /**
+    * Prepare the redirects dataset to be reused
+    *
+    * @return A RDD of redirect data
+    */
+  def prepareRedirects(
+                        sqlContext: SQLContext,
+                        redirectTable: String,
+                        snapshot: String,
+                        wikiList: Seq[String],
+                        pages: RDD[PageInfo]
+                      ): RDD[Redirect] = {
+
+    val pagesPerPageId = pages.map(p => ((p.wikiDb, p.pageId), p.pageTitle)).cache()
+    val pagesPerTitleAndNamespace = pages.map(p => ((p.wikiDb, p.pageTitle, p.pageNamespace), p.pageId)).cache()
+
+    sqlContext.sql(
+      s"""
+         |SELECT
+         |  wiki_db,
+         |  rd_from,
+         |  rd_title,
+         |  rd_namespace
+         |FROM $redirectTable
+         |WHERE snapshot = '$snapshot'
+         |  AND wiki_db IN (${listToSQLInCondition(wikiList)})
+         |  AND rd_from IS NOT NULL
+         |  AND rd_from > 0
+         |GROUP BY
+         |  wiki_db,
+         |  rd_from,
+         |  rd_title,
+         |  rd_namespace
+          """.stripMargin)
+        .rdd
+        .map(r => {
+          ((r.getString(0), r.getLong(1)), (r.getString(2), r.getLong(3)))
+        })
+        .filter(t => Option(t._2._1).isDefined)   // Remove null toPageTitle
+        .join(pagesPerPageId)                     // Ensure fromPageId exists in page table
+        .map { case ((wiki, fromPageId), ((toPageTitle, toPageNamespace), _)) =>
+                  ((wiki, toPageTitle, toPageNamespace), fromPageId) }
+        .join(pagesPerTitleAndNamespace)          // Get toPageId from page table
+        .map { case ((wiki, _ , _), (fromPageId, toPageId)) =>
+                  Redirect(wiki, fromPageId, toPageId) }
+        .distinct // prevent any duplicate (data corruption)
+  }
+
+  /**
+    * Replace page ids for pages that are redirected.
+    *
+    * @return PageInfo with redirects applied to relevant page ids
+    */
+  def resolveRedirects(
+                      sqlContext: SQLContext,
+                      pages: RDD[PageInfo],
+                      redirects: RDD[Redirect]
+                    ) : RDD[PageInfo] = {
+
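+    // Example: if A redirects to B and B redirects to C, A's effectivePageId
+    // resolves to C after the second pass; longer chains stop after two hops.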
+    val redirectsBySrc = redirects.map(r => ((r.wikiDb, r.fromPageId), r.toPageId)).cache()
+
+    // This function maps redirects once
+    val processRedirects = (pages: RDD[PageInfo]) => {
+      pages.map(p => ((p.wikiDb, p.effectivePageId), p))
+        .leftOuterJoin(redirectsBySrc)
+        .map { case (_, (p, redirectId)) =>
+          PageInfo(p.wikiDb, p.pageId, p.pageTitle, p.pageNamespace,
+            p.pageIsRedirect, redirectId.getOrElse(p.effectivePageId))
+        }
+    }: RDD[PageInfo]
+
+    // We only process two levels of redirects. Is this okay?
+    processRedirects(processRedirects(pages))
+  }
+
+
+  /**
+    * Prepare the view data
+    * @return A RDD of page views
+    */
+  def prepareViews(
+                          sqlContext: SQLContext,
+                          webrequestTable: String,
+                          year: Int,
+                          month: Int,
+                          day: Option[Int],
+                          hour: Option[Int],
+                          projectList: Seq[String],
+                          projectToWikiMap: Map[String, String],
+                          pages: RDD[PageInfo]
+                        ): RDD[PageView] = {
+    val pagesPerTitles = pages
+      .map(p => ((p.wikiDb, p.pageTitle), (p.pageNamespace, p.pageId))) // ((wiki, pageTitle), (pageNamespace, pageId))
+      .cache()
+
+    sqlContext.sql(
+      s"""
+         |SELECT
+         |  CONCAT(pageview_info['project'], '.org') AS project,
+         |  pageview_info['page_title'] as title,
+         |  ts,
+         |  client_ip,
+         |  user_agent,
+         |  x_forwarded_for
+         |FROM $webrequestTable
+         |WHERE webrequest_source = 'text'
+         |  AND year = $year AND month = $month
+         |  ${day.map(d => s"AND day = $d").getOrElse("")}
+         |  ${hour.map(h => s"AND hour = $h").getOrElse("")}
+         |  AND pageview_info['project'] IN (${listToSQLInCondition(projectList)})
+         |  AND is_pageview
+         |  AND agent_type = 'user'
+          """.stripMargin)
+      .rdd // ((wiki, fromTitle), (ts, ip, user_agent, x_forward_for)) out of webrequest
+      .map(r => {
+                  val wiki = projectToWikiMap(r.getString(0))
+                  val uh = userHash(r.getString(3), r.getString(4), r.getString(5))
+                  ((wiki, r.getString(1)), (r.getTimestamp(2), uh))
+            })
+      .join(pagesPerTitles)
+      .map { case ((wiki, title), ((ts, userHash), (pageNs, pageId))) =>
+        PageView(wiki, userHash, ts.getTime, pageNs, pageId)
+      }
+  }
+
+  /**
+    * Aggregates page views by user / wiki and groups them into sessions.
+    * The iterator is necessary because there could be millions / billions
+    * of page views on a partition so we cannot load the result set in memory.
+    *
+    * @return An RDD of sessions, each a sequence of page views for one user,
+    *         ordered by timestamp
+    */
+  def createSessions(views: RDD[PageView], timeoutInMillis: Int): RDD[Seq[PageView]] = {
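+    // Example: with a 15 minute timeout, consecutive views from the same user at
+    // 10:00, 10:10, and 10:22 form one session (each gap <= timeout), while a
+    // later view at 11:00 starts a new session.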
+    views
+      .map { v => (SessionKey(v.wikiDb, v.userHash, v.tstamp), v) }
+        .repartitionAndSortWithinPartitions(new SessionPartitioner(20))
+        .mapPartitions {
+          iter =>
+            new Iterator[Seq[PageView]] {
+              var eos: Boolean = !iter.hasNext
+              var session = new ArrayBuffer[PageView]()
+              var nextSessionHead : Option[PageView] = None
+
+              override def hasNext: Boolean = {
+                tryToFillSession()
+                session.nonEmpty
+              }
+
+              override def next: Seq[PageView] = {
+                tryToFillSession()
+                assert(session.nonEmpty)
+
+                // Create a new empty session and place the next session head into it
+                var s = session
+                session = new ArrayBuffer[PageView]()
+                if (nextSessionHead.isDefined) {
+                  session += nextSessionHead.get
+                  nextSessionHead = None
+                }
+                s
+              }
+
+              /**
+                * Recursively adds the next page view onto the current session until
+                * we are certain it belongs to the next session. The sign that this
+                * has occurred is that nextSessionHead is non-empty.
+                */
+              def tryToFillSession(): Unit = {
+                if (!eos && nextSessionHead.isEmpty && iter.hasNext) {
+                  val p = iter.next._2
+                  if (session.isEmpty || sameSession(p, session.last, timeoutInMillis)) {
+                    session += p
+                    tryToFillSession()
+                  } else {
+                    nextSessionHead = Some(p)
+                  }
+                }
+              }
+            }
+        }
+  }
+
+
+  /**
+    * Write sessions to output files.
+    * TODO: Overwrite existing files.
+    */
+  def writeSessions(sessions: RDD[Seq[PageView]], outputBasePath: String, outputFilesParts: Int): Unit = {
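+    // Each output line: <wikiDb> \t <session start timestamp> \t <pageId1> \t <pageId2> ...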
+    sessions
+      .repartition(outputFilesParts)
+      .filter(views => views.nonEmpty)
+      .map(views => views.head.wikiDb + "\t" +
+                    new java.sql.Timestamp(views.head.tstamp) + "\t" +
+                    views.map(v => v.pageId.toString).mkString("\t"))
+      .saveAsTextFile(outputBasePath)
+  }
+
+  /**
+    * Config class for CLI argument parser using scopt
+    */
+  case class Params(
+                     outputBasePath: String = "/user/shiladsen/sessions",
+                     projectNamespaceTable: String = "wmf_raw.mediawiki_project_namespace_map",
+                     pageTable: String = "wmf_raw.mediawiki_page",
+                     redirectTable: String = "wmf_raw.mediawiki_redirect",
+                     webrequestTable: String = "wmf.webrequest",
+                     wikiList: Seq[String] = Seq("simplewiki"),
+                     outputFilesParts: Int = 1,
+                     snapshot: String = "", // Parameter required, never used as is
+                     year: Int = 0, // Parameter required, never used as is
+                     month: Int = 0, // Parameter required, never used as is
+                     day: Option[Int] = None,
+                     hour: Option[Int] = None,
+                     sessionTimeoutMillis: Int = 15*60*1000
+                   )
+
+  /**
+    * Define the command line options parser
+    */
+  val argsParser = new OptionParser[Params]("Session pages dataset builder") {
+    head("Session pages dataset builder", "")
+    note(
+      """
+        |This job computes a page view session dataset from one or more wiki(s).
+        |It writes one line per session to the output folder.
+      """.stripMargin)
+    help("help") text ("Prints this usage text")
+
+    opt[String]('s', "snapshot") required() valueName ("<snapshot>") action { (x, p) =>
+      p.copy(snapshot = x)
+    } text ("The mediawiki hadoop snapshot to use for page and redirect tables (usually YYYY-MM)")
+
+    opt[Int]('y', "year") required() valueName ("<year>") action { (x, p) =>
+      p.copy(year = x)
+    } text ("The year to use for webrequest data gathering.")
+
+    opt[Int]('m', "month") required() valueName ("<month>") action { (x, p) =>
+      p.copy(month = x)
+    } validate { x => if (x > 0 & x <= 12) success else failure("Invalid 
month")
+    } text ("The month to use for webrequest data gathering.")
+
+    opt[Int]('d', "day") optional() valueName ("<day>") action { (x, p) =>
+      p.copy(day = Some(x))
+    }  validate { x => if (x > 0 & x <= 31) success else failure("Invalid day")
+    } text ("The day to use for webrequest data gathering (default to empty, 
for monthly computation).")
+
+    opt[Int]('h', "hour") optional() valueName ("<hour>") action { (x, p) =>
+      p.copy(hour = Some(x))
+    } validate { x => if (x >= 0 & x < 24 ) success else failure("Invalid 
hour")
+    } text ("The hour to use for webrequest data gathering (default to empty, 
for daily or monthly computation).")
+
+    opt[Int]('t', "timeout") optional() valueName ("<timeout>") action { (x, 
p) =>
+      p.copy(hour = Some(x))
+    } validate { x => if (x >= 0) success else failure("Invalid timeout")
+    } text ("The timeout in milliseconds that distinguishes user sessions.")
+
+
+    opt[String]('o', "output-base-path") optional() valueName ("<path>") 
action { (x, p) =>
+      p.copy(outputBasePath = if (x.endsWith("/")) x.dropRight(1) else x)
+    } text ("Where on HDFS to store the computed dataset (date folder created 
for you). Defaults to hdfs://analytics-hadoop/wmf/data/archive/clickstream")
+
+    opt[String]('w', "wikis") optional() valueName 
"<wiki_db_1>,<wiki_db_2>..." action { (x, p) =>
+      p.copy(wikiList = x.split(",").map(_.toLowerCase))
+    } validate { x =>
+      val dbs = x.split(",").map(_.toLowerCase)
+      if (dbs.filter(db => db.isEmpty || (! db.contains("wik"))).length > 0)
+        failure("Invalid wikis list")
+      else
+        success
+    } text "wiki dbs to compute. Defaults to enwiki"
+
+    opt[Int]('p', "output-files-parts") optional() valueName ("<partitions>") 
action { (x, p) =>
+      p.copy(outputFilesParts = x)
+    } text ("Number of file parts to output in hdfs. Defaults to 1")
+
+    opt[String]("project-namespace-table") optional() valueName ("<table>") 
action { (x, p) =>
+      p.copy(projectNamespaceTable = x)
+    } text ("Fully qualified name of the project-namespace table on Hive. 
Default to wmf_raw.mediawiki_project_namespace_map")
+
+    opt[String]("page-table") optional() valueName ("<table>") action { (x, p) 
=>
+      p.copy(pageTable = x)
+    } text ("Fully qualified name of the page table on Hive. Default to 
wmf_raw.mediawiki_page")
+
+    opt[String]("redirect-table") optional() valueName ("<table>") action { 
(x, p) =>
+      p.copy(redirectTable = x)
+    } text ("Fully qualified name of the redirect table on Hive. Default to 
wmf_raw.mediawiki_redirect")
+
+    opt[String]("webrequest-table") optional() valueName ("<table>") action { 
(x, p) =>
+      p.copy(webrequestTable = x)
+    } text ("Fully qualified name of the webrequest table on Hive. Default to 
wmf.webrequest")
+
+  }
+
+
+  def main(args: Array[String]): Unit = {
+    val params = args.headOption match {
+      // Case when our job options are given as a single string.  Split them
+      // and pass them to argsParser.
+      case Some("--options") =>
+        argsParser.parse(args(1).split("\\s+"), Params()).getOrElse(sys.exit(1))
+      // Else the normal usage, each CLI opts can be parsed as a job option.
+      case _ =>
+        argsParser.parse(args, Params()).getOrElse(sys.exit(1))
+    }
+
+    apply(params)
+  }
+
+  def apply(params: Params): Unit = {
+
+    val conf = new SparkConf()
+      .setAppName("SessionPagesBuilder")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .registerKryoClasses(Array(
+        classOf[PageInfo],
+        classOf[Redirect],
+        classOf[PageView]))
+      .set("spark.hadoop.mapred.output.compress", "true")
+      .set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
+      .set("spark.hadoop.mapred.output.compression.type", "BLOCK")
+
+    val sqlContext = new HiveContext(new SparkContext(conf))
+
+    LogManager.getRootLogger.setLevel(Level.WARN)
+    sqlContext.sparkContext.parallelize(Seq("")).foreachPartition(x => {
+      import org.apache.log4j.{LogManager, Level}
+      LogManager.getRootLogger.setLevel(Level.WARN)
+    })
+
+    import sqlContext.implicits._
+
+    val projectToWikiMap = prepareProjectToWikiMap(sqlContext, params.projectNamespaceTable, params.snapshot, params.wikiList)
+    val domainList = projectToWikiMap.keys.toList
+    val projectList = domainList.map(_.stripSuffix(".org"))
+
+    val outputFolder = "/user/shilad/wmf/data/archive/clickstream/" +
+      f"${params.year}%04d-${params.month}%02d${params.day.map(d => f"-$d%02d").getOrElse("")}${params.hour.map(h => f"-$h%02d").getOrElse("")}"
+
+    // Reused RDDs
+    val pages = preparePages(sqlContext, params.pageTable, params.snapshot, params.wikiList).cache()
+    val redirects = prepareRedirects(sqlContext, params.redirectTable, params.snapshot, params.wikiList, pages)
+    val pages2 = resolveRedirects(sqlContext, pages, redirects)
+    val views = prepareViews(
+      sqlContext, params.webrequestTable,
+      params.year, params.month, params.day, params.hour,
+      projectList, projectToWikiMap,
+      pages)
+    val sessions = createSessions(views, params.sessionTimeoutMillis)
+    writeSessions(sessions, params.outputBasePath, params.outputFilesParts)
+  }
+}

-- 
To view, visit https://gerrit.wikimedia.org/r/377706
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I52103578d50d492eaeff2eeffc8629331a1260da
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Shilad Sen <s...@macalester.edu>
