EBernhardson has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/357320 )

Change subject: Better grouping of normalized queries
......................................................................

Better grouping of normalized queries

After reviewing numerous query groupings, it was clear that the lucene
stemmer does a pretty good job in most cases, but has edge cases that
group completely unrelated queries together. The most obvious is
perhaps when lucene returns an empty string due to stop words (to be or
not to be), but there are various other cases as well.

Add another step after the lucene stemmer which compares the similarity
of result sets returned for queries. This uses a relatively simple
jaccard binary similarity measure to compare the sets of returned page
ids, and then a very simple clustering algorithm to group together
only queries that have a minimum threshold of result similarity.

Removes the norm_query field and replaces it with norm_query_id. This
new field is unique across all wikis and can be grouped on directly.
Likely though the need to keep `wikiid` in the data frame for later
processing will mean most actions still group on (wikiid,
norm_query_id).

The current threshold for clustering of 0.5 is arbitrarily chosen; it
essentially says if we have 20 results displayed for each query then
at least 14 results must be shared between the two. It seems plausible
this could go down to 0.33, which is an overlap of 10, but it needs
to be more thoroughly reviewed.

Change-Id: I717c97cebf1762071c4e7bdf762aebb3a4520a97
---
M mjolnir/cli/data_pipeline.py
M mjolnir/dbn.py
A mjolnir/norm_query.py
M mjolnir/sampling.py
M mjolnir/test/fixtures/dbn_input.json
M mjolnir/test/test_dbn.py
M mjolnir/test/test_sampling.py
M mjolnir/test/training/test_tuning.py
M mjolnir/training/tuning.py
9 files changed, 264 insertions(+), 82 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR 
refs/changes/20/357320/1

diff --git a/mjolnir/cli/data_pipeline.py b/mjolnir/cli/data_pipeline.py
index dbf3d58..7508c0c 100644
--- a/mjolnir/cli/data_pipeline.py
+++ b/mjolnir/cli/data_pipeline.py
@@ -12,6 +12,7 @@
 
 import mjolnir.dbn
 import mjolnir.metrics
+import mjolnir.norm_query
 import mjolnir.features
 import mjolnir.sampling
 from pyspark import SparkContext
@@ -38,14 +39,17 @@
         .withColumn('hit_page_ids', F.col('hits.pageid'))
         .drop('hits')
         .withColumn('click_page_ids', F.col('clicks.pageid'))
-        .drop('clicks')
-        # Normalize queries using the lucene stemmer
-        .withColumn('norm_query', F.expr('stemmer(query, substring(wikiid, 1, 
2))')))
+        .drop('clicks'))
+
+    # Normalize queries into groups of related queries. This helps to have a 
larger
+    # number of sessions per normalized query to train the DBN on.
+    # Note that df_norm comes back cached
+    df_norm = mjolnir.norm_query.transform(df_clicks)
 
     # Sample to some subset of queries per wiki
     df_sampled = (
         mjolnir.sampling.sample(
-            df_clicks,
+            df_norm,
             wikis=wikis,
             seed=54321,
             queries_per_wiki=20000,
@@ -57,6 +61,10 @@
         .withColumn('clicked', F.expr('array_contains(click_page_ids, 
hit_page_id)'))
         .drop('click_page_ids')
         .cache())
+
+    # materialize df_sampled and unpersist df_norm
+    df_sampled.count()
+    df_norm.unpersist()
 
     # Learn relevances
     df_rel = (
@@ -74,11 +82,11 @@
 
     df_all_hits = (
         df_sampled
-        .select('wikiid', 'query', 'norm_query', 'hit_page_id', 'session_id', 
'hit_position')
-        .join(df_rel, how='inner', on=['wikiid', 'norm_query', 'hit_page_id'])
+        .select('wikiid', 'query', 'norm_query_id', 'hit_page_id', 
'session_id', 'hit_position')
+        .join(df_rel, how='inner', on=['wikiid', 'norm_query_id', 
'hit_page_id'])
         .cache())
 
-    # materialize df_all_hits and drop df_sampled
+    # materialize df_all_hits and drop df_sampled, df_norm
     df_all_hits.count()
     df_sampled.unpersist()
 
@@ -88,7 +96,7 @@
 
     df_hits = (
         df_all_hits
-        .groupBy('wikiid', 'query', 'norm_query', 'hit_page_id')
+        .groupBy('wikiid', 'query', 'norm_query_id', 'hit_page_id')
         # weight is now the number of times a hit was displayed to a user
         .agg(F.count(F.lit(1)).alias('weight'),
              F.mean('hit_position').alias('hit_position'),
@@ -107,7 +115,7 @@
     print 'unweighted ndcg@10: %.4f' % (ndcgAt10)
 
     # Collect features for all known queries. Note that this intentionally
-    # uses query and NOT norm_query. Merge those back into the source hits.
+    # uses query and NOT norm_query_id. Merge those back into the source hits.
     df_features = mjolnir.features.collect(
         df_hits,
         # TODO: Should be a CLI option of some sort
diff --git a/mjolnir/dbn.py b/mjolnir/dbn.py
index 96b37ca..a7ee154 100644
--- a/mjolnir/dbn.py
+++ b/mjolnir/dbn.py
@@ -62,7 +62,7 @@
     ----------
     iterator : ???
         iterator over pyspark.sql.Row. Each row must have a wikiid,
-        norm_query, and list of hits each containing hit_position,
+        norm_query_id, and list of hits each containing hit_position,
         hit_page_id and clicked.
 
     Yields
@@ -81,7 +81,7 @@
             clicks.append(hit.clicked)
         yield '\t'.join([
             '0',  # unused identifier
-            row.norm_query,  # group the session belongs to
+            str(row.norm_query_id),  # group the session belongs to
             row.wikiid,  # region
             '0',  # intent weight
             json.dumps(results),  # hits displayed in session
@@ -104,18 +104,21 @@
     Returns
     -------
     list of tuples
-        List of four value tuples each containing wikiid, norm_query,
+        List of four value tuples each containing wikiid, norm_query_id,
         hit_page_id and relevance.
     """
     # reader converted all the page ids into an internal id, flip the map so we
     # can change them back. Not the most memory efficient, but it will do.
     uid_to_url = {uid: url for url, uid in reader.url_to_id.iteritems()}
     rows = []
-    for (norm_query, wikiid), qid in reader.query_to_id.iteritems():
+    for (norm_query_id, wikiid), qid in reader.query_to_id.iteritems():
+        # clickmodels required the group key to be a string, convert back
+        # to an int to match input data
+        norm_query_id = int(norm_query_id)
         for uid, data in model.urlRelevances[False][qid].iteritems():
             relevance = data['a'] * data['s']
             hit_page_id = int(uid_to_url[uid])
-            rows.append((wikiid, norm_query, hit_page_id, relevance))
+            rows.append((wikiid, norm_query_id, hit_page_id, relevance))
     return rows
 
 
@@ -123,13 +126,13 @@
     """Generate relevance labels for the provided dataframe.
 
     Process the provided data frame to generate relevance scores for
-    all provided pairs of (wikiid, norm_query, hit_page_id). The input
+    all provided pairs of (wikiid, norm_query_id, hit_page_id). The input
     DataFrame must have a row per hit_page_id that was seen by a session.
 
     Parameters
     ----------
     df : pyspark.sql.DataFrame
-        User click logs with columns wikiid, norm_query, session_id,
+        User click logs with columns wikiid, norm_query_id, session_id,
         hit_page_id, hit_position, clicked.
     dbn_config : dict
         Configuration needed by the DBN. See clickmodels documentation for more
@@ -143,9 +146,9 @@
     Returns
     -------
     spark.sql.DataFrame
-        DataFrame with columns wikiid, norm_query, hit_page_id, relevance.
+        DataFrame with columns wikiid, norm_query_id, hit_page_id, relevance.
     """
-    mjolnir.spark.assert_columns(df, ['wikiid', 'norm_query', 'session_id',
+    mjolnir.spark.assert_columns(df, ['wikiid', 'norm_query_id', 'session_id',
                                       'hit_page_id', 'hit_position', 
'clicked'])
 
     def train_partition(iterator):
@@ -161,7 +164,7 @@
         Returns
         -------
         list of tuples
-            List of (wikiid, norm_query, hit_page_id, relevance) tuples.
+            List of (wikiid, norm_query_id, hit_page_id, relevance) tuples.
         """
         reader = InputReader(dbn_config['MIN_DOCS_PER_QUERY'],
                              dbn_config['MAX_DOCS_PER_QUERY'],
@@ -177,18 +180,18 @@
 
     return (
         df
-        # group and collect up the hits for individual (wikiid, norm_query,
+        # group and collect up the hits for individual (wikiid, norm_query_id,
         # session_id) tuples to match how the dbn expects to receive data.
-        .groupby('wikiid', 'norm_query', 'session_id')
+        .groupby('wikiid', 'norm_query_id', 'session_id')
         .agg(F.collect_list(F.struct('hit_position', 'hit_page_id', 
'clicked')).alias('hits'))
         # Partition into small batches ensuring that all matching (wikiid,
-        # norm_query) rows end up on the same partition.
+        # norm_query_id) rows end up on the same partition.
         # TODO: The above groupby and this repartition both cause a shuffle, is
         # it possible to make that a single shuffle? Could push the final level
         # of grouping into python, but that could just as well end up worse?
-        .repartition(num_partitions, 'wikiid', 'norm_query')
+        .repartition(num_partitions, 'wikiid', 'norm_query_id')
         # Run each partition through the DBN to generate relevance scores.
         .rdd.mapPartitions(train_partition)
         # Convert the rdd of tuples back into a DataFrame so the fields all
         # have a name.
-        .toDF(['wikiid', 'norm_query', 'hit_page_id', 'relevance']))
+        .toDF(['wikiid', 'norm_query_id', 'hit_page_id', 'relevance']))
diff --git a/mjolnir/norm_query.py b/mjolnir/norm_query.py
new file mode 100644
index 0000000..35efe35
--- /dev/null
+++ b/mjolnir/norm_query.py
@@ -0,0 +1,171 @@
+"""
+???
+"""
+
+from collections import defaultdict
+import numpy as np
+import pyspark.sql.types
+from pyspark.sql import functions as F
+
+
+def _binary_sim_jaccard(X, Y):
+    """Calculate jaccard binary similarity between two vectors
+
+    Parameters
+    ----------
+    X : np.array
+    Y : np.array
+
+    Returns
+    -------
+    float
+    """
+    assert len(X) == len(Y)
+    # both true
+    a = np.sum((X == 1) & (Y == 1))
+    # both false
+    d = np.sum((X == 0) & (Y == 0))
+    # number of both true over number where at least one is true.
+    return a / float(len(X) - d)
+
+
+def _binary_sim(mat, sim=_binary_sim_jaccard):
+    """Compute a similarity matrix
+
+    Parameters
+    ----------
+    mat : np.array
+    sim : func
+        function taking two one dimensional arrays of the same
+        size as input and returns a similarity value.
+
+    Returns
+    -------
+    np.array
+        matrix of shape (n_rows, n_rows) giving the similarity
+        between rows of the input matrix.
+    """
+    n_rows = mat.shape[0]
+    res = np.empty((n_rows, n_rows), dtype=np.float32)
+    for i in range(n_rows):
+        res[i][i] = 1.
+        for j in range(0, i):
+            res[i][j] = sim(mat[i], mat[j])
+            res[j][i] = res[i][j]
+    return res
+
+
+def _make_query_groups(source, threshold=0.5, top_n=5):
+    """Cluster together queries in source based on similarity of result sets
+
+    Parameters
+    ----------
+    source : list
+    threshold : float
+    top_n : int
+
+    Returns
+    -------
+    list
+    """
+    # Build up a matrix that has a unique query_id
+    # for each row, and the columns are all known page ids. Cells are set
+    # to True if that page id was shown for that query.
+    by_query = defaultdict(set)
+    all_page_ids = set()
+    for query_id, query_hit_page_ids in source:
+        for hit_page_ids in query_hit_page_ids:
+            by_query[query_id].update(hit_page_ids[:top_n])
+            all_page_ids.update(hit_page_ids[:top_n])
+    # sets don't have a defined order, so make it a list which has explicit 
ordering
+    # This list will be our set of columns in the matrix
+    all_page_ids = list(all_page_ids)
+    # shape is # rows x # columns
+    matrix = np.empty((len(by_query), len(all_page_ids)), dtype=np.bool)
+    for i, hit_page_ids in enumerate(by_query.values()):
+        for j, col_page_id in enumerate(all_page_ids):
+            matrix[i][j] = col_page_id in hit_page_ids
+
+    # Calculate jaccard similarity for all combinations of queries. This could
+    # get very expensive for large matrices, but since we pre-grouped with the
+    # lucene stemmer the size should be reasonable enough.
+    sim = _binary_sim(matrix)
+
+    # Perform a very simple clustering of the similarities. There are probably
+    # much better algorithms for this but i don't know them...
+    groups = range(len(by_query))
+    for i in range(0, len(by_query)):
+        for j in range(0, i):
+            if sim[i][j] > threshold:
+                # TODO: why i = j instead of j = i?
+                groups[i] = groups[j]
+    return zip(by_query.keys(), groups)
+
+
+def transform(df):
+    """Group together similar results in df
+
+    Attaches a query_id and norm_query_id field to df. query_id uniquely 
identifies
+    a single query string to a single wiki. norm_query_id identifies clusters 
of similar
+    queries based on an initial grouping via the lucene stemmer, and then 
clustering
+    by similarity of result sets.
+
+    Parameters
+    ----------
+    df : pyspark.sql.DataFrame
+
+    Returns
+    -------
+    pyspark.sql.DataFrame
+    """
+    # This UDF must not be created at the top level, or simply including this 
file
+    # will magic up a SparkContext and prevent top level code from setting 
specific
+    # configuration.
+    _make_query_groups_udf = F.udf(_make_query_groups, 
pyspark.sql.types.ArrayType(
+        pyspark.sql.types.StructType([
+            pyspark.sql.types.StructField("query_id", 
pyspark.sql.types.StringType()),
+            pyspark.sql.types.StructField("group", 
pyspark.sql.types.IntegerType()),
+        ])))
+
+    # Convert (wiki, query) pairs into a unique id
+    df_query_id = (
+        df
+        .groupBy('wikiid', 'query')
+        .agg(F.collect_list('hit_page_ids').alias('query_hit_page_ids'))
+        .withColumn('query_id', F.monotonically_increasing_id())
+        # While spark is supposed to be deterministic, I have observed cases
+        # where without caching this the final join results in norm_query_id's
+        # with multiple wikiid's, which is incorrect.
+        .cache())
+
+    # The stemmer is a bit too brute force for some situations, we need to
+    # separate out queries that were grouped together but aren't really 
similar.
+    df_norm_query_id = (
+        df_query_id
+        # Start by building a row per normalized query
+        .groupBy('wikiid', F.expr('stemmer(query, substring(wikiid, 1, 
2))').alias('norm_query'))
+        .agg(F.collect_list(F.struct('query_id', 
'query_hit_page_ids')).alias('source'))
+        # This explode basically undoes the collect_list above, after attaching
+        # group information to each row using a binary vector similarity metric
+        .withColumn('norm_query_group', 
F.explode(_make_query_groups_udf('source')))
+        # Move the fields of norm_query_group to top level columns
+        .select('wikiid', 'norm_query', 
F.col('norm_query_group.query_id').alias('query_id'),
+                F.col('norm_query_group.group').alias('norm_query_group'))
+        # Re-group by the new groups so we can give them unique ids
+        .groupBy('wikiid', 'norm_query', 'norm_query_group')
+        .agg(F.collect_list('query_id').alias('query_ids'))
+        .select('query_ids', 
F.monotonically_increasing_id().alias('norm_query_id'))
+        # Expand back out into a df with columns (query_id, norm_query_id)
+        .select(F.explode('query_ids').alias('query_id'), 'norm_query_id'))
+
+    res = (
+        df
+        .join(df_query_id.select('wikiid', 'query', 'query_id'), how='inner', 
on=['wikiid', 'query'])
+        .join(df_norm_query_id, how='inner', on=['query_id'])
+        .cache())
+
+    # materialize res and unpersist the dataframes that make it up
+    res.count()
+    df_query_id.unpersist()
+
+    return res
diff --git a/mjolnir/sampling.py b/mjolnir/sampling.py
index 4adb578..d27a302 100644
--- a/mjolnir/sampling.py
+++ b/mjolnir/sampling.py
@@ -83,7 +83,7 @@
 
 
 def _sample_queries(df, wikis, num_buckets=100, samples_desired=10000, 
seed=None):
-    """Sample down a unique list of (wiki, norm_query, num_sessions)
+    """Sample down a unique list of (wiki, norm_query_id, num_sessions)
 
     Given a dataset of unique queries, sample it down to samples_desired per 
wiki
     maintaining the distribution of queries with many sessions and queries with
@@ -98,7 +98,7 @@
     Parameters
     ----------
     df : pyspark.sql.DataFrame
-        Input dataframe containing (wiki, norm_query, num_sessions) fields.
+        Input dataframe containing (wiki, norm_query_id, num_sessions) fields.
     wikis : list of strings
         List of wikis to generate samples for.
     num_buckets : int, optional
@@ -113,7 +113,7 @@
     Returns
     -------
     pyspark.sql.DataFrame
-        The set of sampled (wikiid, norm_query) rows desired with approximately
+        The set of sampled (wikiid, norm_query_id) rows desired with 
approximately
         samples_desired rows per wikiid.
     """
 
@@ -150,7 +150,7 @@
         if idx == len(splits):
             raise ValueError
         split = splits[idx]
-        return ((row.wikiid, split), (row.wikiid, row.norm_query))
+        return ((row.wikiid, split), (row.wikiid, row.norm_query_id))
 
     return (
         df.rdd
@@ -160,7 +160,7 @@
         .sampleByKey(withReplacement=False, fractions=wiki_fractions, 
seed=seed)
         # Convert the PairRDD back into a dataframe.
         .map(lambda (key, row): row)
-        .toDF(['wikiid', 'norm_query']))
+        .toDF(['wikiid', 'norm_query_id']))
 
 
 def sample(df, wikis, seed=None, queries_per_wiki=10000,
@@ -204,7 +204,7 @@
         The input DataFrame with all columns it origionally had sampled down
         based on the provided constraints.
     """
-    mjolnir.spark.assert_columns(df, ['wikiid', 'norm_query', 'session_id', 
'q_by_ip_day'])
+    mjolnir.spark.assert_columns(df, ['wikiid', 'norm_query_id', 'session_id', 
'q_by_ip_day'])
 
     # Filter down the input into the wikis we care about and remove sessions
     # from overly active users, which are presumably bots.
@@ -215,13 +215,13 @@
         .where(df.q_by_ip_day <= max_queries_per_ip_day)
         .drop(df.q_by_ip_day))
 
-    # Aggregate down into a unique set of (wikiid, norm_query) and add in a
+    # Aggregate down into a unique set of (wikiid, norm_query_id) and add in a
     # count of the number of unique sessions per pair. Filter on the number
     # of sessions as we need some minimum number of sessions per query to train
     # the DBN
     df_queries_unique = (
         df_filtered
-        .groupBy('wikiid', 'norm_query')
+        .groupBy('wikiid', 'norm_query_id')
         # To make QuantileDiscretizer happy later on, we need
         # to cast this to a double. Can be removed in 2.x which
         # accepts anything numeric.
@@ -235,4 +235,4 @@
     df_queries_sampled = _sample_queries(df_queries_unique, wikis, 
samples_desired=queries_per_wiki, seed=seed)
 
     # Select the rows chosen by sampling from the filtered df
-    return df_filtered.join(df_queries_sampled, how='inner', on=['wikiid', 
'norm_query'])
+    return df_filtered.join(df_queries_sampled, how='inner', on=['wikiid', 
'norm_query_id'])
diff --git a/mjolnir/test/fixtures/dbn_input.json 
b/mjolnir/test/fixtures/dbn_input.json
index 648a446..51d5eab 100644
--- a/mjolnir/test/fixtures/dbn_input.json
+++ b/mjolnir/test/fixtures/dbn_input.json
@@ -1,33 +1,33 @@
-{"wikiid": "foowiki", "norm_query": "test", "session_id": "abc", 
"hit_page_id": 1111, "hit_position": 1, "clicked": false}
-{"wikiid": "foowiki", "norm_query": "test", "session_id": "abc", 
"hit_page_id": 2222, "hit_position": 2, "clicked": true}
-{"wikiid": "foowiki", "norm_query": "test", "session_id": "abc", 
"hit_page_id": 3333, "hit_position": 3, "clicked": false}
-{"wikiid": "foowiki", "norm_query": "test", "session_id": "abc", 
"hit_page_id": 4444, "hit_position": 4, "clicked": false}
+{"wikiid": "foowiki", "norm_query_id": 12345, "session_id": "abc", 
"hit_page_id": 1111, "hit_position": 1, "clicked": false}
+{"wikiid": "foowiki", "norm_query_id": 12345, "session_id": "abc", 
"hit_page_id": 2222, "hit_position": 2, "clicked": true}
+{"wikiid": "foowiki", "norm_query_id": 12345, "session_id": "abc", 
"hit_page_id": 3333, "hit_position": 3, "clicked": false}
+{"wikiid": "foowiki", "norm_query_id": 12345, "session_id": "abc", 
"hit_page_id": 4444, "hit_position": 4, "clicked": false}
 
-{"wikiid": "foowiki", "norm_query": "test", "session_id": "def", 
"hit_page_id": 1111, "hit_position": 1, "clicked": true}
-{"wikiid": "foowiki", "norm_query": "test", "session_id": "def", 
"hit_page_id": 2222, "hit_position": 2, "clicked": false}
-{"wikiid": "foowiki", "norm_query": "test", "session_id": "def", 
"hit_page_id": 3333, "hit_position": 3, "clicked": false}
-{"wikiid": "foowiki", "norm_query": "test", "session_id": "def", 
"hit_page_id": 4444, "hit_position": 4, "clicked": false}
+{"wikiid": "foowiki", "norm_query_id": 12345, "session_id": "def", 
"hit_page_id": 1111, "hit_position": 1, "clicked": true}
+{"wikiid": "foowiki", "norm_query_id": 12345, "session_id": "def", 
"hit_page_id": 2222, "hit_position": 2, "clicked": false}
+{"wikiid": "foowiki", "norm_query_id": 12345, "session_id": "def", 
"hit_page_id": 3333, "hit_position": 3, "clicked": false}
+{"wikiid": "foowiki", "norm_query_id": 12345, "session_id": "def", 
"hit_page_id": 4444, "hit_position": 4, "clicked": false}
 
-{"wikiid": "foowiki", "norm_query": "test", "session_id": "ghi", 
"hit_page_id": 1111, "hit_position": 1, "clicked": true}
-{"wikiid": "foowiki", "norm_query": "test", "session_id": "ghi", 
"hit_page_id": 2222, "hit_position": 2, "clicked": false}
-{"wikiid": "foowiki", "norm_query": "test", "session_id": "ghi", 
"hit_page_id": 3333, "hit_position": 3, "clicked": false}
-{"wikiid": "foowiki", "norm_query": "test", "session_id": "ghi", 
"hit_page_id": 4444, "hit_position": 4, "clicked": false}
+{"wikiid": "foowiki", "norm_query_id": 12345, "session_id": "ghi", 
"hit_page_id": 1111, "hit_position": 1, "clicked": true}
+{"wikiid": "foowiki", "norm_query_id": 12345, "session_id": "ghi", 
"hit_page_id": 2222, "hit_position": 2, "clicked": false}
+{"wikiid": "foowiki", "norm_query_id": 12345, "session_id": "ghi", 
"hit_page_id": 3333, "hit_position": 3, "clicked": false}
+{"wikiid": "foowiki", "norm_query_id": 12345, "session_id": "ghi", 
"hit_page_id": 4444, "hit_position": 4, "clicked": false}
 
-{"wikiid": "foowiki", "norm_query": "zomg", "session_id": "abc", 
"hit_page_id": 1111, "hit_position": 1, "clicked": false}
-{"wikiid": "foowiki", "norm_query": "zomg", "session_id": "abc", 
"hit_page_id": 2222, "hit_position": 2, "clicked": false}
-{"wikiid": "foowiki", "norm_query": "zomg", "session_id": "abc", 
"hit_page_id": 3333, "hit_position": 3, "clicked": false}
-{"wikiid": "foowiki", "norm_query": "zomg", "session_id": "abc", 
"hit_page_id": 4444, "hit_position": 4, "clicked": true}
+{"wikiid": "foowiki", "norm_query_id": 23456, "session_id": "abc", 
"hit_page_id": 1111, "hit_position": 1, "clicked": false}
+{"wikiid": "foowiki", "norm_query_id": 23456, "session_id": "abc", 
"hit_page_id": 2222, "hit_position": 2, "clicked": false}
+{"wikiid": "foowiki", "norm_query_id": 23456, "session_id": "abc", 
"hit_page_id": 3333, "hit_position": 3, "clicked": false}
+{"wikiid": "foowiki", "norm_query_id": 23456, "session_id": "abc", 
"hit_page_id": 4444, "hit_position": 4, "clicked": true}
 
-{"wikiid": "foowiki", "norm_query": "zomg", "session_id": "def", 
"hit_page_id": 1111, "hit_position": 1, "clicked": false}
-{"wikiid": "foowiki", "norm_query": "zomg", "session_id": "def", 
"hit_page_id": 2222, "hit_position": 2, "clicked": false}
-{"wikiid": "foowiki", "norm_query": "zomg", "session_id": "def", 
"hit_page_id": 3333, "hit_position": 3, "clicked": true}
-{"wikiid": "foowiki", "norm_query": "zomg", "session_id": "def", 
"hit_page_id": 4444, "hit_position": 4, "clicked": false}
-{"wikiid": "foowiki", "norm_query": "zomg", "session_id": "def", 
"hit_page_id": 1111, "hit_position": 1, "clicked": false}
-{"wikiid": "foowiki", "norm_query": "zomg", "session_id": "def", 
"hit_page_id": 2222, "hit_position": 2, "clicked": false}
-{"wikiid": "foowiki", "norm_query": "zomg", "session_id": "def", 
"hit_page_id": 3333, "hit_position": 3, "clicked": false}
-{"wikiid": "foowiki", "norm_query": "zomg", "session_id": "def", 
"hit_page_id": 4444, "hit_position": 4, "clicked": true}
+{"wikiid": "foowiki", "norm_query_id": 23456, "session_id": "def", 
"hit_page_id": 1111, "hit_position": 1, "clicked": false}
+{"wikiid": "foowiki", "norm_query_id": 23456, "session_id": "def", 
"hit_page_id": 2222, "hit_position": 2, "clicked": false}
+{"wikiid": "foowiki", "norm_query_id": 23456, "session_id": "def", 
"hit_page_id": 3333, "hit_position": 3, "clicked": true}
+{"wikiid": "foowiki", "norm_query_id": 23456, "session_id": "def", 
"hit_page_id": 4444, "hit_position": 4, "clicked": false}
+{"wikiid": "foowiki", "norm_query_id": 23456, "session_id": "def", 
"hit_page_id": 1111, "hit_position": 1, "clicked": false}
+{"wikiid": "foowiki", "norm_query_id": 23456, "session_id": "def", 
"hit_page_id": 2222, "hit_position": 2, "clicked": false}
+{"wikiid": "foowiki", "norm_query_id": 23456, "session_id": "def", 
"hit_page_id": 3333, "hit_position": 3, "clicked": false}
+{"wikiid": "foowiki", "norm_query_id": 23456, "session_id": "def", 
"hit_page_id": 4444, "hit_position": 4, "clicked": true}
 
-{"wikiid": "foowiki", "norm_query": "zomg", "session_id": "ghi", 
"hit_page_id": 1111, "hit_position": 1, "clicked": false}
-{"wikiid": "foowiki", "norm_query": "zomg", "session_id": "ghi", 
"hit_page_id": 2222, "hit_position": 2, "clicked": true}
-{"wikiid": "foowiki", "norm_query": "zomg", "session_id": "ghi", 
"hit_page_id": 3333, "hit_position": 3, "clicked": false}
-{"wikiid": "foowiki", "norm_query": "zomg", "session_id": "ghi", 
"hit_page_id": 4444, "hit_position": 4, "clicked": false}
+{"wikiid": "foowiki", "norm_query_id": 23456, "session_id": "ghi", 
"hit_page_id": 1111, "hit_position": 1, "clicked": false}
+{"wikiid": "foowiki", "norm_query_id": 23456, "session_id": "ghi", 
"hit_page_id": 2222, "hit_position": 2, "clicked": true}
+{"wikiid": "foowiki", "norm_query_id": 23456, "session_id": "ghi", 
"hit_page_id": 3333, "hit_position": 3, "clicked": false}
+{"wikiid": "foowiki", "norm_query_id": 23456, "session_id": "ghi", 
"hit_page_id": 4444, "hit_position": 4, "clicked": false}
diff --git a/mjolnir/test/test_dbn.py b/mjolnir/test/test_dbn.py
index 4c8eb3f..324ed5e 100644
--- a/mjolnir/test/test_dbn.py
+++ b/mjolnir/test/test_dbn.py
@@ -17,7 +17,7 @@
     }, num_partitions=20)
     assert len(labeled.columns) == 4
     assert 'wikiid' in labeled.columns
-    assert 'norm_query' in labeled.columns
+    assert 'norm_query_id' in labeled.columns
     assert 'hit_page_id' in labeled.columns
     assert 'relevance' in labeled.columns
 
@@ -31,19 +31,19 @@
     assert u'foowiki' in wikiids
 
     # Make sure the set of unique queries is kept
-    queries = set([row.norm_query for row in data])
+    queries = set([row.norm_query_id for row in data])
     assert len(queries) == 2
-    assert u'test' in queries
-    assert u'zomg' in queries
+    assert 12345 in queries
+    assert 23456 in queries
 
     # Make sure the dbn is provided data in the right order, by looking at 
what comes out
     # at the top and bottom of each query. This should also detect if 
something went wrong
     # with partitioning, causing parts of a query to train in separate DBN's
-    test = sorted([row for row in data if row.norm_query == u'test'], 
key=lambda row: row.relevance, reverse=True)
+    test = sorted([row for row in data if row.norm_query_id == 12345], 
key=lambda row: row.relevance, reverse=True)
     assert test[0].hit_page_id == 1111
     assert test[3].hit_page_id == 3333
 
-    zomg = sorted([row for row in data if row.norm_query == u'zomg'], 
key=lambda row: row.relevance, reverse=True)
+    zomg = sorted([row for row in data if row.norm_query_id == 23456], 
key=lambda row: row.relevance, reverse=True)
     assert zomg[0].hit_page_id == 4444
     assert zomg[3].hit_page_id == 1111
     # page 1111 should have been skipped every time, resulting in a very low 
score
diff --git a/mjolnir/test/test_sampling.py b/mjolnir/test/test_sampling.py
index 81d6383..7255c00 100644
--- a/mjolnir/test/test_sampling.py
+++ b/mjolnir/test/test_sampling.py
@@ -17,7 +17,7 @@
         ('foo', 'c', 'ccc', 2),
         ('foo', 'd', 'ddd', 2),
         ('foo', 'e', 'eee', 2),
-    ]).toDF(['wikiid', 'norm_query', 'session_id', 'q_by_ip_day'])
+    ]).toDF(['wikiid', 'norm_query_id', 'session_id', 'q_by_ip_day'])
 
     sampled = mjolnir.sampling.sample(df, ['foo'], queries_per_wiki=100,
                                       min_sessions_per_query=1, 
seed=12345).collect()
@@ -51,7 +51,7 @@
                 session_id = "%s_%s_%s" % (wiki, q, str(j))
                 rows.append((wiki, q, session_id, 1))
 
-    df = spark_context.parallelize(rows).toDF(['wikiid', 'norm_query', 
'session_id', 'q_by_ip_day'])
+    df = spark_context.parallelize(rows).toDF(['wikiid', 'norm_query_id', 
'session_id', 'q_by_ip_day'])
     queries_per_wiki = 100
     df_sampled = mjolnir.sampling.sample(df, [wiki for (wiki, _, _) in wikis],
                                          queries_per_wiki=queries_per_wiki,
@@ -72,11 +72,11 @@
 
     # assert correlation between sessions per query
     orig_grouped = (
-        df.groupBy('wikiid', 'norm_query')
+        df.groupBy('wikiid', 'norm_query_id')
         .agg(F.countDistinct('session_id').alias('num_sessions'))
         .collect())
     sampled_grouped = (
-        df_sampled.groupBy('wikiid', 'norm_query')
+        df_sampled.groupBy('wikiid', 'norm_query_id')
         .agg(F.countDistinct('session_id').alias('num_sessions'))
         .collect())
 
diff --git a/mjolnir/test/training/test_tuning.py 
b/mjolnir/test/training/test_tuning.py
index 0eaf114..d8c8ff4 100644
--- a/mjolnir/test/training/test_tuning.py
+++ b/mjolnir/test/training/test_tuning.py
@@ -14,7 +14,7 @@
         # sufficiently large number of queries, or the split wont have
         # enough data for partitions to even out.
         .select(F.lit('foowiki').alias('wikiid'),
-                (F.col('id')/100).cast('int').alias('norm_query')))
+                (F.col('id')/100).cast('int').alias('norm_query_id')))
 
     with_folds = mjolnir.training.tuning.split(df, (0.8, 0.2), 
num_partitions=4).collect()
 
@@ -27,8 +27,8 @@
     assert 0.2 == pytest.approx(len(fold_1) / total_len, abs=0.015)
 
     # Check each norm query is only found on one side of the split
-    queries_in_0 = set([row.norm_query for row in fold_0])
-    queries_in_1 = set([row.norm_query for row in fold_1])
+    queries_in_0 = set([row.norm_query_id for row in fold_0])
+    queries_in_1 = set([row.norm_query_id for row in fold_1])
     assert len(queries_in_0.intersection(queries_in_1)) == 0
 
 
@@ -44,7 +44,7 @@
     return spark_context.parallelize(
         _make_q('abc') + _make_q('def') + _make_q('ghi') + _make_q('jkl')
         + _make_q('mno') + _make_q('pqr') + _make_q('stu')
-    ).toDF(['wikiid', 'norm_query', 'query', 'label', 'features'])
+    ).toDF(['wikiid', 'norm_query_id', 'query', 'label', 'features'])
 
 
 def test_cross_validate_plain_df(df_train):
diff --git a/mjolnir/training/tuning.py b/mjolnir/training/tuning.py
index d808619..1e115f5 100644
--- a/mjolnir/training/tuning.py
+++ b/mjolnir/training/tuning.py
@@ -30,7 +30,7 @@
     Parameters
     ----------
     df : pyspark.sql.DataFrame
-        Input data frame containing (wikiid, norm_query) columns. If this is
+        Input data frame containing (wikiid, norm_query_id) columns. If this is
         expensive to compute it should be cached, as it will be used twice.
     splits: list
         List of percentages, summing to 1, to split the input dataframe
@@ -51,7 +51,7 @@
     # to normalize instead of fail, but this is good enough.
     assert abs(1 - sum(splits)) < 0.01
 
-    mjolnir.spark.assert_columns(df, ['wikiid', 'norm_query'])
+    mjolnir.spark.assert_columns(df, ['wikiid', 'norm_query_id'])
 
     def split_partition(rows):
         # Current number of items per split
@@ -65,26 +65,26 @@
             for i, percent in enumerate(splits):
                 if split_counts[row.wikiid][i] / processed[row.wikiid] < 
percent:
                     split_counts[row.wikiid][i] += row.weight
-                    yield (row.wikiid, row.norm_query, i)
+                    yield (row.wikiid, row.norm_query_id, i)
                     break
             # If no split found assign to first split
             else:
                 split_counts[row.wikiid][0] += row.weight
-                yield (row.wikiid, row.norm_query, 0)
+                yield (row.wikiid, row.norm_query_id, 0)
             processed[row.wikiid] += row.weight
 
     df_splits = (
         df
-        .groupBy('wikiid', 'norm_query')
+        .groupBy('wikiid', 'norm_query_id')
         .agg(F.count(F.lit(1)).alias('weight'))
         # Could we guess the correct number of partitions instead? I'm not
         # sure though how it should be decided, and would require taking
         # an extra pass over the data.
         .coalesce(num_partitions)
         .rdd.mapPartitions(split_partition)
-        .toDF(['wikiid', 'norm_query', output_column]))
+        .toDF(['wikiid', 'norm_query_id', output_column]))
 
-    return df.join(df_splits, how='inner', on=['wikiid', 'norm_query'])
+    return df.join(df_splits, how='inner', on=['wikiid', 'norm_query_id'])
 
 
 def group_k_fold(df, num_folds, num_partitions=100, output_column='fold'):

-- 
To view, visit https://gerrit.wikimedia.org/r/357320
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I717c97cebf1762071c4e7bdf762aebb3a4520a97
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to