[MediaWiki-commits] [Gerrit] analytics...source[nav-vectors]: Spark job to create page ids viewed in each session

2017-10-11 Thread Shilad Sen (Code Review)
Shilad Sen has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/383761 )

Change subject: Spark job to create page ids viewed in each session
..

Spark job to create page ids viewed in each session

SessionPagesBuilder creates a table of viewed pages grouped by browser
session. The output table contains columns for wiki, date, timestamp, and a
space-separated list of all the page ids viewed in the session, in order.
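
A minimal sketch of how one such row could be assembled once views are grouped
by session (the View fields and the tab/space formatting are illustrative
assumptions, not the patch's actual schema):

  // Hypothetical per-view record; the real job derives these from webrequest.
  case class View(wikiDb: String, tstamp: Long, pageId: Long)

  // Render one session as "wiki <tab> startTimestamp <tab> id1 id2 id3 ..."
  // Assumes `views` is non-empty.
  def sessionRow(wikiDb: String, views: Seq[View]): String = {
    val ordered = views.sortBy(_.tstamp)
    val start = new java.sql.Timestamp(ordered.head.tstamp)
    s"$wikiDb\t$start\t" + ordered.map(_.pageId).mkString(" ")
  }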

The job now runs on the cluster in a reasonable amount of time (10 min for
a day's worth of views).

SessionPruner filters the session table, removing views of pages whose total
view count falls below a threshold. As a side effect it produces a page
frequency table.
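
A rough sketch of the pruning idea, assuming Spark RDDs; the method and
parameter names (e.g. minViews) are illustrative, not SessionPruner's actual
API:

  import org.apache.spark.rdd.RDD

  // Drop views of pages seen fewer than `minViews` times; `views` holds one
  // (wikiDb, pageId) pair per page view.
  def pruneRarePages(views: RDD[(String, Long)], minViews: Long): RDD[(String, Long)] = {
    // Frequency table: how often each (wiki, pageId) was viewed.
    val freqs = views.map(v => (v, 1L)).reduceByKey(_ + _)
    // Keep only the views whose page meets the threshold.
    views.map(v => (v, ())).join(freqs.filter(_._2 >= minViews)).keys
  }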

The testing harness generates fake test data and compares the computed Spark
results against results computed in memory.
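
The comparison pattern, in miniature (assuming an existing SparkContext `sc`;
the data here is made up):

  val data = Seq(("enwiki", 1L), ("enwiki", 2L), ("simplewiki", 3L))
  // Reference result computed with plain Scala collections.
  val inMemory = data.groupBy(_._1).mapValues(_.map(_._2).toSet)
  // Same aggregation computed through Spark.
  val viaSpark = sc.parallelize(data).groupByKey().mapValues(_.toSet).collect().toMap
  assert(viaSpark == inMemory)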

TODO:
* Oozify job (may require switching to Spark 2)

Bug: T174796
Change-Id: I55395459d80d73f3d065967ce95d6506698d128e

  Complete pass at session creation pipeline

Added session pruner
Switched to use tables instead of files
Cleaned up tests

Change-Id: I19160e16d8140d03d81a4226e3974f42ec1e3602
---
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPruner.scala
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/VectorUtils.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestPartitionQueryBuilder.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPruner.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestUtils.scala
9 files changed, 1,425 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/61/383761/1

diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala
new file mode 100644
index 000..cd43cea
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala
@@ -0,0 +1,96 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.sql.Timestamp
+
+import org.joda.time.{DateTime, DateTimeConstants, Hours, Interval}
+
+/**
+  * @author Shilad Sen
+  */
+object PartitionQueryBuilder {
+
+  /**
+* Creates a SQL predicate that includes the time period from beginTs to endTs.
+*
+* The SQL predicate is somewhat compressed, but there is still room for improvement.
+*
+* @return e.g. "(year = 2016) or (year = 2017 and month = 1 and day = 1 and hour < 3)"
+*/
+  def formSql(beginTs: Timestamp, endTs: Timestamp) : String = {
+val begin = new DateTime(beginTs.getTime).hourOfDay().roundFloorCopy()
+val end = new DateTime(endTs.getTime).hourOfDay().roundCeilingCopy()
+if (begin == end) {
+s"(year = ${begin.getYear} " +
+s"and month = ${begin.getMonthOfYear} " +
+s"and day = ${begin.getDayOfMonth} " +
+s"and hour = ${begin.getHourOfDay})"
+} else {
+  formSqlConditions(begin, end).map("(" + _ + ")").mkString(" OR ")
+}
+  }
+
+  def formSqlConditions(begin: DateTime, end: DateTime) : Seq[String] = {
+if (begin == end) {
+  return List()
+}
+
+// Try to take a year out of the middle
+var startYear = begin.year().roundCeilingCopy()
+var endYear = startYear.plusYears(1)
+if (!startYear.isBefore(begin) && !endYear.isAfter(end)) {
+  return  formSqlConditions(begin, startYear) ++
+  List(s"year = ${startYear.getYear}") ++
+  formSqlConditions(endYear, end)
+}
+
+// Try to take a month out of the middle
+var startMonth = begin.monthOfYear().roundCeilingCopy()
+var endMonth = startMonth.plusMonths(1)
+if (!startMonth.isBefore(begin) && !endMonth.isAfter(end)) {
+  return  formSqlConditions(begin, startMonth) ++
+List(s"year = ${startMonth.getYear} " +
+ s"and month = ${startMonth.getMonthOfYear}") ++
+formSqlConditions(endMonth, end)
+}
+
+// Try to take a day out of the middle
+var startDay = begin.dayOfMonth().roundCeilingCopy()
+var endDay = startDay.plusDays(1)
+if (!startDay.isBefore(begin) && !endDay.isAfter(end)) {
+  return  formSqlConditions(begin, startDay) ++
+List(s"year = ${startDay.getYear} " +
+ s"and month = ${startDay.getMonthOfYear} " +
+

[MediaWiki-commits] [Gerrit] analytics...source[nav-vectors]: Simplified job to create session ids and finished debugging it.

2017-09-29 Thread Shilad Sen (Code Review)
Shilad Sen has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/381517 )

Change subject: Simplified job to create session ids and finished debugging it.
..

Simplified job to create session ids and finished debugging it.

Changes:

* Instead of calculating page ids from titles, took page ids and NS ids
directly from the webrequest table. I think that this works okay in the
scenarios I care about (i.e. desktop page views); a rough sketch of the
query follows this list.

* Builder now passes unit tests.
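
A rough sketch of the kind of query this implies (assuming an existing
HiveContext `hiveContext`; the webrequest column names and the partition
predicate below are assumptions for illustration, not the patch's code):

  // Placeholder for the clause that PartitionQueryBuilder.formSql produces.
  val partitionPredicate = "year = 2017 and month = 6 and day = 8"
  val views = hiveContext.sql(
    s"""
       |SELECT page_id, namespace_id, ts, client_ip, user_agent, x_forwarded_for
       |FROM wmf.webrequest
       |WHERE is_pageview AND ($partitionPredicate)
     """.stripMargin)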

TODO:

* Add sessions separated by a timeout to the testing data

* Benchmark new version on production cluster.

* Turn test into proper unit test.

Bug: T174796
Change-Id: Iaec6ee811dd3da30152c3dbb206bf3a505370f90
---
M refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
M refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
M refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestParams.scala
M refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
4 files changed, 146 insertions(+), 317 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/17/381517/1

diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
index 1d03705..6e593b4 100644
--- a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
@@ -14,8 +14,7 @@
   * This creates output files with one line per session.
   * The first two tokens of each line are the session start timestamp and language edition.
   * The following tokens are the page ids viewed in the session, in order.
-  **
- *
+  *
   * @author Shilad Sen
   */
 object SessionPagesBuilder {
@@ -51,6 +50,11 @@
   val LOOK_PAST_SECS: Int = 60 * 59
 
 
+  /**
+* A user session is defined by: A wiki, a hash of unique user information,
+* and the timestamp for a view. This is used to stream through pageviews
+* that may have been part of the same user session.
+*/
   case class SessionKey (wikiDb: String, userHash: Long, tstamp: Long)
 extends Ordered[SessionKey] {
 override def compare(that: SessionKey): Int = {
@@ -63,7 +67,9 @@
 }
   }
 
-
+  /**
+* Partitions all views for the same wiki from the same user onto a single partition.
+*/
   class SessionPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
 
@@ -81,6 +87,11 @@
 }
   }
 
+  /**
+* Wrapper that allows us to handle production Hive timestamps (which
+* are already Timestamps), and testing data from JSON files (which are
+* strings) uniformly.
+*/
   def getTimestamp(ts: Any) : Timestamp = {
 ts match {
   case _: Timestamp =>
@@ -90,19 +101,6 @@
   case _ => throw new IllegalArgumentException(ts.toString)
 }
   }
-
-  /**
-* A single page.
-* Effective page id is the page id after resolving redirects.
-*/
-  case class PageInfo(
-   wikiDb: String,
-   pageId: Long,
-   pageTitle: String,
-   pageNamespace: Long,
-   pageIsRedirect: Boolean,
-   effectivePageId : Long
- )
 
   /**
 * A view of a page by a user.
@@ -117,21 +115,14 @@
pageId: Long
  )
 
-  case class Redirect(
-   wikiDb: String,
-   fromPageId: Long,
-   toPageId: Long
- )
-
-
   /**
 * Returns true iff the two views are part of the same session.
 * User hashes must match and the pageviews must fall within a specified window.
 */
-  def sameSession(v1: PageView, v2: PageView, timeoutMillis: Long) : Boolean = {
+  def sameSession(v1: PageView, v2: PageView, timeoutSecs: Long) : Boolean = {
 (v1.wikiDb == v2.wikiDb) &&
   (v1.userHash == v2.userHash) &&
-  (Math.abs(v1.tstamp - v2.tstamp) <= timeoutMillis)
+  (Math.abs(v1.tstamp - v2.tstamp) <= timeoutSecs * 1000)
   }
 
 
@@ -179,131 +170,6 @@
   map(r => r.getString(0) -> r.getString(1)).toMap
   }
 
-  def hoursInRange(begin: org.joda.time.DateTime, end: org.joda.time.DateTime) : Seq[(Int, Int, Int, Int)]= {
-val beginHour = begin.hourOfDay().roundFloorCopy()
-val endHour = end.hourOfDay().roundCeilingCopy()
-val numHours = Hours.hoursBetween(beginHour, endHour).getHours
-val dates = (0 to numHours).map(beginHour.plusHours)
-dates.map(d => (d.getYear, d.getMonthOfYear, 

[MediaWiki-commits] [Gerrit] analytics...source[nav-vectors]: WIP: Spark job to create page ids viewed in each session

2017-09-28 Thread Shilad Sen (Code Review)
Shilad Sen has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/381169 )

Change subject: WIP: Spark job to create page ids viewed in each session
..

WIP: Spark job to create page ids viewed in each session

This job will create a summary of viewer sessions, with one line per
user, and each line consisting of a wiki project code, timestamp, and
all the page ids viewed in the session.

The job now runs on the cluster in a reasonable amount of time (22 min
for a day's worth of views).

I have written a testing harness, but not quite automated it yet. It is
failing and I have not started debugging yet, but this shouldn't be too
tricky.

TODO:
* Polish and debug tests
* Oozify job (may require switching to Spark 2)

Bug: T174796
Change-Id: I55395459d80d73f3d065967ce95d6506698d128e
---
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestDataCreator.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestParams.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestPartitionQueryBuilder.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/vectors/TestSessionPagesBuilder.scala
6 files changed, 1,064 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/69/381169/1

diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala
new file mode 100644
index 000..cd43cea
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/PartitionQueryBuilder.scala
@@ -0,0 +1,96 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import java.sql.Timestamp
+
+import org.joda.time.{DateTime, DateTimeConstants, Hours, Interval}
+
+/**
+  * @author Shilad Sen
+  */
+object PartitionQueryBuilder {
+
+  /**
+* Creates a SQL predicate that includes the time period from beginTs to endTs.
+*
+* The SQL predicate is somewhat compressed, but there is still room for improvement.
+*
+* @return e.g. "(year = 2016) or (year = 2017 and month = 1 and day = 1 and hour < 3)"
+*/
+  def formSql(beginTs: Timestamp, endTs: Timestamp) : String = {
+val begin = new DateTime(beginTs.getTime).hourOfDay().roundFloorCopy()
+val end = new DateTime(endTs.getTime).hourOfDay().roundCeilingCopy()
+if (begin == end) {
+s"(year = ${begin.getYear} " +
+s"and month = ${begin.getMonthOfYear} " +
+s"and day = ${begin.getDayOfMonth} " +
+s"and hour = ${begin.getHourOfDay})"
+} else {
+  formSqlConditions(begin, end).map("(" + _ + ")").mkString(" OR ")
+}
+  }
+
+  def formSqlConditions(begin: DateTime, end: DateTime) : Seq[String] = {
+if (begin == end) {
+  return List()
+}
+
+// Try to take a year out of the middle
+var startYear = begin.year().roundCeilingCopy()
+var endYear = startYear.plusYears(1)
+if (!startYear.isBefore(begin) && !endYear.isAfter(end)) {
+  return  formSqlConditions(begin, startYear) ++
+  List(s"year = ${startYear.getYear}") ++
+  formSqlConditions(endYear, end)
+}
+
+// Try to take a month out of the middle
+var startMonth = begin.monthOfYear().roundCeilingCopy()
+var endMonth = startMonth.plusMonths(1)
+if (!startMonth.isBefore(begin) && !endMonth.isAfter(end)) {
+  return  formSqlConditions(begin, startMonth) ++
+List(s"year = ${startMonth.getYear} " +
+ s"and month = ${startMonth.getMonthOfYear}") ++
+formSqlConditions(endMonth, end)
+}
+
+// Try to take a day out of the middle
+var startDay = begin.dayOfMonth().roundCeilingCopy()
+var endDay = startDay.plusDays(1)
+if (!startDay.isBefore(begin) && !endDay.isAfter(end)) {
+  return  formSqlConditions(begin, startDay) ++
+List(s"year = ${startDay.getYear} " +
+ s"and month = ${startDay.getMonthOfYear} " +
+ s"and day = ${startDay.getDayOfMonth}") ++
+formSqlConditions(endDay, end)
+}
+
+// Do we have a collection of hours that run up to the end of the starting day?
+var startOfNextDay = begin.withTimeAtStartOfDay().plusDays(1)
+if (!startOfNextDay.isAfter(end)) {
+ return  List(s"year = ${begin.getYear} " +
+  s"and month = ${begin.getMonthOfYear} " +
+  s"and day = ${begin.getDayOfMonth} " +
+  s"and hour >= ${begin.getHourOfDay}") ++
+formSqlConditions(startOfNextDay, end)
+}
+
+// Do we have a 

[MediaWiki-commits] [Gerrit] analytics...source[master]: Spark job to create session event log appears to be working.

2017-09-12 Thread Shilad Sen (Code Review)
Shilad Sen has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/377706 )

Change subject: Spark job to create session event log appears to be working.
..

Spark job to create session event log appears to be working.

The session event log has one session per line, with a list of the page ids
viewed in that session, ordered by timestamp.

simplewiki  2017-06-08 03:28:15.0   34242   9700
simplewiki  2017-06-08 03:14:49.0   31246
simplewiki  2017-06-08 03:14:22.0   34019   908481247   121

TODO:
* Benchmark on larger datasets
* Optimize if necessary (try broadcasting a data structure to avoid the join; see the sketch after this list)
* Compare data cleaning with Elery's scripts
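
A minimal sketch of the broadcast idea from the TODO above, assuming an
existing SparkContext `sc`; the tiny redirect map and view RDD are made-up
illustrations, not the job's real data:

  // Ship the (small) redirect map to every executor once, instead of joining
  // the view RDD against an RDD of redirects.
  val redirects: Map[Long, Long] = Map(1L -> 2L)        // fromPageId -> toPageId
  val redirectsBc = sc.broadcast(redirects)

  val views = sc.parallelize(Seq(("simplewiki", 1L), ("simplewiki", 3L)))
  val resolved = views.map { case (wiki, pageId) =>
    // Resolve redirects locally on each executor; no shuffle required.
    (wiki, redirectsBc.value.getOrElse(pageId, pageId))
  }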

Bug: T174796
Change-Id: I52103578d50d492eaeff2eeffc8629331a1260da
---
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
1 file changed, 547 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/06/377706/1

diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
new file mode 100644
index 000..d77a96a
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
@@ -0,0 +1,547 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+import org.apache.log4j.{Level, LogManager}
+import org.apache.spark.Partitioner
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.hashing.MurmurHash3
+
+/**
+  * Process to create a page view session log.
+  *
+  * This creates output files with one line per session.
+  * The first two tokens of each line are the session start timestamp and language edition.
+  * The following tokens are the page ids viewed in the session, in order.
+  **
+  * @author Shilad Sen
+  */
+object SessionPagesBuilder {
+
+
+  import org.apache.spark.sql.hive.HiveContext
+  import org.apache.spark.SparkConf
+  import org.apache.spark.sql.SaveMode
+  import org.apache.spark.SparkContext
+  import scopt.OptionParser
+  import org.apache.spark.rdd.RDD
+  import org.apache.spark.sql.SQLContext
+
+
+  case class SessionKey (wikiDb: String, userHash: Long, tstamp: Long)
+extends Ordered[SessionKey] {
+override def compare(that: SessionKey): Int = {
+  import scala.math.Ordered.orderingToOrdered
+
+  Ordering.Tuple3(Ordering.String, Ordering.Long, Ordering.Long).compare(
+(wikiDb, userHash, tstamp),
+(that.wikiDb, that.userHash, that.tstamp)
+  )
+}
+  }
+
+
+  class SessionPartitioner(partitions: Int) extends Partitioner {
+require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
+
+override def numPartitions: Int = partitions
+
+override def getPartition(key: Any): Int = {
+  val k = key.asInstanceOf[SessionKey]
+  Math.abs(k.wikiDb.hashCode() + k.userHash.asInstanceOf[Int]) % numPartitions
+}
+  }
+
+  object SessionKey {
+implicit def orderingByTimestamp[A <: SessionKey] : Ordering[A] = {
+  Ordering.by(fk => (fk.wikiDb, fk.userHash, fk.tstamp))
+}
+  }
+
+
+
+  case class PageInfo(
+   wikiDb: String,
+   pageId: Long,
+   pageTitle: String,
+   pageNamespace: Long,
+   pageIsRedirect: Boolean,
+   effectivePageId : Long   // page id or redirected destination page id
+ )
+
+  case class Redirect(
+   wikiDb: String,
+   fromPageId: Long,
+   toPageId: Long
+ )
+
+  case class PageView(
+   wikiDb: String,
+   userHash: Long,
+   tstamp: Long,
+   pageNamespace: Long,
+   pageId: Long
+ )
+
+  def sameSession(v1: PageView, v2: PageView, timeoutMillis: Long) : Boolean = {
+(v1.wikiDb == v2.wikiDb) &&
+  (v1.userHash == v2.userHash) &&
+  (Math.abs(v1.tstamp - v2.tstamp) <= timeoutMillis)
+  }
+
+
+  /**
+* Given the components associated with a distinct user, return a long hash.
+* @param client_ip
+* @param user_agent
+* @param x_forwarded_for
+* @return
+*/
+  def userHash(
+   client_ip: String,
+   user_agent: String,
+   x_forwarded_for: String
+ ) : Long = {
+var s = client_ip + ":" + user_agent + ":" +  x_forwarded_for
+(MurmurHash3.stringHash(s).asInstanceOf[Long] << 32) |
+  (MurmurHash3.stringHash(s.reverse).asInstanceOf[Long] & 0xFFFFFFFFL)
+  }
+
+  /**
+* Steps:
+*  - prepare raw tables:
+*- page: Keep only wiki_db, page_id, page_namespace, page_is_redirect,
+*

[MediaWiki-commits] [Gerrit] analytics...source[master]: Placeholder for job to create page ids viewed in each session.

2017-09-08 Thread Shilad Sen (Code Review)
Shilad Sen has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/376797 )

Change subject: Placeholder for job to create page ids viewed in each session.
..

Placeholder for job to create page ids viewed in each session.

Trying to make sure Gerrit is set up, the nav-vectors branch works
properly, and it can be run and built interactively on the server.

Bug: T174796
Change-Id: Ifec6df906e03846aef63fd7f918f194dfcfabc11
---
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
1 file changed, 327 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/97/376797/1

diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
new file mode 100644
index 000..7ab9efc
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/vectors/SessionPagesBuilder.scala
@@ -0,0 +1,327 @@
+package org.wikimedia.analytics.refinery.job.vectors
+
+/**
+  * Process to create a page view session log.
+  *
+  * Eventually this will create output files with one line per session.
+  * The first two tokens of each line will be the session start timestamp and language edition.
+  * The following tokens are the page ids viewed in the session, in order.
+  *
+  * This is a work in progress started by taking some of joal's work in ClickStreamBuilder.
+  *
+  * @author Shilad Sen
+  */
+object SessionPagesBuilder {
+
+
+  import org.apache.spark.sql.hive.HiveContext
+  import org.apache.spark.SparkConf
+  import org.apache.spark.sql.SaveMode
+  import org.apache.spark.SparkContext
+  import scopt.OptionParser
+  import org.apache.spark.rdd.RDD
+  import org.apache.spark.sql.SQLContext
+
+  case class PageInfo(
+   wikiDb: String,
+   pageId: Long,
+   pageTitle: String,
+   pageNamespace: Long,
+   pageIsRedirect: Boolean
+ )
+
+  case class Redirect(
+   wikiDb: String,
+   fromPageId: Long,
+   toPageId: Long
+ )
+
+  /**
+* Steps:
+*  - prepare raw tables:
+*- page
+*  Keep only wiki_db, page_id, page_namespace, page_is_redirect, page_title
+*  Insert fake values used later
+*- redirect
+*  Join with page (from and to) to clean and denormalize
+*- pagelinks
+*  Join with page (from and to) to clean and denormalize
+*  Join with redirect to resolve redirects
+*  Take distinct instances
+*  - Get data from Webrequest:
+*- (prev[referer/referer_class parsing], curr[page_title], type, count)
+*  - Join with pages (namespace 0 only)
+*  - Join with redirect (resolved)
+*  - Annotated (type)
+*- Save (title_prev, title_to, type, count)
+*  - Join with page for titles (+ no redirects, only namespace 0, and no from = to)
+*/
+
+
+
+  def listToSQLInCondition(list: Seq[String]): String = list.map(w => s"'$w'").mkString(", ")
+
+  /**
+* Prepare a map linking project hostnames to project dbnames.
+* @return A map[hostname -> dbname]
+*/
+  def prepareProjectToWikiMap(
+   sqlContext: SQLContext,
+   projectNamespaceTable: String,
+   snapshot: String,
+   wikiList: Seq[String]
+ ): Map[String, String] = {
+sqlContext.sql(
+  s"""
+ |SELECT DISTINCT
+ |  hostname,
+ |  dbname
+ |FROM $projectNamespaceTable
+ |WHERE snapshot = '$snapshot'
+ |  AND dbname IN (${listToSQLInCondition(wikiList)})
+""".stripMargin).
+  rdd.
+  collect.
+  map(r => r.getString(0) -> r.getString(1)).toMap
+  }
+
+  /**
+* Prepare the pages dataset to be reused
+* @return A RDD of PageInfo data augmented with fake pages for each wiki
+*/
+  def preparePages(
+sqlContext: SQLContext,
+pageTable: String,
+snapshot: String,
+wikiList: Seq[String]
+  ): RDD[PageInfo] = {
+sqlContext.sql(s"""
+  |SELECT
+  |  wiki_db,
+  |  page_id,
+  |  page_title,
+  |  page_namespace,
+  |  page_is_redirect
+  |FROM $pageTable
+  |WHERE snapshot = '$snapshot'
+  |  AND wiki_db IN (${listToSQLInCondition(wikiList)})
+  |  AND page_id IS NOT NULL
+