[MediaWiki-commits] [Gerrit] search/MjoLniR[master]: Adapt feature logging with new ltr response format
EBernhardson has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/381228 )

Change subject: Adapt feature logging with new ltr response format
......................................................................

Adapt feature logging with new ltr response format

In https://github.com/o19s/elasticsearch-learning-to-rank/pull/88 the
response format was changed from a map to a list. Adjust mjolnir so that
it understands this new format. This allows us to keep the same ordinals
in mjolnir by using OrderedDict.

Change-Id: Ib8c994b0dda7c22dc4a7724a141b94171c9dac85
---
M mjolnir/cli/data_pipeline.py
M mjolnir/features.py
M mjolnir/test/fixtures/requests/test_features.sqlite3
M mjolnir/test/test_features.py
4 files changed, 22 insertions(+), 21 deletions(-)

Approvals:
  EBernhardson: Verified; Looks good to me, approved

diff --git a/mjolnir/cli/data_pipeline.py b/mjolnir/cli/data_pipeline.py
index 28248aa..3d03ff4 100644
--- a/mjolnir/cli/data_pipeline.py
+++ b/mjolnir/cli/data_pipeline.py
@@ -11,6 +11,7 @@
 """
 import argparse
+from collections import OrderedDict
 import logging
 import mjolnir.dbn
 import mjolnir.metrics
@@ -128,7 +129,7 @@
     # Collect features for all known queries. Note that this intentionally
     # uses query and NOT norm_query_id. Merge those back into the source hits.
-    fnames_accu = df_hits._sc.accumulator({}, mjolnir.features.FeatureNamesAccumulator())
+    fnames_accu = df_hits._sc.accumulator(OrderedDict(), mjolnir.features.FeatureNamesAccumulator())
     if ltr_feature_definitions:
         if brokers:
             df_features = mjolnir.features.collect_from_ltr_plugin_and_kafka(
@@ -171,8 +172,10 @@
     # collect the accumulator
     df_features.cache().count()
-    # TODO: test that features returned a coherent map (all features must have the same counts)
-    features = sorted(fnames_accu.value)
+    if len(set(fnames_accu.value.values())) != 1:
+        raise ValueError("Not all features were collected properly: " + str(fnames_accu.value))
+
+    features = fnames_accu.value.keys()
     df_features = df_features.withColumn('features', mjolnir.spark.add_meta(df_features._sc, F.col('features'), {
         'features': features
     }))
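The new ValueError check in data_pipeline.py replaces the old TODO: after
collection, every feature name in the accumulator should have been observed
the same number of times, so the set of per-feature counts must contain
exactly one distinct value. A minimal standalone sketch of that invariant,
with made-up feature names and counts (not taken from the mjolnir codebase):

from collections import OrderedDict

# Hypothetical accumulator state: feature name -> number of hits the
# feature was logged for. With equal counts the check passes.
fnames_accu_value = OrderedDict([
    ('popularity', 100),
    ('title_match', 100),
    ('body_match', 100),  # if this were 99, the ValueError below would fire
])

if len(set(fnames_accu_value.values())) != 1:
    raise ValueError("Not all features were collected properly: " + str(fnames_accu_value))

# The feature list then keeps the accumulator's insertion order as ordinals.
features = list(fnames_accu_value.keys())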
""" def zero(self, value): -return dict() +return OrderedDict() def addInPlace(self, value1, value2): for k, v in value2.iteritems(): @@ -409,22 +410,18 @@ assert 'responses' in parsed, response.text features = defaultdict(lambda: [float('nan')] * len(feature_names)) -features_seen = {} -# feature ordinals (sorted by feature names) -# ordinal_map[i] returns the feature ordinal for request i and feature_names[i] -ordinal_map = [elts[0] for elts in sorted(enumerate(feature_names), key=lambda i:i[1])] -ordinal_map = [elts[0] for elts in sorted(enumerate(ordinal_map), key=lambda i:i[1])] +features_seen = OrderedDict() for i, response in enumerate(parsed['responses']): -ordinal = ordinal_map[i] # These should be retried when making the queries. If we get this far then # no retry is possible anymore, and the default NaN value will signal to # throw away the hit_page_id if response['status'] != 200: +features_seen[feature_names[i]] = 0 continue features_seen[feature_names[i]] = 1 for hit in response['hits']['hits']: hit_page_id = int(hit['_id']) -features[hit_page_id][ordinal] = hit['_score'] +features[hit_page_id][i] = hit['_score'] feature_names_accu += features_seen return