Shilad Sen has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/381517 )
Change subject: Simplified job to create session ids and finished debugging it.
......................................................................
Simplified job to create session ids and finished debugging it.
Changes:
* Instead of calculating page ids from titles, took page ids and namespace
  ids directly from the webrequest table. I think this works in the
  scenarios I care about (i.e. desktop page views); see the condensed
  query after this list.
* Builder now passes unit tests.
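For reviewers, a condensed form of the new view query (the full version,
including the partition and project predicates, is in the
SessionPagesBuilder diff below):

    val viewsSql =
      """
        |SELECT
        |  CONCAT(pageview_info['project'], '.org') AS project,
        |  namespace_id,
        |  page_id,
        |  ts, client_ip, user_agent, x_forwarded_for
        |FROM wmf.webrequest
        |WHERE webrequest_source = 'text'
        |  AND access_method = 'desktop'
        |  AND normalized_host.project_class IN ('wikipedia', 'wikimedia')
        |  AND is_pageview
      """.stripMargin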
TODO:
* Add sessions separated by timeout to the testing data.
* Benchmark new version on production cluster.
* Turn test into proper unit test.
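Since the timeout unit changed, here is a minimal sketch of the
seconds-based grouping semantics (simplified PageView; a plain in-memory
fold stands in for the Spark partitioner, and "split" is a stand-in name;
sameSession mirrors the patch):

    case class PageView(wikiDb: String, userHash: Long, tstamp: Long,
                        pageNamespace: Long, pageId: Long)

    // tstamp is in millis; the timeout is now expressed in seconds.
    def sameSession(v1: PageView, v2: PageView, timeoutSecs: Long): Boolean =
      v1.wikiDb == v2.wikiDb &&
        v1.userHash == v2.userHash &&
        math.abs(v1.tstamp - v2.tstamp) <= timeoutSecs * 1000

    // Split one user's time-sorted views into sessions at timeout gaps.
    def split(views: Seq[PageView], timeoutSecs: Long): Seq[Seq[PageView]] =
      views.foldLeft(List.empty[List[PageView]]) {
        case (Nil, v) => List(List(v))
        case (cur :: done, v) if sameSession(cur.head, v, timeoutSecs) =>
          (v :: cur) :: done
        case (done, v) => List(v) :: done
      }.map(_.reverse).reverse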
Bug: T174796
Change-Id: Iaec6ee811dd3da30152c3dbb206bf3a505370f90
---
M refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
M refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
M refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestParams.scala
M refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
4 files changed, 146 insertions(+), 317 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/17/381517/1
diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
index 1d03705..6e593b4 100644
--- a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
@@ -14,8 +14,7 @@
* This creates output files with one line per session.
* The first two tokens of each line are the session start timestamp and language edition.
* The following tokens are the page ids viewed in the session, in order.
- **
- *
+ *
* @author Shilad Sen
*/
object SessionPagesBuilder {
@@ -51,6 +50,11 @@
val LOOK_PAST_SECS: Int = 60 * 59
+ /**
+ * A user session is defined by a wiki, a hash of unique user information,
+ * and the timestamp for a view. This is used to stream through pageviews
+ * that may have been part of the same user session.
+ */
case class SessionKey (wikiDb: String, userHash: Long, tstamp: Long)
extends Ordered[SessionKey] {
override def compare(that: SessionKey): Int = {
@@ -63,7 +67,9 @@
}
}
-
+ /**
+ * Partitions all views for the same wiki from the same user onto a single partition.
+ */
class SessionPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
@@ -81,6 +87,11 @@
}
}
+ /**
+ * Wrapper that allows us to handle production hive timestamps (which
+ * are already Timestamps), and testing data from JSON files (which are
+ * strings) uniformly.
+ */
def getTimestamp(ts: Any) : Timestamp = {
ts match {
case _: Timestamp =>
@@ -90,19 +101,6 @@
case _ => throw new IllegalArgumentException(ts.toString)
}
}
-
- /**
- * A single page.
- * Effective page id is the page id after resolving redirects.
- */
- case class PageInfo(
- wikiDb: String,
- pageId: Long,
- pageTitle: String,
- pageNamespace: Long,
- pageIsRedirect: Boolean,
- effectivePageId : Long
- )
/**
* A view of a page by a user.
@@ -117,21 +115,14 @@
pageId: Long
)
- case class Redirect(
- wikiDb: String,
- fromPageId: Long,
- toPageId: Long
- )
-
-
/**
* Returns true iff the two views are part of the same session.
* User hashes must match and the pageviews must fall within a specified window.
*/
- def sameSession(v1: PageView, v2: PageView, timeoutMillis: Long) : Boolean = {
+ def sameSession(v1: PageView, v2: PageView, timeoutSecs: Long) : Boolean = {
(v1.wikiDb == v2.wikiDb) &&
(v1.userHash == v2.userHash) &&
- (Math.abs(v1.tstamp - v2.tstamp) <= timeoutMillis)
+ (Math.abs(v1.tstamp - v2.tstamp) <= timeoutSecs * 1000)
}
@@ -179,131 +170,6 @@
map(r => r.getString(0) -> r.getString(1)).toMap
}
- def hoursInRange(begin: org.joda.time.DateTime, end: org.joda.time.DateTime) : Seq[(Int, Int, Int, Int)]= {
- val beginHour = begin.hourOfDay().roundFloorCopy()
- val endHour = end.hourOfDay().roundCeilingCopy()
- val numHours = Hours.hoursBetween(beginHour, endHour).getHours
- val dates = (0 to numHours).map(beginHour.plusHours)
- dates.map(d => (d.getYear, d.getMonthOfYear, d.getDayOfMonth, d.getHourOfDay))
- }
-
- /**
- * Prepare the pages dataset to be reused
- *
- * @return A RDD of PageInfo data augmented with fake pages for each wiki
- */
- def preparePages(
- sqlContext: SQLContext,
- pageTable: String,
- snapshot: String,
- wikiList: Seq[String]
- ): RDD[PageInfo] = {
- sqlContext.sql(
- s"""
- |SELECT
- | wiki_db,
- | page_id,
- | page_title,
- | page_namespace,
- | page_is_redirect
- |FROM $pageTable
- |WHERE snapshot = '$snapshot'
- | AND wiki_db IN (${listToSQLInCondition(wikiList)})
- | AND page_id IS NOT NULL
- | AND page_namespace = 0
- | AND page_id > 0
- |GROUP BY
- | wiki_db,
- | page_id,
- | page_title,
- | page_namespace,
- | page_is_redirect
- """.stripMargin).rdd.
- map(r => {
- PageInfo(r.getString(0), r.getLong(1), r.getString(2),
- r.getLong(3), r.getBoolean(4), r.getLong(1))
- })
- }
-
- /**
- * Prepare the redirects dataset to be reused
- *
- * @return A RDD of redirect data
- */
- def prepareRedirects(
- sqlContext: SQLContext,
- redirectTable: String,
- snapshot: String,
- wikiList: Seq[String],
- pages: RDD[PageInfo]
- ): RDD[Redirect] = {
-
- val pagesPerPageId = pages.map(p => ((p.wikiDb, p.pageId), p.pageTitle)).cache()
- val pagesPerTitleAndNamespace = pages.map(p => ((p.wikiDb, p.pageTitle, p.pageNamespace), p.pageId)).cache()
-
- sqlContext.sql(
- s"""
- |SELECT
- | wiki_db,
- | rd_from,
- | rd_title,
- | rd_namespace
- |FROM $redirectTable
- |WHERE snapshot = '$snapshot'
- | AND wiki_db IN (${listToSQLInCondition(wikiList)})
- | AND rd_from IS NOT NULL
- | AND rd_from > 0
- |GROUP BY
- | wiki_db,
- | rd_from,
- | rd_title,
- | rd_namespace
- """.stripMargin)
- .rdd
- .map(r => {
- ((r.getString(0), r.getLong(1)), (r.getString(2), r.getLong(3)))
- })
- .filter(t => Option(t._2._1).isDefined) // Remove null toPageTitle
- .join(pagesPerPageId) // Ensure fromPageId exists in page table
- .map { case ((wiki, fromPageId), ((toPageTitle, toPageNamespace), _)) =>
- ((wiki, toPageTitle, toPageNamespace), fromPageId) }
- .join(pagesPerTitleAndNamespace) // Get toPageId from page table
- .map { case ((wiki, _ , _), (fromPageId, toPageId)) =>
- Redirect(wiki, fromPageId, toPageId) }
- .distinct // prevent any duplicate (data corruption)
- }
-
- /**
- * Replace page ids for pages that are redirected.
- *
- * @return PageInfo with redirects applied to relevant page ids
- */
- def resolveRedirects(
- sqlContext: SQLContext,
- pages: RDD[PageInfo],
- redirects: RDD[Redirect]
- ) : RDD[PageInfo] = {
-
- var redirectsBySrc = redirects.map(r => ((r.wikiDb, r.fromPageId), r.toPageId)).cache()
-
- // This function maps redirects once
- val processRedirects = (pages: RDD[PageInfo]) => {
- pages.map( p => ((p.wikiDb, p.effectivePageId), p) )
- .leftOuterJoin(redirectsBySrc)
- .map { case (_, (p, redirectId)) =>
- PageInfo(p.wikiDb,
- p.pageId,
- p.pageTitle, p.pageNamespace,
- p.pageIsRedirect,
- redirectId.getOrElse(p.effectivePageId))
- }
- } : RDD[PageInfo]
-
- // We only process two-levels of redirects. Is this okay?
- processRedirects(
- processRedirects(pages))
- }
-
/**
* Prepare the view data
@@ -315,24 +181,23 @@
begin: Timestamp,
end: Timestamp,
projectList: Seq[String],
- projectToWikiMap: Map[String, String],
- pages: RDD[PageInfo]
+ projectToWikiMap: Map[String, String]
): RDD[PageView] = {
- val pagesPerTitles = pages
- .map(p => ((p.wikiDb, p.pageTitle), (p.pageNamespace, p.pageId))) // ((wiki, pageTitle), (pageNamespace, pageId))
- .cache()
sqlContext.sql(
s"""
|SELECT
| CONCAT(pageview_info['project'], '.org') AS project,
- | pageview_info['page_title'] as title,
+ | namespace_id,
+ | page_id,
| ts,
| client_ip,
| user_agent,
| x_forwarded_for
|FROM $webrequestTable
|WHERE webrequest_source = 'text'
+ | AND access_method = 'desktop'
+ | AND normalized_host.project_class IN ('wikipedia', 'wikimedia')
| AND(${PartitionQueryBuilder.formSql(begin, end)})
| AND pageview_info['project'] IN (${listToSQLInCondition(projectList)})
| AND is_pageview
@@ -340,19 +205,15 @@
""".stripMargin)
.rdd
.map(r => {
- val wiki = projectToWikiMap(r.getString(0))
- val uh = userHash(r.getString(3), r.getString(4), r.getString(5))
- ((wiki, r.getString(1)), (getTimestamp(r.get(2)), uh))
- })
- .join(pagesPerTitles)
- .map { case ((wiki, title), ((ts, userHash), (pageNs, pageId))) =>
- PageView(wiki, userHash, ts.getTime, pageNs, pageId)
- }
+ val wiki = projectToWikiMap(r.getString(0))
+ val uh = userHash(r.getString(4), r.getString(5), r.getString(6))
+ PageView(wiki, uh, getTimestamp(r.get(3)).getTime, r.getLong(1), r.getLong(2))
+ })
}
/**
* Aggregates page views by user / wiki and groups them into sessions.
*
* 1. Create session key composed of wiki, userhash, tstamp that delineates sessions.
* 2. Partition by wiki, userhash with secondary sort on tstamp.
* 3. Reduce page views into sessions on each partition. The iterator
@@ -361,7 +222,7 @@
*
* @return
*/
- def createSessions(views: RDD[PageView], timeoutInMillis: Int): RDD[Seq[PageView]] = {
+ def createSessions(views: RDD[PageView], timeoutInSecs: Int): RDD[Seq[PageView]] = {
views
.map { v => (SessionKey(v.wikiDb, v.userHash, v.tstamp), v) }
.repartitionAndSortWithinPartitions(new SessionPartitioner(views.getNumPartitions))
@@ -399,7 +260,7 @@
def tryToFillSession(): Unit = {
if (!eos && nextSessionHead.isEmpty && iter.hasNext) {
var p = iter.next._2
- if (session.isEmpty || sameSession(p, session.last, timeoutInMillis)) {
+ if (session.isEmpty || sameSession(p, session.last, timeoutInSecs)) {
session += p
tryToFillSession()
} else {
@@ -420,10 +281,18 @@
import sqlContext.implicits._
sessions
- .filter(views => views.nonEmpty)
- .map(views => (views.head.wikiDb,
- new Timestamp(views.head.tstamp),
- views.map(v => v.pageId.toString).mkString(" ")))
+ .map { views =>
+ // Make concatenated ids with namespace 0
+ val ids = views
+ .filter(_.pageNamespace == 0)
+ .map(_.pageId.toString)
+ .mkString(" ")
+
+ // Timestamp for session is timestamp for first view, regardless of
+ // whether it is ultimately filtered out
+ (views.head.wikiDb, new Timestamp(views.head.tstamp).toString, ids)
+ }
+ .filter(_._3.nonEmpty) // Remove sessions with zero pageviews
.repartition(outputFilesParts)
.toDF
.withColumnRenamed("_1", "wiki_db")
@@ -444,15 +313,13 @@
case class Params(
outputBasePath: String = "/user/shiladsen/sessions",
projectNamespaceTable: String = "wmf_raw.mediawiki_project_namespace_map",
- pageTable: String = "wmf_raw.mediawiki_page",
- redirectTable: String = "wmf_raw.mediawiki_redirect",
webrequestTable: String = "wmf.webrequest",
wikiList: Seq[String] = Seq("simplewiki"),
outputFilesParts: Int = 1,
snapshot: String = "", // Parameter required, never used as is
begin: Timestamp = new Timestamp(0), // Parameter required, never used as is
end: Timestamp = new Timestamp(0), // Parameter required, never used as is
- sessionTimeoutMillis: Int = 15*60*1000
+ sessionTimeoutSecs: Int = 15*60 // 15 min, now expressed in seconds
)
val YMD: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd")
@@ -498,9 +365,9 @@
} text ("The ending time for webrequest data gathering.")
opt[Int]('t', "timeout") optional() valueName ("<timeout>") action { (x, p) =>
- p.copy(sessionTimeoutMillis = x)
+ p.copy(sessionTimeoutSecs = x)
} validate { x => if (x >= 0) success else failure("Invalid timeout")
- } text ("The timeout in milliseconds that distinguishes user sessions.")
+ } text ("The timeout in seconds that delineates user sessions.")
opt[String]('o', "output-base-path") optional() valueName ("<path>") action { (x, p) =>
@@ -524,14 +391,6 @@
opt[String]("project-namespace-table") optional() valueName ("<table>") action { (x, p) =>
p.copy(projectNamespaceTable = x)
} text ("Fully qualified name of the project-namespace table on Hive. Default to wmf_raw.mediawiki_project_namespace_map")
-
- opt[String]("page-table") optional() valueName ("<table>") action { (x, p) =>
- p.copy(pageTable = x)
- } text ("Fully qualified name of the page table on Hive. Default to wmf_raw.mediawiki_page")
-
- opt[String]("redirect-table") optional() valueName ("<table>") action { (x, p) =>
- p.copy(redirectTable = x)
- } text ("Fully qualified name of the redirect table on Hive. Default to wmf_raw.mediawiki_redirect")
opt[String]("webrequest-table") optional() valueName ("<table>") action { (x, p) =>
p.copy(webrequestTable = x)
@@ -571,35 +430,35 @@
sql.sparkContext
.getConf
.registerKryoClasses(Array(
- classOf[PageInfo],
- classOf[Redirect],
classOf[PageView]))
import sql.implicits._
- val projectToWikiMap = prepareProjectToWikiMap(sql, params.projectNamespaceTable, params.snapshot, params.wikiList)
+ val projectToWikiMap = prepareProjectToWikiMap(
+ sql,
+ params.projectNamespaceTable,
+ params.snapshot,
+ params.wikiList)
val domainList = projectToWikiMap.keys.toList
val projectList = domainList.map(_.stripSuffix(".org"))
- val pages = preparePages(sql, params.pageTable, params.snapshot, params.wikiList).cache()
- val redirects = prepareRedirects(sql, params.redirectTable, params.snapshot, params.wikiList, pages)
- val pages2 = resolveRedirects(sql, pages, redirects)
-
val viewBeg = new Timestamp(new DateTime(params.begin.getTime)
- .minusSeconds(LOOK_PAST_SECS)
+ .minusSeconds(2 * params.sessionTimeoutSecs)
.getMillis)
val viewEnd = new Timestamp(new DateTime(params.end.getTime)
- .minusSeconds(LOOK_FUTURE_SECS)
+ .plusSeconds(LOOK_FUTURE_SECS)
.getMillis)
+
val views = prepareViews( sql, params.webrequestTable,
viewBeg, viewEnd,
- projectList,
- projectToWikiMap,
- pages)
+ projectList, projectToWikiMap)
- val beginMillis = params.begin.getTime
- val sessions = createSessions(views, params.sessionTimeoutMillis)
- .filter(_.head.tstamp >= beginMillis)
+ val sessions = createSessions(views, params.sessionTimeoutSecs)
+ .filter { ts =>
+ ts.head.tstamp >= params.begin.getTime &&
+ ts.head.tstamp <= params.end.getTime
+ }
+
writeSessions(sql, sessions, params.outputBasePath, params.outputFilesParts)
}
}
diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
index c1641b0..3b2bcb8 100644
--- a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
+++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
@@ -5,7 +5,7 @@
import org.joda.time.DateTime
import org.json.simple.JSONObject
-import org.wikimedia.analytics.refinery.job.vectors.SessionPagesBuilder.{PageInfo, PageView, Redirect, userHash}
+import org.wikimedia.analytics.refinery.job.vectors.SessionPagesBuilder.{PageView, userHash}
import scala.collection.{immutable, mutable}
import scala.reflect.io.Path
@@ -21,6 +21,7 @@
object TestDataCreator {
case class UserInfo(x: String, y: String, z: String, hash: Long)
+ case class Session(user: UserInfo, tstamp: Long, views: Seq[PageView])
import TestParams._
@@ -46,61 +47,53 @@
}
}
- def generatePages(wikis: Seq[String], numPagesPerWiki: Int): Seq[PageInfo] = {
- wikis
- .flatMap(w => (1 to numPagesPerWiki).map((w, _))) // seq of (wikiId, pageId)
- .map { case (w, p) =>
- PageInfo(w, p, "Article " + p, randNamespace(), pageIsRedirect = false, -1)
- }
+ def randPageView(user : UserInfo, wiki: String, tstamp: Long, numArticles : Int) : PageView = {
+ val pageId = Random.nextInt(numArticles)
+ val pageNs = (pageId.hashCode() % 10 - 8).max(0)
+ PageView(wiki, user.hash, tstamp, pageNs, pageId)
}
- def usedRedirects: mutable.Set[(String, String)] = scala.collection.mutable.Set[(String, String)]()
-
- def generateRedirects(pages: Seq[PageInfo], redirectsPerWiki: Int) : Seq[Redirect]= {
- val maxId = pages.map(_.pageId).max
- val wikis = pages.map(_.wikiDb).distinct.toList
- val byWiki = pages.groupBy(_.wikiDb)
-
- byWiki.keys.flatMap { w =>
- val shuffled = Random.shuffle(byWiki(w).toList).toArray
- val srcs = shuffled.slice(0, redirectsPerWiki)
- val dests = shuffled.slice(redirectsPerWiki, redirectsPerWiki * 2)
- srcs.zip(dests).map { pair => Redirect(w, pair._1.pageId, pair._2.pageId) }
- }.toSeq
- }
-
- def generateSessions(start: Long, end: Long, pages: Seq[PageInfo], numSessions: Int, meanLength: Int): Seq[(UserInfo, Seq[(Long, PageInfo)])] = {
- val byWiki = pages.groupBy(_.wikiDb)
- val wikis = byWiki.keys.toList
+ def generateSessions(wikis : Seq[String], articlesPerWiki: Int, start: Long, end: Long, numSessions: Int, meanLength: Int): Seq[Session] = {
(1 to numSessions)
.map { _ =>
val n = 1 + Random.nextInt(meanLength * 2)
val w = choice(wikis)
- val tstamps = (1 to n)
- .foldLeft(List(start + Random.nextInt((end-start).asInstanceOf[Int])))(
- (seq, _) => (seq.head + Random.nextInt(SESSION_TIMEOUT_MILLIS-1))::seq)
- .reverse
- val session = tstamps.map { (_, choice(byWiki(w))) }
- (randUser(), session.toList)
+ val u = randUser()
+ val startTstamp = start + Random.nextInt((end-start).asInstanceOf[Int])
+ val tstamps =
+ (1 to n)
+ .foldLeft(List(startTstamp)) { (seq, _) =>
+ (seq.head + Random.nextInt(SESSION_TIMEOUT_SECS*1000-1))::seq
+ }
+ .reverse
+ val views = tstamps.map { randPageView(u, w, _, articlesPerWiki) }.toList
+ Session(u, views.head.tstamp, views.toList)
}
}
- def writeSessions(path: String,
- pages: Seq[PageInfo],
- redirects: Seq[Redirect],
- sessions: Seq[(UserInfo, Seq[(Long, PageInfo)])]) : Unit = {
- val pageIdMap = pages
- .map(p => ((p.wikiDb, p.pageId), p.pageId.toString))
- .union(redirects.map(r => ((r.wikiDb, r.fromPageId), r.toPageId.toString)))
- .toMap
+ /**
+ * Prunes sessions:
+ * - Removes views for namespaces that are not 0
+ * - Removes empty sessions
+ * - Removes sessions that have start times out of the requested range
+ */
+ def pruneInvalidSessions(sessions: Seq[Session], begin: DateTime, end: DateTime) : Seq[Session] = {
+ sessions
+ .map { s => Session(s.user, s.tstamp, s.views.filter(_.pageNamespace == 0).toList) }
+ .filter { s =>
+ s.views.nonEmpty && s.tstamp >= begin.getMillis && s.tstamp <= end.getMillis
+ }
+ }
+ def writeSessions(path: String, sessions: Seq[Session]) : Unit = {
val out = new PrintWriter(new FileWriter(path))
+ out.write("wiki_db\ttstamp\tpages\n")
sessions.foreach {
s =>
- val ts = s._2.head._1.toString
- val wiki = s._2.head._2.wikiDb
- val pageIds = s._2.map(tsAndPage => pageIdMap((tsAndPage._2.wikiDb, tsAndPage._2.pageId))).mkString(" ")
- out.write(ts + "\t" + wiki + "\t" + pageIds + "\n")
+ val ts = new Timestamp(s.tstamp).toString
+ val wiki = s.views.head.wikiDb
+ val pageIds = s.views.map { _.pageId.toString }.mkString(" ")
+ out.write(wiki + "\t" + ts + "\t" + pageIds + "\n")
}
out.close()
}
@@ -108,9 +101,9 @@
import scala.collection.JavaConverters._
- def writeWikis(path: String, pages: Seq[PageInfo]): Unit = {
+ def writeWikis(path: String, wikis: Seq[String]): Unit = {
val out = new PrintWriter(new FileWriter(path))
- pages.map(_.wikiDb).distinct.foreach {
+ wikis.distinct.foreach {
w =>
val js = Map(
"hostname" -> (w + ".org"),
@@ -123,67 +116,26 @@
out.close()
}
- def writePages(path: String, pages: Seq[PageInfo]): Unit = {
- val out = new PrintWriter(new FileWriter(path))
- pages.foreach { p =>
- val js = Map(
- "wiki_db" -> p.wikiDb,
- "page_id" -> p.pageId,
- "page_title" -> p.pageTitle,
- "snapshot" -> SNAPSHOT,
- "page_namespace" -> p.pageNamespace,
- "page_is_redirect" -> p.pageIsRedirect
- )
- JSONObject.writeJSONString(js.asJava, out)
- out.write("\n")
- }
- out.close()
- }
-
- def writeRedirects(path: String, pages: Seq[PageInfo], redirects: Seq[Redirect]): Unit = {
- val titles = pages.map(p => ((p.wikiDb, p.pageId), p.pageTitle)).toMap
+ def writeRequests(path: String, sessions: Seq[Session]): Unit = {
+ val allViews = sessions.flatMap { s => s.views.map((s.user, _)) }
val out = new PrintWriter(new FileWriter(path))
- redirects.foreach { r =>
- val js = Map(
- "wiki_db" -> r.wikiDb,
- "rd_from" -> titles((r.wikiDb, r.fromPageId)),
- "snapshot" -> SNAPSHOT,
- "rd_title" -> titles((r.wikiDb, r.toPageId)),
- "rd_namespace" -> 0
- )
- JSONObject.writeJSONString(js.asJava, out)
- out.write("\n")
- }
- out.close()
- }
-
-
- def writeRequests(path: String, sessions: Seq[(UserInfo, Seq[(Long, PageInfo)])]): Unit = {
- val allViews = sessions.flatMap { s =>
- val userInfo = s._1
- s._2.map( tsAndPage => (userInfo, tsAndPage._1, tsAndPage._2))
- }
-
- val out = new PrintWriter(new FileWriter(path))
- Random.shuffle(allViews).foreach { v =>
- val user = v._1
- val dt = new DateTime(v._2)
- val p = v._3
- val pvi = Map(
- "project" -> p.wikiDb,
- "page_title" -> p.pageTitle
- ).asJava
+ Random.shuffle(allViews).foreach { case (user, view) =>
+ val dt = new DateTime(view.tstamp)
val js = Map(
- "pageview_info" -> pvi,
+ "pageview_info" -> Map( "project" -> view.wikiDb ).asJava,
"ts" -> new Timestamp(dt.getMillis).toString,
"user_agent" -> user.x,
+ "namespace_id" -> view.pageNamespace,
+ "page_id" -> view.pageId,
"x_forwarded_for" -> user.y,
"client_ip" -> user.z,
"is_pageview" -> true,
"agent_type" -> "user",
"webrequest_source" -> "text",
+ "access_method" -> "desktop",
+ "normalized_host" -> Map("project_class" -> "wikimedia").asJava,
"year" -> dt.getYear,
"month" -> dt.getMonthOfYear,
"day" -> dt.getDayOfMonth,
@@ -196,21 +148,19 @@
}
def main(args: Array[String]): Unit = {
- val pages = generatePages(WIKIS, NUM_PAGES)
- val redirects = generateRedirects(pages, NUM_REDIRECTS)
+ Random.setSeed(1L)
// Allow some sessions to start earlier than the requested range.
val minStart = START_TIME.getMillis - (END_TIME.getMillis - START_TIME.getMillis) / 4
- val sessions = generateSessions(minStart, END_TIME.getMillis,
- pages, NUM_SESSIONS, MEAN_SESSION_LENGTH)
+ val sessions = generateSessions(
+ WIKIS, NUM_PAGES,
+ minStart, END_TIME.getMillis,
+ NUM_SESSIONS, MEAN_SESSION_LENGTH)
Path(PATH_DATA).createDirectory(force = true, failIfExists = false)
- writeWikis(PATH_DATA + "/wikis.json", pages)
- writePages(PATH_DATA + "/pages.json", pages)
- writeRedirects(PATH_DATA + "/redirects.json", pages, redirects)
- writeRequests(PATH_DATA + "/views.json", sessions)
- writeSessions(PATH_DATA + "/sessions.txt", pages, redirects,
- sessions.filter(_._2.head._1 >= START_TIME.getMillis))
+ writeWikis(PATH_DATA + "/wikis.json", WIKIS)
+ writeRequests(PATH_DATA + "/requests.json", sessions)
+ writeSessions(PATH_DATA + "/sessions.txt", pruneInvalidSessions(sessions, START_TIME, END_TIME))
}
}
diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestParams.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestParams.scala
index 85e5222..6a27d72 100644
--- a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestParams.scala
+++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestParams.scala
@@ -13,13 +13,15 @@
val START_TIME = new DateTime(2017, 1, 1, 21, 0, 0)
val END_TIME = new DateTime(2017, 1, 2, 2, 0, 0)
- val WIKIS: Seq[String] = (1 to 3).map{ "wiki_" + _ }
- val NUM_PAGES = 1000 // per wiki
- val NUM_REDIRECTS = 300 // per wiki
- val NUM_SESSIONS = 1000
- val MEAN_SESSION_LENGTH = 20
- val SESSION_TIMEOUT_MILLIS: Int = 60 * 15 * 1000 // 15 min
+ val WIKIS: Seq[String] = (1 to 3).map{ "wiki_" + _ }
+ val NUM_PAGES = 100 // per wiki
+ val NUM_REDIRECTS = 3 // per wiki
+ val NUM_SESSIONS = 1000
+ val MEAN_SESSION_LENGTH = 10
+
+ // TODO: Change millis to seconds
+ val SESSION_TIMEOUT_SECS: Int = 60 * 15 // 15 min
val PATH_DATA = "./test-data/"
}
diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
index 749aab8..b1453f2 100644
--- a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
+++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
@@ -5,8 +5,12 @@
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
+import scala.io.Source
+
/**
* @author Shilad Sen
+ *
+ * TODO: Turn this into a regular unit test.
*/
object TestSessionPagesBuilder {
import TestParams._
@@ -15,8 +19,6 @@
val params = SessionPagesBuilder.Params(
outputBasePath = PATH_DATA + "/spark_sessions",
projectNamespaceTable = "wikis",
- pageTable = "pages",
- redirectTable = "redirects",
wikiList = WIKIS.toList,
webrequestTable = "requests",
snapshot = SNAPSHOT,
@@ -31,15 +33,31 @@
val sql = new SQLContext(sc)
- sql.read.json(PATH_DATA + "/views.json")
+ sql.read.json(PATH_DATA + "/requests.json")
.registerTempTable(params.webrequestTable)
- sql.read.json(PATH_DATA + "/redirects.json")
- .registerTempTable(params.redirectTable)
- sql.read.json(PATH_DATA + "/pages.json")
- .registerTempTable(params.pageTable)
sql.read.json(PATH_DATA + "/wikis.json")
.registerTempTable(params.projectNamespaceTable)
SessionPagesBuilder(sql, params)
+
+ val computed = Source.fromFile(PATH_DATA + "/spark_sessions/part-00000").getLines.toSet
+ val correct = Source.fromFile(PATH_DATA + "/sessions.txt").getLines.toSet
+
+ val nMissing = correct
+ .filter { !computed.contains(_) }
+ .toList
+ .sorted
+ .map { line => System.out.println("Missing correct session: " + line); line }
+ .length
+
+ val nAdded = computed
+ .filter { !correct.contains(_) }
+ .toList
+ .sorted
+ .map { line => System.out.println("Additional incorrect session: " + line); line }
+ .length
+
+ assert(nMissing == 0)
+ assert(nAdded == 0)
}
}
\ No newline at end of file
--
To view, visit https://gerrit.wikimedia.org/r/381517
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Iaec6ee811dd3da30152c3dbb206bf3a505370f90
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: nav-vectors
Gerrit-Owner: Shilad Sen <[email protected]>