EBernhardson has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/364433 )
Change subject: Use a custom Accumulator to collect feature names
......................................................................
Use a custom Accumulator to collect feature names
and use lexical order for feature ordinals.
This is a preliminary patch to allow mjolnir to work on an unknown
feature set.
Currently mjolnir owns the list of features and knows how to map
their names to ordinals in the feature vector used for training.
Use a custom accumulator so that feature names can be discovered while
logging them. The strategy here is to always record feature values in
alphabetical order of their names.
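(For context, a minimal standalone sketch of the pattern this patch
introduces: a dict-backed AccumulatorParam merges per-executor counts of
feature names, and the lexical order of the collected names gives the
ordinals. The toy RDD and the names 'title_match' / 'popularity_score'
below are illustrative only, not mjolnir's.)

    from pyspark import SparkContext
    from pyspark.accumulators import AccumulatorParam

    class FeatureNamesAccumulator(AccumulatorParam):
        # Merge dicts of feature name -> number of times it was logged.
        def zero(self, value):
            return dict()

        def addInPlace(self, value1, value2):
            for k, v in value2.items():
                value1[k] = value1.get(k, 0) + v
            return value1

    sc = SparkContext.getOrCreate()
    fnames_accu = sc.accumulator({}, FeatureNamesAccumulator())

    # Executors only add to the accumulator; the driver can read .value
    # only after an action has actually run.
    sc.parallelize(range(4)).foreach(
        lambda _: fnames_accu.add({'title_match': 1, 'popularity_score': 1}))

    feature_names = sorted(fnames_accu.value)  # lexical order -> ordinals

This is why data_pipeline.py forces an action (df_features.cache().count())
before reading the accumulator and sorting its keys.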
Use nolock as an NFS mount option (it solved my lock issue with sqlite3).
Bug: T168813
Change-Id: I1758166c9cf746a27626216b6018b476b3a841df
---
M Vagrantfile
M mjolnir/cli/data_pipeline.py
M mjolnir/features.py
M mjolnir/test/test_features.py
4 files changed, 70 insertions(+), 21 deletions(-)
Approvals:
EBernhardson: Verified; Looks good to me, approved
diff --git a/Vagrantfile b/Vagrantfile
index ea4ae99..d8d71ad 100644
--- a/Vagrantfile
+++ b/Vagrantfile
@@ -7,7 +7,7 @@
root_share_options = { id: 'vagrant-root' }
root_share_options[:type] = :nfs
- root_share_options[:mount_options] = ['noatime', 'rsize=32767', 'wsize=3267', 'async']
+ root_share_options[:mount_options] = ['noatime', 'rsize=32767', 'wsize=3267', 'async', 'nolock']
config.nfs.map_uid = Process.uid
config.nfs.map_gid = Process.gid
config.vm.synced_folder ".", "/vagrant", root_share_options
diff --git a/mjolnir/cli/data_pipeline.py b/mjolnir/cli/data_pipeline.py
index e79db75..a56be8d 100644
--- a/mjolnir/cli/data_pipeline.py
+++ b/mjolnir/cli/data_pipeline.py
@@ -129,12 +129,14 @@
# Collect features for all known queries. Note that this intentionally
# uses query and NOT norm_query_id. Merge those back into the source hits.
+ fnames_accu = df_hits._sc.accumulator({}, mjolnir.features.FeatureNamesAccumulator())
if brokers:
df_features = mjolnir.features.collect_kafka(
df_hits,
brokers=brokers,
indices={wiki: '%s_content' % (wiki) for wiki in wikis},
- feature_definitions=mjolnir.features.enwiki_features())
+ feature_definitions=mjolnir.features.enwiki_features(),
+ feature_names_accu=fnames_accu)
else:
df_features = mjolnir.features.collect_es(
df_hits,
@@ -146,7 +148,17 @@
# per wiki? At a minimum trying to include useful templates as features will need
# to vary per-wiki. Varied features per wiki would also mean they can't be trained
# together, which is perhaps a good thing anyways.
- feature_definitions=mjolnir.features.enwiki_features())
+ feature_definitions=mjolnir.features.enwiki_features(),
+ feature_names_accu=fnames_accu)
+
+ # collect the accumulator
+ df_features.cache().count()
+
+ # TODO: test that features returned a coherent map (all features must have the same counts)
+ features = sorted(fnames_accu.value)
+ df_features = df_features.withColumn('features', mjolnir.spark.add_meta(df_features._sc, F.col('features'), {
+ 'features': features
+ }))
df_hits_with_features = (
df_hits
.join(df_features, how='inner', on=['wikiid', 'query', 'hit_page_id'])
diff --git a/mjolnir/features.py b/mjolnir/features.py
index 97b804e..8262ade 100644
--- a/mjolnir/features.py
+++ b/mjolnir/features.py
@@ -10,6 +10,7 @@
import mjolnir.kafka.client
import mjolnir.spark
import os
+from pyspark.accumulators import AccumulatorParam
from pyspark.ml.linalg import Vectors
from pyspark.sql import functions as F
import random
@@ -185,6 +186,21 @@
}
+class FeatureNamesAccumulator(AccumulatorParam):
+ """
+ Spark accumulator to keep track of the feature names used
+ when retrieving feature vectors.
+ Keep a dict with feature names as keys and a counter as values.
+ """
+ def zero(self, value):
+ return dict()
+
+ def addInPlace(self, value1, value2):
+ for k, v in value2.iteritems():
+ value1[k] = value1.get(k, 0) + v
+ return value1
+
+
def _create_bulk_query(row, indices, feature_definitions):
"""Create an elasticsearch bulk query for provided row.
@@ -219,8 +235,17 @@
return "%s\n" % ('\n'.join(bulk_query))
-def _handle_response(response, num_features):
+def _handle_response(response, feature_names, feature_names_accu):
"""Parse an elasticsearch response from requests into a feature vector.
+
+ Parameters
+ ----------
+ response : dict
+ msearch responses
+ feature_names : list
+ feature names
+ feature_names_accu : Accumulator
+ count logged features
Returns
------
@@ -233,16 +258,24 @@
parsed = json.loads(response.text)
assert 'responses' in parsed, response.text
- features = defaultdict(lambda: [float('nan')] * num_features)
+ features = defaultdict(lambda: [float('nan')] * len(feature_names))
+ features_seen = {}
+ # feature ordinals (sorted by feature names)
+ # ordinal_map[i] returns the feature ordinal for request i and feature_names[i]
+ ordinal_map = [elts[0] for elts in sorted(enumerate(feature_names), key=lambda i:i[1])]
+ ordinal_map = [elts[0] for elts in sorted(enumerate(ordinal_map), key=lambda i:i[1])]
for i, response in enumerate(parsed['responses']):
+ ordinal = ordinal_map[i]
# These should be retried when making the queries. If we get this far then
# no retry is possible anymore, and the default NaN value will signal to
# throw away the hit_page_id
if response['status'] != 200:
continue
+ features_seen[feature_names[i]] = 1
for hit in response['hits']['hits']:
hit_page_id = int(hit['_id'])
- features[hit_page_id][i] = hit['_score']
+ features[hit_page_id][ordinal] = hit['_score']
+ feature_names_accu += features_seen
return features.items()
@@ -277,7 +310,7 @@
]
-def collect_kafka(df, brokers, feature_definitions, indices=None):
+def collect_kafka(df, brokers, feature_definitions, feature_names_accu, indices=None):
"""Collect feature vectors from elasticsearch via kafka
Pushes queries into a kafka topic and retrieves results from a second kafka topic.
@@ -292,6 +325,8 @@
List of kafka brokers used to bootstrap access into the kafka cluster.
feature_definitions : list
list of feature objects defining the features to collect.
+ feature_names_accu : Accumulator
+ used to collect feature names
indices : dict, optional
map from wikiid to elasticsearch index to query. If wikiid is
not present the wikiid will be used as index name. (Default: None)
@@ -299,12 +334,12 @@
mjolnir.spark.assert_columns(df, ['wikiid', 'query', 'hit_page_id'])
if indices is None:
indices = {}
- num_features = len(feature_definitions)
+ feature_names = [f.name for f in feature_definitions]
Response = namedtuple('Response', ['status_code', 'text'])
def kafka_handle_response(record):
response = Response(record['status_code'], record['text'])
- for hit_page_id, features in _handle_response(response, num_features):
+ for hit_page_id, features in _handle_response(response, feature_names, feature_names_accu):
# nan features mean some sort of failure, drop the row.
# TODO: Add some accumulator to count and report dropped rows?
if not any(map(math.isnan, features)):
@@ -333,13 +368,10 @@
run_id)
.toDF(['wikiid', 'query', 'hit_page_id', 'features'])
# We could have gotten duplicate data from kafka. Clean them up.
- .drop_duplicates(['wikiid', 'query', 'hit_page_id'])
- .withColumn('features', mjolnir.spark.add_meta(df._sc, F.col('features'), {
- 'features': [f.name for f in feature_definitions]
- })))
+ .drop_duplicates(['wikiid', 'query', 'hit_page_id']))
-def collect_es(df, url_list, feature_definitions, indices=None, session_factory=requests.Session):
+def collect_es(df, url_list, feature_definitions, feature_names_accu, indices=None, session_factory=requests.Session):
"""Collect feature vectors from elasticsearch
Performs queries against a remote elasticsearch instance to collect feature
@@ -355,6 +387,7 @@
random per partition.
feature_definitions : list
list of feature objects defining the features to collect.
+ feature_names_accu : Accumulator used to track feature names and ordinals
indices : dict, optional
map from wikiid to elasticsearch index to query. If wikiid is
not present the wikiid will be used as index name. (Default: None)
@@ -373,6 +406,8 @@
if indices is None:
indices = {}
+ feature_names = [f.name for f in feature_definitions]
+
def collect_partition(rows):
"""Generate a function that will collect feature vectors for each
partition.
@@ -381,14 +416,13 @@
pyspark.sql.Row
Collected feature vectors for a single (wiki, query, hit_page_id)
"""
- num_features = len(feature_definitions)
random.shuffle(url_list)
url = url_list.pop()
with session_factory() as session:
for row in rows:
bulk_query = _create_bulk_query(row, indices,
feature_definitions)
url, response = mjolnir.cirrus.make_request(session, url,
url_list, bulk_query)
- for hit_page_id, features in _handle_response(response, num_features):
+ for hit_page_id, features in _handle_response(response, feature_names, feature_names_accu):
# nan features mean some sort of failure, drop the row.
# TODO: Add some accumulator to count and report dropped rows?
if not any(map(math.isnan, features)):
@@ -399,7 +433,4 @@
.groupBy('wikiid', 'query')
.agg(F.collect_set('hit_page_id').alias('hit_page_ids'))
.rdd.mapPartitions(collect_partition)
- .toDF(['wikiid', 'query', 'hit_page_id', 'features'])
- .withColumn('features', mjolnir.spark.add_meta(df._sc, F.col('features'), {
- 'features': [f.name for f in feature_definitions]
- })))
+ .toDF(['wikiid', 'query', 'hit_page_id', 'features']))
diff --git a/mjolnir/test/test_features.py b/mjolnir/test/test_features.py
index caf621e..cabaae7 100644
--- a/mjolnir/test/test_features.py
+++ b/mjolnir/test/test_features.py
@@ -14,12 +14,18 @@
rows = [r('enwiki', query, page_id) for query, ids in source_data.items()
for page_id in ids]
df = spark_context.parallelize(rows).toDF()
+ accu = df._sc.accumulator({}, mjolnir.features.FeatureNamesAccumulator())
df_result = mjolnir.features.collect_es(df,
['http://localhost:9200/_msearch'],
mjolnir.features.enwiki_features(),
+ accu,
{'enwiki': 'enwiki_content'},
session_factory=session_factory)
result = df_result.collect()
- feature_names = df_result.schema['features'].metadata['features']
+
+ # all features must have been logged
+ assert len(set(accu.value.values())) == 1
+ feature_names = sorted(accu.value)
+
expected_page_ids = set([row.hit_page_id for row in rows])
result_page_ids = set([row.hit_page_id for row in result])
assert expected_page_ids == result_page_ids
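(Side note on the double argsort that builds ordinal_map in _handle_response
above: msearch request i is issued in feature_definitions order, but its score
has to land at the position of feature_names[i] in the lexically sorted
vector. A quick standalone illustration, with made-up feature names:)

    feature_names = ['popularity_score', 'title_match', 'incoming_links']

    # indices that would sort the names alphabetically
    argsort = [i for i, _ in sorted(enumerate(feature_names), key=lambda e: e[1])]
    # argsort of the argsort: the rank of each name, i.e. where request i's
    # score belongs in the sorted feature vector
    ordinal_map = [i for i, _ in sorted(enumerate(argsort), key=lambda e: e[1])]

    assert argsort == [2, 0, 1]
    assert ordinal_map == [1, 2, 0]
    for i, name in enumerate(feature_names):
        assert sorted(feature_names)[ordinal_map[i]] == name

The test's assertion that len(set(accu.value.values())) == 1 then checks that
every feature name was logged the same number of times, i.e. that no feature
was silently dropped for only part of the requests.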
--
To view, visit https://gerrit.wikimedia.org/r/364433
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I1758166c9cf746a27626216b6018b476b3a841df
Gerrit-PatchSet: 3
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: DCausse <[email protected]>
Gerrit-Reviewer: EBernhardson <[email protected]>