EBernhardson has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/357319 )

Change subject: Calculate NDCG of input click data
......................................................................

Calculate NDCG of input click data

Adds a step to the input data pipeline to calculate weighted
and unweighted ndcg of the incoming click data. This provides
a high-level baseline that the results of training can be
compared against, to see whether we are able to train a model
that provides better results than what users were already shown
in the historical data.

The weighted ndcg is calculated for each individual user session
in the input data. The unweighted ndcg is calculated only once
per query, using the average position at which each hit was
shown across sessions. The unweighted value should be the most
directly comparable to xgboost training results, as we are not
(yet) able to provide weights to xgboost4j-spark.

Change-Id: Ic28b6c345c656f670fe5f954d0fe520b93352369
---
M mjolnir/cli/data_pipeline.py
M mjolnir/cli/training_pipeline.py
A mjolnir/metrics.py
A mjolnir/test/test_metrics.py
4 files changed, 177 insertions(+), 94 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR 
refs/changes/19/357319/1

diff --git a/mjolnir/cli/data_pipeline.py b/mjolnir/cli/data_pipeline.py
index 90024f4..7aa169e 100644
--- a/mjolnir/cli/data_pipeline.py
+++ b/mjolnir/cli/data_pipeline.py
@@ -11,8 +11,9 @@
 """
 
 import mjolnir.dbn
-import mjolnir.sampling
+import mjolnir.metrics
 import mjolnir.features
+import mjolnir.sampling
 from pyspark import SparkContext
 from pyspark.sql import HiveContext
 from pyspark.sql import functions as F
@@ -54,9 +55,8 @@
         .drop('hit_page_ids')
         # Mark all hits that were clicked by a user
         .withColumn('clicked', F.expr('array_contains(click_page_ids, 
hit_page_id)'))
-        .drop('click_page_ids'))
-
-    df_sampled.cache()
+        .drop('click_page_ids')
+        .cache())
 
     # Learn relevances
     df_rel = (
@@ -72,15 +72,39 @@
         # naive conversion of relevance % into a label
         .withColumn('label', (F.col('relevance') * 10).cast('int')))
 
-    df_hits = (
+    df_all_hits = (
         df_sampled
+        .select('wikiid', 'query', 'norm_query', 'hit_page_id', 'session_id', 
'hit_position')
+        .join(df_rel, how='inner', on=['wikiid', 'norm_query', 'hit_page_id'])
+        .cache())
+
+    # materialize df_all_hits and drop df_sampled
+    df_all_hits.count()
+    df_sampled.unpersist()
+
+    # TODO: Training is per-wiki, should this be as well?
+    ndcgAt10 = mjolnir.metrics.ndcg(df_all_hits, 10, query_cols=['wikiid', 
'query', 'session_id'])
+    print 'weighted ndcg@10: %.4f' % (ndcgAt10)
+
+    df_hits = (
+        df_all_hits
         .groupBy('wikiid', 'query', 'norm_query', 'hit_page_id')
         # weight is now the number of times a hit was displayed to a user
-        .agg(F.count(F.lit(1)).alias('weight'))
-        # Join in the relevance labels
-        .join(df_rel, how='inner', on=['wikiid', 'norm_query', 'hit_page_id']))
+        .agg(F.count(F.lit(1)).alias('weight'),
+             F.mean('hit_position').alias('hit_position'),
+             # These should be the same per group, but to keep things easy
+             # take first rather than grouping
+             F.first('label').alias('label'),
+             F.first('relevance').alias('relevance'))
+        .cache())
 
-    df_hits.cache()
+    # materialize df_hits and drop df_all_hits
+    df_hits.count()
+    df_all_hits.unpersist()
+
+    # TODO: Training is per-wiki, should this be as well?
+    ndcgAt10 = mjolnir.metrics.ndcg(df_hits, 10, query_cols=['wikiid', 
'query'])
+    print 'unweighted ndcg@10: %.4f' % (ndcgAt10)
 
     # Collect features for all known queries. Note that this intentionally
     # uses query and NOT norm_query. Merge those back into the source hits.
diff --git a/mjolnir/cli/training_pipeline.py b/mjolnir/cli/training_pipeline.py
index 7a0c7a6..ce34f76 100644
--- a/mjolnir/cli/training_pipeline.py
+++ b/mjolnir/cli/training_pipeline.py
@@ -1,98 +1,29 @@
 """
-Example script demonstrating the full LTR pipeline. It may
-not be desirable to run this all at once, but rather saving
-intermediate stages to HDFS for later analysis. This is mostly
-to demonstrate how everything ties together
+Example script demonstrating the training portion of the MLR pipeline.
+This is mostly to demonstrate how everything ties together
 
 To run:
-    PYSPARK_PYTHON=MJOLNIR/venv/bin/python spark-submit \
-        --jars 
hdfs://analytics-hadoop/wmf/refinery/current/artifacts/refinery-hive.jar \
-        --artifacts 'mjolnir_venv.zip#MJOLNIR' \
+    PYSPARK_PYTHON=venv/bin/python spark-submit \
+        --jars /path/to/mjolnir-with-dependencies.jar \
+        --archives 'mjolnir_venv.zip#venv' \
         path/to/training_pipeline.py
 """
 
-import mjolnir.dbn
-import mjolnir.sampling
-import mjolnir.features
 import mjolnir.training
 import mjolnir.training.xgboost
 from pyspark import SparkContext
 from pyspark.sql import HiveContext
-from pyspark.sql import functions as F
 
 
 def main(sc, sqlContext):
-    sqlContext.sql("CREATE TEMPORARY FUNCTION stemmer AS 
'org.wikimedia.analytics.refinery.hive.StemmerUDF'")
-
-    # Load click data from HDFS
-    df_clicks = (
-        sqlContext.read.parquet(
-            
'hdfs://analytics-hadoop/wmf/data/discovery/query_clicks/daily/year=*/month=*/day=*')
-        # Clicks and hits contains a bunch of useful debugging data, but we 
don't
-        # need any of that here. Save a bunch of memory by only working with
-        # lists of page ids
-        .withColumn('hit_page_ids', F.col('hits.pageid'))
-        .drop('hits')
-        .withColumn('click_page_ids', F.col('clicks.pageid'))
-        .drop('clicks')
-        # Normalize queries using the lucene stemmer
-        .withColumn('norm_query', F.expr('stemmer(query, substring(wikiid, 1, 
2))')))
-
-    # Sample to some subset of queries per wiki
-    df_sampled = (
-        mjolnir.sampling.sample(
-            df_clicks,
-            wikis=['enwiki', 'dewiki', 'ruwiki', 'frwiki'],
-            seed=54321,
-            queries_per_wiki=20000,
-            min_sessions_per_query=10)
-        # Explode source into a row per displayed hit, reduce to a row per
-        # unique (wikiid, query, page_id) and add a count of the number of
-        # duplicates removed.
-        .select('*', F.expr("posexplode(hit_page_ids)").alias('hit_position', 
'hit_page_id'))
-        .drop('hit_page_ids')
-        .withColumn('clicked', F.expr('array_contains(click_page_ids, 
hit_page_id)'))
-        .drop('click_page_ids'))
-
-    # Learn relevances
-    df_rel = (
-        mjolnir.dbn.train(df_sampled, {
-            'MAX_ITERATIONS': 40,
-            'DEBUG': False,
-            'PRETTY_LOG': True,
-            'MIN_DOCS_PER_QUERY': 10,
-            'MAX_DOCS_PER_QUERY': 20,
-            'SERP_SIZE': 20,
-            'QUERY_INDEPENDENT_PAGER': False,
-            'DEFAULT_REL': 0.5})
-        # naive conversion of relevance % into a label
-        .withColumn('label', (F.col('relevance') * 10).cast('int')))
-
-    df_hits = (
-        df_sampled
-        .groupBy('wikiid', 'query', 'norm_query', 'hit_page_id')
-        # weight is now the number of times a hit was displayed to a user
-        .agg(F.count(F.lit(1)).alias('weight'))
-        # Join in the relevance labels
-        .join(df_rel, how='inner', on=['wikiid', 'norm_query', 'hit_page_id']))
-
-    # Collect features for all known queries. Note that this intentionally
-    # uses query and NOT norm_query. Merge those back into the source hits.
-    df_features = mjolnir.features.collect(
-        df_hits,
-        url_list=['http://elastic%d.eqiad.wmnet:9200/_msearch' % (i) for i in 
range(1017, 1053)],
-        indices={wiki: '%s_content' % (wiki) for wiki in ['enwiki', 'dewiki', 
'frwiki', 'ruwiki']},
-        feature_definitions=mjolnir.features.enwiki_features())
-    df_hits_with_features = df_hits.join(df_features, how='inner', 
on=['wikiid', 'query', 'hit_page_id'])
-
-    # Probably this should be written out to disk, then loaded from disk to 
run training.
-    # You do not want to accidentally re-request all features from 
elasticsearch
-    df_hits_with_features.cache()
+    # TODO: cli argument
+    in_path = 
'hdfs://analytics-hadoop/user/ebernhardson/mjolnir/hits_with_features'
+    df_hits_with_features = sqlContext.read.parquet(in_path)
 
     # Doesn't have to be done ahead of time, but if the data is used multiple 
times
     # (eg train and eval) then it should to save work.
     df_grouped, j_groups = mjolnir.training.xgboost.prep_training(
-        df_hits_with_features, {'num_workers': 10})
+        df_hits_with_features, 10)
 
     # Train a model
     # Note that this might be best done in a separate spark context, with 
different options.
@@ -110,15 +41,14 @@
 
     print 'train-ndcg@10: %.3f' % (model.eval(df_grouped, j_groups))
 
-    # TODO: Nothing below here is implemented, and is only included
-    # as a rough estimate of what will be implemented.
-
-    # Write out the model in a format suitable for loading into
-    # the elasticsearch plugin
-    model.write('/home/ebernhardson/xgboost_model.xml')
+    # dumped is now a list of strings each containing json of one
+    # tree in the ensemble.
+    dumped = model.dump()
+    with open('/home/ebernahrdson/xgboost_model.json', 'wb') as f:
+        f.writelines(dumped)
 
 
 if __name__ == "__main__":
-    sc = SparkContext(appName="LTR: training pipeline")
+    sc = SparkContext(appName="MLR: training pipeline")
     sqlContext = HiveContext(sc)
     main(sc, sqlContext)
diff --git a/mjolnir/metrics.py b/mjolnir/metrics.py
new file mode 100644
index 0000000..f4d8e7c
--- /dev/null
+++ b/mjolnir/metrics.py
@@ -0,0 +1,77 @@
+"""
+Calculates NDCG@k values for click data
+"""
+
+import math
+from pyspark.sql import functions as F
+from pyspark.sql import Window
+import pyspark.sql.types
+
+
+def _ndcg_at(k, label_col):
+    def ndcg_at_k(predicted, actual):
+        # TODO: Taking in rn and then re-sorting might not be necessary, but i 
can't
+        # find any real guarantee that they would come in order after a 
groupBy + collect_list,
+        # since they were only ordered within the window function.
+        predicted = [row[label_col] for row in sorted(predicted, key=lambda r: 
r.rn)]
+        actual = [row[label_col] for row in sorted(actual, key=lambda r: r.rn)]
+        dcg = 0.
+        for i, label in enumerate(predicted):
+            # This form is used to match EvalNDCG in xgboost
+            dcg += ((1 << label) - 1) / math.log(i + 2.0, 2)
+        idcg = 0.
+        for i, label in enumerate(actual):
+            idcg += ((1 << label) - 1) / math.log(i + 2.0, 2)
+        if idcg == 0:
+            return 0
+        else:
+            return dcg / idcg
+    return F.udf(ndcg_at_k, pyspark.sql.types.DoubleType())
+
+
+def ndcg(df, k, label_col='label', position_col='hit_position', 
query_cols=['wikiid', 'query', 'session_id']):
+    """
+    Calculate ndcg@k for the provided dataframe
+
+    Parameters
+    ----------
+    df : pyspark.sql.DataFrame
+        Input dataframe to calculate against
+    k : int
+        Cutoff for ndcg calculation
+    label_col : str
+        Column name containing integer label, higher is better, of the hit
+    position_col : str
+        Column name containing order displayed to user, lowest first, of the 
hit
+    query_cols : list of str
+        Column names to group by, which indicate a unique query displayed to a 
user
+
+    Returns
+    -------
+    float
+        The ndcg@k value, always between 0 and 1
+    """
+    # ideal results per labels
+    w = Window.partitionBy(*query_cols).orderBy(F.col(label_col).desc())
+    topAtK = (
+        df
+        .select(label_col, *query_cols)
+        .withColumn('rn', F.row_number().over(w))
+        .where(F.col('rn') <= k)
+        .groupBy(*query_cols)
+        .agg(F.collect_list(F.struct(label_col, 'rn')).alias('topAtK')))
+    # top k results shown to user
+    w = Window.partitionBy(*query_cols).orderBy(F.col(position_col).asc())
+    predictedTopAtK = (
+        df
+        .select(label_col, position_col, *query_cols)
+        .withColumn('rn', F.row_number().over(w))
+        .where(F.col('rn') <= k)
+        .groupBy(*query_cols)
+        .agg(F.collect_list(F.struct(label_col, 
'rn')).alias('predictedTopAtK')))
+    return (
+        topAtK
+        .join(predictedTopAtK, query_cols, how='inner')
+        .select(_ndcg_at(k, label_col)('predictedTopAtK', 
'topAtK').alias('ndcgAtK'))
+        .select(F.mean('ndcgAtK').alias('ndcgAtK'))
+        .collect()[0].ndcgAtK)
diff --git a/mjolnir/test/test_metrics.py b/mjolnir/test/test_metrics.py
new file mode 100644
index 0000000..8757540
--- /dev/null
+++ b/mjolnir/test/test_metrics.py
@@ -0,0 +1,52 @@
+import mjolnir.metrics
+import pytest
+
+
+def test_ndcg_doesnt_completely_fail(spark_context, hive_context):
+    "Mediocre test that just looks for a happy path"
+    df = spark_context.parallelize([
+        [4, 0, 'foo'],
+        [3, 1, 'foo'],
+        [0, 2, 'foo'],
+        [3, 3, 'foo'],
+    ]).toDF(['label', 'hit_position', 'query'])
+
+    # Top 2 are in perfect order. Also this indirectly tests that
+    # k is really top 2, and not somehow top 3 or some such
+    ndcg_at_2 = mjolnir.metrics.ndcg(df, 2, query_cols=['query'])
+    assert 1.0 == ndcg_at_2
+
+    # Top 4 are slightly out. This value was checked by also
+    # calculating by hand.
+    ndcg_at_4 = mjolnir.metrics.ndcg(df, 4, query_cols=['query'])
+    assert 0.9788 == pytest.approx(ndcg_at_4, 0.0001)
+
+
+def test_query_can_be_multiple_columns(spark_context, hive_context):
+    df_a = spark_context.parallelize([
+        [4, 0, 'foo', 'bar'],
+        [3, 1, 'foo', 'bar'],
+        [1, 2, 'foo', 'bar'],
+        [2, 3, 'foo', 'bar'],
+    ]).toDF(['label', 'hit_position', 'query', 'wiki'])
+
+    df_b = spark_context.parallelize([
+        [1, 0, 'foo', 'wot'],
+        [3, 1, 'foo', 'wot'],
+        [1, 2, 'foo', 'wot'],
+        [4, 3, 'foo', 'wot'],
+    ]).toDF(['label', 'hit_position', 'query', 'wiki'])
+
+    df_merged = (
+        spark_context
+        .union([df_a.rdd, df_b.rdd])
+        .toDF(['label', 'hit_position', 'query', 'wiki']))
+
+    ndcg_a = mjolnir.metrics.ndcg(df_a, 4, query_cols=['query', 'wiki'])
+    ndcg_b = mjolnir.metrics.ndcg(df_b, 4, query_cols=['query', 'wiki'])
+
+    # If we appropriately seperate foo/bar from foo/wot, instead of treating
+    # all foo's the same then the result will be the average of the two
+    # queries.
+    ndcg_merged = mjolnir.metrics.ndcg(df_merged, 4, query_cols=['query', 
'wiki'])
+    assert ndcg_merged == (ndcg_a + ndcg_b) / 2

-- 
To view, visit https://gerrit.wikimedia.org/r/357319
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic28b6c345c656f670fe5f954d0fe520b93352369
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to