[MediaWiki-commits] [Gerrit] search/MjoLniR[master]: Adapt feature logging with new ltr response format

2017-10-10 Thread EBernhardson (Code Review)
EBernhardson has submitted this change and it was merged. ( 
https://gerrit.wikimedia.org/r/381228 )

Change subject: Adapt feature logging with new ltr response format
..


Adapt feature logging with new ltr response format

In https://github.com/o19s/elasticsearch-learning-to-rank/pull/88
the response format was changed from a map to a list.
Adjust mjolnir so that it understands this new format.
Using an OrderedDict allows mjolnir to keep the same feature
ordinals.
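
For illustration, a rough sketch of the format change (the feature names
below are invented; the _ltrlog/sltr_log path and the name/value keys come
from the diff in this change):

  # Old plugin response: sltr_log was a map of feature name -> score
  hit['fields']['_ltrlog'][0]['sltr_log']
  # {'popularity': 0.4, 'title_match': 1.2}

  # New plugin response (o19s/elasticsearch-learning-to-rank PR 88):
  # sltr_log is a list of {'name': ..., 'value': ...} entries that
  # preserves the order in which the features were logged
  hit['fields']['_ltrlog'][0]['sltr_log']
  # [{'name': 'title_match', 'value': 1.2}, {'name': 'popularity', 'value': 0.4}]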

Change-Id: Ib8c994b0dda7c22dc4a7724a141b94171c9dac85
---
M mjolnir/cli/data_pipeline.py
M mjolnir/features.py
M mjolnir/test/fixtures/requests/test_features.sqlite3
M mjolnir/test/test_features.py
4 files changed, 22 insertions(+), 21 deletions(-)

Approvals:
  EBernhardson: Verified; Looks good to me, approved



diff --git a/mjolnir/cli/data_pipeline.py b/mjolnir/cli/data_pipeline.py
index 28248aa..3d03ff4 100644
--- a/mjolnir/cli/data_pipeline.py
+++ b/mjolnir/cli/data_pipeline.py
@@ -11,6 +11,7 @@
 """
 
 import argparse
+from collections import OrderedDict
 import logging
 import mjolnir.dbn
 import mjolnir.metrics
@@ -128,7 +129,7 @@
 
     # Collect features for all known queries. Note that this intentionally
     # uses query and NOT norm_query_id. Merge those back into the source hits.
-    fnames_accu = df_hits._sc.accumulator({}, mjolnir.features.FeatureNamesAccumulator())
+    fnames_accu = df_hits._sc.accumulator(OrderedDict(), mjolnir.features.FeatureNamesAccumulator())
     if ltr_feature_definitions:
         if brokers:
             df_features = mjolnir.features.collect_from_ltr_plugin_and_kafka(
@@ -171,8 +172,10 @@
     # collect the accumulator
     df_features.cache().count()
 
-    # TODO: test that features returned a coherent map (all features must have the same counts)
-    features = sorted(fnames_accu.value)
+    if len(set(fnames_accu.value.values())) != 1:
+        raise ValueError("Not all features were collected properly: " + str(fnames_accu.value))
+
+    features = fnames_accu.value.keys()
     df_features = df_features.withColumn('features', mjolnir.spark.add_meta(df_features._sc, F.col('features'), {
         'features': features
     }))
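
As a hedged illustration of what the new check catches (the feature names
and counts below are invented), the accumulator maps each feature name to
how many times it was logged, so a feature with a different count than the
others signals an incomplete collection:

  from collections import OrderedDict

  # Invented accumulator contents: 'incoming_links' was only logged 97 times.
  fnames = OrderedDict([('title_match', 100), ('popularity', 100), ('incoming_links', 97)])

  # Same test as above: every feature must be seen the same number of times.
  if len(set(fnames.values())) != 1:
      print("Not all features were collected properly: %s" % fnames)
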
diff --git a/mjolnir/features.py b/mjolnir/features.py
index 5b2f30f..5cae635 100644
--- a/mjolnir/features.py
+++ b/mjolnir/features.py
@@ -3,7 +3,7 @@
 """
 
 import base64
-from collections import defaultdict, namedtuple
+from collections import defaultdict, namedtuple, OrderedDict
 import json
 import math
 import mjolnir.cirrus
@@ -176,8 +176,7 @@
 
 def extract_ltr_log_feature_values(response, accum):
     """Extract feature vector from ltr_log search ext
-    Scan all hits and inspect the ltr_log field. Feature names are then
-    lexically ordered to have consistent ordinals
+    Scan all hits and inspect the ltr_log field.
 
     Parameters
     --
@@ -195,9 +194,11 @@
     for hit in response['hits']['hits']:
         page_id = int(hit['_id'])
         features = []
-        counts = {}
-        for n, v in sorted(hit['fields']['_ltrlog'][0]['sltr_log'].iteritems()):
-            features.append(v)
+        counts = OrderedDict()
+        for v in hit['fields']['_ltrlog'][0]['sltr_log']:
+            score = v['value']
+            n = v['name']
+            features.append(score)
             counts[n] = counts.get(n, 0) + 1
         accum += counts
         yield page_id, Vectors.dense(features)
@@ -343,7 +344,7 @@
     Keep a dict with feature names as keys and a counter as values.
     """
     def zero(self, value):
-        return dict()
+        return OrderedDict()
 
     def addInPlace(self, value1, value2):
         for k, v in value2.iteritems():
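
A small sketch of why the OrderedDict zero value matters here (the merge
below mirrors addInPlace without the pyspark plumbing; feature names are
invented): merging partial counts keeps the first-seen key order, so the
feature ordinals survive aggregation across partitions:

  from collections import OrderedDict

  def add_in_place(value1, value2):
      # Mirrors FeatureNamesAccumulator.addInPlace: sum counts per feature name.
      for k, v in value2.items():
          value1[k] = value1.get(k, 0) + v
      return value1

  merged = OrderedDict()
  add_in_place(merged, OrderedDict([('title_match', 50), ('popularity', 50)]))
  add_in_place(merged, OrderedDict([('title_match', 50), ('popularity', 50)]))
  # merged == OrderedDict([('title_match', 100), ('popularity', 100)]), order preserved
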
@@ -409,22 +410,18 @@
     assert 'responses' in parsed, response.text
 
     features = defaultdict(lambda: [float('nan')] * len(feature_names))
-    features_seen = {}
-    # feature ordinals (sorted by feature names)
-    # ordinal_map[i] returns the feature ordinal for request i and feature_names[i]
-    ordinal_map = [elts[0] for elts in sorted(enumerate(feature_names), key=lambda i:i[1])]
-    ordinal_map = [elts[0] for elts in sorted(enumerate(ordinal_map), key=lambda i:i[1])]
+    features_seen = OrderedDict()
     for i, response in enumerate(parsed['responses']):
-        ordinal = ordinal_map[i]
         # These should be retried when making the queries. If we get this far then
         # no retry is possible anymore, and the default NaN value will signal to
         # throw away the hit_page_id
         if response['status'] != 200:
+            features_seen[feature_names[i]] = 0
             continue
         features_seen[feature_names[i]] = 1
         for hit in response['hits']['hits']:
             hit_page_id = int(hit['_id'])
-            features[hit_page_id][ordinal] = hit['_score']
+            features[hit_page_id][i] = hit['_score']
     feature_names_accu += features_seen
     return
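
To tie the two files together, a hedged sketch (statuses and names are
invented) of why recording a 0 for failed requests matters: the per-feature
counts end up unequal, so the len(set(...)) != 1 check added in
data_pipeline.py rejects the run instead of silently producing misaligned
feature vectors:

  from collections import OrderedDict

  feature_names = ['title_match', 'popularity', 'incoming_links']  # invented
  statuses = [200, 500, 200]  # invented: the second per-feature request failed

  features_seen = OrderedDict()
  for i, status in enumerate(statuses):
      features_seen[feature_names[i]] = 1 if status == 200 else 0

  # Counts differ (1, 0, 1), so the coherence check in data_pipeline.py fires.
  print(features_seen)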

[MediaWiki-commits] [Gerrit] search/MjoLniR[master]: Adapt feature logging with new ltr response format

2017-09-28 Thread DCausse (Code Review)
DCausse has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/381228 )

Change subject: Adapt feature logging with new ltr response format
..


  git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR refs/changes/28/381228/1
