EBernhardson has uploaded a new change for review. (
https://gerrit.wikimedia.org/r/357319 )
Change subject: Calculate NDCG of input click data
......................................................................
Calculate NDCG of input click data
Adds a step to the input data pipeline to calculate weighted
and unweighted ndcg of the incoming click data. This provides
a high-level baseline that training results can be compared
against, to see whether we can train something that produces
better results than what users were already shown in the
historical data.
The weighted ndcg calculates the ndcg for each individual user session
in the input data. The unweighted ndcg calculates the ndcg only once
per query, using the average position at which each hit was shown across sessions.
The unweighted value should be the most directly comparable to xgboost
training results, as we are not (yet) able to provide weights to
xgboost4j-spark.
Change-Id: Ic28b6c345c656f670fe5f954d0fe520b93352369
---
M mjolnir/cli/data_pipeline.py
M mjolnir/cli/training_pipeline.py
A mjolnir/metrics.py
A mjolnir/test/test_metrics.py
4 files changed, 177 insertions(+), 94 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR
refs/changes/19/357319/1
diff --git a/mjolnir/cli/data_pipeline.py b/mjolnir/cli/data_pipeline.py
index 90024f4..7aa169e 100644
--- a/mjolnir/cli/data_pipeline.py
+++ b/mjolnir/cli/data_pipeline.py
@@ -11,8 +11,9 @@
"""
import mjolnir.dbn
-import mjolnir.sampling
+import mjolnir.metrics
import mjolnir.features
+import mjolnir.sampling
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F
@@ -54,9 +55,8 @@
.drop('hit_page_ids')
# Mark all hits that were clicked by a user
.withColumn('clicked', F.expr('array_contains(click_page_ids,
hit_page_id)'))
- .drop('click_page_ids'))
-
- df_sampled.cache()
+ .drop('click_page_ids')
+ .cache())
# Learn relevances
df_rel = (
@@ -72,15 +72,39 @@
# naive conversion of relevance % into a label
.withColumn('label', (F.col('relevance') * 10).cast('int')))
- df_hits = (
+ df_all_hits = (
df_sampled
+ .select('wikiid', 'query', 'norm_query', 'hit_page_id', 'session_id',
'hit_position')
+ .join(df_rel, how='inner', on=['wikiid', 'norm_query', 'hit_page_id'])
+ .cache())
+
+ # materialize df_all_hits and drop df_sampled
+ df_all_hits.count()
+ df_sampled.unpersist()
+
+ # TODO: Training is per-wiki, should this be as well?
+ ndcgAt10 = mjolnir.metrics.ndcg(df_all_hits, 10, query_cols=['wikiid',
'query', 'session_id'])
+ print 'weighted ndcg@10: %.4f' % (ndcgAt10)
+
+ df_hits = (
+ df_all_hits
.groupBy('wikiid', 'query', 'norm_query', 'hit_page_id')
# weight is now the number of times a hit was displayed to a user
- .agg(F.count(F.lit(1)).alias('weight'))
- # Join in the relevance labels
- .join(df_rel, how='inner', on=['wikiid', 'norm_query', 'hit_page_id']))
+ .agg(F.count(F.lit(1)).alias('weight'),
+ F.mean('hit_position').alias('hit_position'),
+ # These should be the same per group, but to keep things easy
+ # take first rather than grouping
+ F.first('label').alias('label'),
+ F.first('relevance').alias('relevance'))
+ .cache())
- df_hits.cache()
+ # materialize df_hits and drop df_all_hits
+ df_hits.count()
+ df_all_hits.unpersist()
+
+ # TODO: Training is per-wiki, should this be as well?
+ ndcgAt10 = mjolnir.metrics.ndcg(df_hits, 10, query_cols=['wikiid',
'query'])
+ print 'unweighted ndcg@10: %.4f' % (ndcgAt10)
# Collect features for all known queries. Note that this intentionally
# uses query and NOT norm_query. Merge those back into the source hits.
diff --git a/mjolnir/cli/training_pipeline.py b/mjolnir/cli/training_pipeline.py
index 7a0c7a6..ce34f76 100644
--- a/mjolnir/cli/training_pipeline.py
+++ b/mjolnir/cli/training_pipeline.py
@@ -1,98 +1,29 @@
"""
-Example script demonstrating the full LTR pipeline. It may
-not be desirable to run this all at once, but rather saving
-intermediate stages to HDFS for later analysis. This is mostly
-to demonstrate how everything ties together
+Example script demonstrating the training portion of the MLR pipeline.
+This is mostly to demonstrate how everything ties together
To run:
- PYSPARK_PYTHON=MJOLNIR/venv/bin/python spark-submit \
- --jars
hdfs://analytics-hadoop/wmf/refinery/current/artifacts/refinery-hive.jar \
- --artifacts 'mjolnir_venv.zip#MJOLNIR' \
+ PYSPARK_PYTHON=venv/bin/python spark-submit \
+ --jars /path/to/mjolnir-with-dependencies.jar \
+ --archives 'mjolnir_venv.zip#venv' \
path/to/training_pipeline.py
"""
-import mjolnir.dbn
-import mjolnir.sampling
-import mjolnir.features
import mjolnir.training
import mjolnir.training.xgboost
from pyspark import SparkContext
from pyspark.sql import HiveContext
-from pyspark.sql import functions as F
def main(sc, sqlContext):
- sqlContext.sql("CREATE TEMPORARY FUNCTION stemmer AS
'org.wikimedia.analytics.refinery.hive.StemmerUDF'")
-
- # Load click data from HDFS
- df_clicks = (
- sqlContext.read.parquet(
-
'hdfs://analytics-hadoop/wmf/data/discovery/query_clicks/daily/year=*/month=*/day=*')
- # Clicks and hits contains a bunch of useful debugging data, but we
don't
- # need any of that here. Save a bunch of memory by only working with
- # lists of page ids
- .withColumn('hit_page_ids', F.col('hits.pageid'))
- .drop('hits')
- .withColumn('click_page_ids', F.col('clicks.pageid'))
- .drop('clicks')
- # Normalize queries using the lucene stemmer
- .withColumn('norm_query', F.expr('stemmer(query, substring(wikiid, 1,
2))')))
-
- # Sample to some subset of queries per wiki
- df_sampled = (
- mjolnir.sampling.sample(
- df_clicks,
- wikis=['enwiki', 'dewiki', 'ruwiki', 'frwiki'],
- seed=54321,
- queries_per_wiki=20000,
- min_sessions_per_query=10)
- # Explode source into a row per displayed hit, reduce to a row per
- # unique (wikiid, query, page_id) and add a count of the number of
- # duplicates removed.
- .select('*', F.expr("posexplode(hit_page_ids)").alias('hit_position',
'hit_page_id'))
- .drop('hit_page_ids')
- .withColumn('clicked', F.expr('array_contains(click_page_ids,
hit_page_id)'))
- .drop('click_page_ids'))
-
- # Learn relevances
- df_rel = (
- mjolnir.dbn.train(df_sampled, {
- 'MAX_ITERATIONS': 40,
- 'DEBUG': False,
- 'PRETTY_LOG': True,
- 'MIN_DOCS_PER_QUERY': 10,
- 'MAX_DOCS_PER_QUERY': 20,
- 'SERP_SIZE': 20,
- 'QUERY_INDEPENDENT_PAGER': False,
- 'DEFAULT_REL': 0.5})
- # naive conversion of relevance % into a label
- .withColumn('label', (F.col('relevance') * 10).cast('int')))
-
- df_hits = (
- df_sampled
- .groupBy('wikiid', 'query', 'norm_query', 'hit_page_id')
- # weight is now the number of times a hit was displayed to a user
- .agg(F.count(F.lit(1)).alias('weight'))
- # Join in the relevance labels
- .join(df_rel, how='inner', on=['wikiid', 'norm_query', 'hit_page_id']))
-
- # Collect features for all known queries. Note that this intentionally
- # uses query and NOT norm_query. Merge those back into the source hits.
- df_features = mjolnir.features.collect(
- df_hits,
- url_list=['http://elastic%d.eqiad.wmnet:9200/_msearch' % (i) for i in
range(1017, 1053)],
- indices={wiki: '%s_content' % (wiki) for wiki in ['enwiki', 'dewiki',
'frwiki', 'ruwiki']},
- feature_definitions=mjolnir.features.enwiki_features())
- df_hits_with_features = df_hits.join(df_features, how='inner',
on=['wikiid', 'query', 'hit_page_id'])
-
- # Probably this should be written out to disk, then loaded from disk to
run training.
- # You do not want to accidentally re-request all features from
elasticsearch
- df_hits_with_features.cache()
+ # TODO: cli argument
+ in_path =
'hdfs://analytics-hadoop/user/ebernhardson/mjolnir/hits_with_features'
+ df_hits_with_features = sqlContext.read.parquet(in_path)
# Doesn't have to be done ahead of time, but if the data is used multiple
times
# (eg train and eval) then it should to save work.
df_grouped, j_groups = mjolnir.training.xgboost.prep_training(
- df_hits_with_features, {'num_workers': 10})
+ df_hits_with_features, 10)
# Train a model
# Note that this might be best done in a separate spark context, with
different options.
@@ -110,15 +41,14 @@
print 'train-ndcg@10: %.3f' % (model.eval(df_grouped, j_groups))
- # TODO: Nothing below here is implemented, and is only included
- # as a rough estimate of what will be implemented.
-
- # Write out the model in a format suitable for loading into
- # the elasticsearch plugin
- model.write('/home/ebernhardson/xgboost_model.xml')
+ # dumped is now a list of strings each containing json of one
+ # tree in the ensemble.
+ dumped = model.dump()
+ with open('/home/ebernahrdson/xgboost_model.json', 'wb') as f:
+ f.writelines(dumped)
if __name__ == "__main__":
- sc = SparkContext(appName="LTR: training pipeline")
+ sc = SparkContext(appName="MLR: training pipeline")
sqlContext = HiveContext(sc)
main(sc, sqlContext)
diff --git a/mjolnir/metrics.py b/mjolnir/metrics.py
new file mode 100644
index 0000000..f4d8e7c
--- /dev/null
+++ b/mjolnir/metrics.py
@@ -0,0 +1,77 @@
+"""
+Calculates NDCG@k values for click data
+"""
+
+import math
+from pyspark.sql import functions as F
+from pyspark.sql import Window
+import pyspark.sql.types
+
+
def _dcg(labels):
    """Return the discounted cumulative gain of integer labels in rank order.

    Gain is ``2**label - 1`` and the discount is ``log2(rank + 2)`` for a
    0-based rank. This form is used to match EvalNDCG in xgboost. Always
    returns a float (0.0 for an empty list).
    """
    return sum((((1 << label) - 1) / math.log(i + 2.0, 2)
                for i, label in enumerate(labels)), 0.0)


def _ndcg_at(k, label_col):
    """Build a Spark UDF computing ndcg from two collected lists of structs.

    Parameters
    ----------
    k : int
        Cutoff. It is not used here: the caller is expected to have already
        truncated both lists to at most k entries.
    label_col : str
        Name of the struct field holding the integer label. Each struct must
        also carry an ``rn`` field giving its 1-based rank.

    Returns
    -------
    UDF
        Takes ``(predicted, actual)`` list-of-struct columns and returns a
        double: DCG of the predicted order divided by DCG of the ideal order.
    """
    def ndcg_at_k(predicted, actual):
        # collect_list after a groupBy gives no ordering guarantee; the rows
        # were only ordered inside the window function, so re-sort by rn.
        predicted = [row[label_col]
                     for row in sorted(predicted, key=lambda r: r.rn)]
        actual = [row[label_col]
                  for row in sorted(actual, key=lambda r: r.rn)]
        idcg = _dcg(actual)
        if idcg == 0:
            # Must be a float. A Python int returned from a DoubleType UDF
            # becomes null, and F.mean would then silently skip this query.
            # NOTE(review): verify this value matches xgboost's EvalNDCG
            # handling of idcg == 0.
            return 0.0
        return _dcg(predicted) / idcg
    return F.udf(ndcg_at_k, pyspark.sql.types.DoubleType())
+
+
def ndcg(df, k, label_col='label', position_col='hit_position',
         query_cols=None):
    """
    Calculate ndcg@k for the provided dataframe

    For each query, the ideal ordering is built from the labels of the hits
    present in ``df`` for that query. The result therefore measures how close
    the displayed order was to the best ordering of those same hits.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Input dataframe to calculate against
    k : int
        Cutoff for ndcg calculation
    label_col : str
        Column name containing integer label, higher is better, of the hit
    position_col : str
        Column name containing order displayed to user, lowest first, of the
        hit
    query_cols : list of str, optional
        Column names to group by, which indicate a unique query displayed to a
        user. Defaults to ['wikiid', 'query', 'session_id'].

    Returns
    -------
    float or None
        The mean ndcg@k over all queries, between 0 and 1. None when ``df``
        has no rows, because the mean of zero rows is null.
    """
    # Avoid a shared mutable default. Copy into a list because join() below
    # is given these names as its list of join columns.
    if query_cols is None:
        query_cols = ['wikiid', 'query', 'session_id']
    else:
        query_cols = list(query_cols)

    # ideal results per labels: the k best-labeled hits of each query
    w = Window.partitionBy(*query_cols).orderBy(F.col(label_col).desc())
    topAtK = (
        df
        .select(label_col, *query_cols)
        .withColumn('rn', F.row_number().over(w))
        .where(F.col('rn') <= k)
        .groupBy(*query_cols)
        .agg(F.collect_list(F.struct(label_col, 'rn')).alias('topAtK')))

    # top k results shown to user, in display order.
    # NOTE(review): row_number breaks ties on position_col arbitrarily, which
    # can matter when positions are averaged (e.g. the unweighted case).
    w = Window.partitionBy(*query_cols).orderBy(F.col(position_col).asc())
    predictedTopAtK = (
        df
        .select(label_col, position_col, *query_cols)
        .withColumn('rn', F.row_number().over(w))
        .where(F.col('rn') <= k)
        .groupBy(*query_cols)
        .agg(F.collect_list(F.struct(label_col, 'rn'))
             .alias('predictedTopAtK')))

    return (
        topAtK
        .join(predictedTopAtK, query_cols, how='inner')
        .select(_ndcg_at(k, label_col)('predictedTopAtK', 'topAtK')
                .alias('ndcgAtK'))
        .select(F.mean('ndcgAtK').alias('ndcgAtK'))
        .collect()[0].ndcgAtK)
diff --git a/mjolnir/test/test_metrics.py b/mjolnir/test/test_metrics.py
new file mode 100644
index 0000000..8757540
--- /dev/null
+++ b/mjolnir/test/test_metrics.py
@@ -0,0 +1,52 @@
+import mjolnir.metrics
+import pytest
+
+
def test_ndcg_doesnt_completely_fail(spark_context, hive_context):
    """Mediocre happy-path smoke test for a single query."""
    rows = [
        [4, 0, 'foo'],
        [3, 1, 'foo'],
        [0, 2, 'foo'],
        [3, 3, 'foo'],
    ]
    columns = ['label', 'hit_position', 'query']
    df = spark_context.parallelize(rows).toDF(columns)

    # The first two hits are already in ideal order, so ndcg@2 is exactly 1.
    # This also indirectly checks that k truncates to the top 2 and not,
    # say, the top 3.
    assert 1.0 == mjolnir.metrics.ndcg(df, 2, query_cols=['query'])

    # Over all four hits the order is slightly off; the expected value
    # was also worked out by hand.
    at_four = mjolnir.metrics.ndcg(df, 4, query_cols=['query'])
    assert 0.9788 == pytest.approx(at_four, 0.0001)
+
+
def test_query_can_be_multiple_columns(spark_context, hive_context):
    """ndcg must group by every column in query_cols, not just the first."""
    columns = ['label', 'hit_position', 'query', 'wiki']

    df_a = spark_context.parallelize([
        [4, 0, 'foo', 'bar'],
        [3, 1, 'foo', 'bar'],
        [1, 2, 'foo', 'bar'],
        [2, 3, 'foo', 'bar'],
    ]).toDF(columns)

    df_b = spark_context.parallelize([
        [1, 0, 'foo', 'wot'],
        [3, 1, 'foo', 'wot'],
        [1, 2, 'foo', 'wot'],
        [4, 3, 'foo', 'wot'],
    ]).toDF(columns)

    df_merged = (
        spark_context
        .union([df_a.rdd, df_b.rdd])
        .toDF(columns))

    ndcg_a = mjolnir.metrics.ndcg(df_a, 4, query_cols=['query', 'wiki'])
    ndcg_b = mjolnir.metrics.ndcg(df_b, 4, query_cols=['query', 'wiki'])

    # If we appropriately separate foo/bar from foo/wot, instead of treating
    # all foo's the same, then the result will be the average of the two
    # queries.
    ndcg_merged = mjolnir.metrics.ndcg(df_merged, 4,
                                       query_cols=['query', 'wiki'])
    # Compare approximately: the values come from separate floating point
    # aggregations, so exact equality would be fragile.
    assert ndcg_merged == pytest.approx((ndcg_a + ndcg_b) / 2)
--
To view, visit https://gerrit.wikimedia.org/r/357319
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic28b6c345c656f670fe5f954d0fe520b93352369
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits