EBernhardson has submitted this change and it was merged. (
https://gerrit.wikimedia.org/r/364725 )
Change subject: Extract feature vectors from the ltr plugin
......................................................................
Extract feature vectors from the ltr plugin
Allows using features defined in the plugin feature store to record
feature vectors.
I'm not a big fan of the way I integrated this feature; it certainly
needs some cleanup to reuse more code.
Also, I initially thought I wouldn't use _msearch, but I found that it
allowed me to reuse a bit more code...
Bug: T168813
Change-Id: Ic258f6bee8bcb0cee3ab802935fc6ab404c85c25
---
M mjolnir/cli/data_pipeline.py
M mjolnir/features.py
M mjolnir/test/fixtures/requests/test_features.sqlite3
M mjolnir/test/test_features.py
4 files changed, 378 insertions(+), 20 deletions(-)
Approvals:
EBernhardson: Verified; Looks good to me, approved
diff --git a/mjolnir/cli/data_pipeline.py b/mjolnir/cli/data_pipeline.py
index 81b82a6..28248aa 100644
--- a/mjolnir/cli/data_pipeline.py
+++ b/mjolnir/cli/data_pipeline.py
@@ -28,7 +28,7 @@
def main(sc, sqlContext, input_dir, output_dir, wikis, samples_per_wiki,
- min_sessions_per_query, search_cluster, brokers):
+ min_sessions_per_query, search_cluster, brokers,
ltr_feature_definitions):
# TODO: Should this jar have to be provided on the command line instead?
sqlContext.sql("ADD JAR
/mnt/hdfs/wmf/refinery/current/artifacts/refinery-hive.jar")
sqlContext.sql("CREATE TEMPORARY FUNCTION stemmer AS
'org.wikimedia.analytics.refinery.hive.StemmerUDF'")
@@ -129,26 +129,44 @@
# Collect features for all known queries. Note that this intentionally
# uses query and NOT norm_query_id. Merge those back into the source hits.
fnames_accu = df_hits._sc.accumulator({},
mjolnir.features.FeatureNamesAccumulator())
- if brokers:
- df_features = mjolnir.features.collect_kafka(
- df_hits,
- brokers=brokers,
- indices={wiki: '%s_content' % (wiki) for wiki in wikis},
- feature_definitions=mjolnir.features.enwiki_features(),
- feature_names_accu=fnames_accu)
+ if ltr_feature_definitions:
+ if brokers:
+ df_features = mjolnir.features.collect_from_ltr_plugin_and_kafka(
+ df_hits,
+ brokers=brokers,
+ indices={wiki: '%s_content' % (wiki) for wiki in wikis},
+ model=ltr_feature_definitions,
+ feature_names_accu=fnames_accu)
+ else:
+ df_features = mjolnir.features.collect_from_ltr_plugin(
+ df_hits,
+ url_list=SEARCH_CLUSTERS[search_cluster],
+ # TODO: While this works for now, at some point we might want
to handle
+ # things like multimedia search from commons, and non-main
namespace searches.
+ indices={wiki: '%s_content' % (wiki) for wiki in wikis},
+ model=ltr_feature_definitions,
+ feature_names_accu=fnames_accu)
else:
- df_features = mjolnir.features.collect_es(
- df_hits,
- url_list=SEARCH_CLUSTERS[search_cluster],
- # TODO: While this works for now, at some point we might want to
handle
- # things like multimedia search from commons, and non-main
namespace searches.
- indices={wiki: '%s_content' % (wiki) for wiki in wikis},
- # TODO: If we are going to do multiple wikis, this probably needs
different features
- # per wiki? At a minimum trying to include useful templates as
features will need
- # to vary per-wiki. Varied features per wiki would also mean they
can't be trained
- # together, which is perhaps a good thing anyways.
- feature_definitions=mjolnir.features.enwiki_features(),
- feature_names_accu=fnames_accu)
+ if brokers:
+ df_features = mjolnir.features.collect_kafka(
+ df_hits,
+ brokers=brokers,
+ indices={wiki: '%s_content' % (wiki) for wiki in wikis},
+ feature_definitions=mjolnir.features.enwiki_features(),
+ feature_names_accu=fnames_accu)
+ else:
+ df_features = mjolnir.features.collect_es(
+ df_hits,
+ url_list=SEARCH_CLUSTERS[search_cluster],
+ # TODO: While this works for now, at some point we might want
to handle
+ # things like multimedia search from commons, and non-main
namespace searches.
+ indices={wiki: '%s_content' % (wiki) for wiki in wikis},
+ # TODO: If we are going to do multiple wikis, this probably
needs different features
+ # per wiki? At a minimum trying to include useful templates as
features will need
+ # to vary per-wiki. Varied features per wiki would also mean
they can't be trained
+ # together, which is perhaps a good thing anyways.
+ feature_definitions=mjolnir.features.enwiki_features(),
+ feature_names_accu=fnames_accu)
# collect the accumulator
df_features.cache().count()
@@ -199,6 +217,10 @@
'-vv', '--very-verbose', dest='very_verbose', default=False,
action='store_true',
help='Increase logging to DEBUG')
parser.add_argument(
+ '-f', '--feature-definitions', dest='ltr_feature_definitions',
type=str, required=False,
+ help='Name of the LTR plugin feature definitions
(featureset:name[@store] or '
+ + 'model:name[@store])')
+ parser.add_argument(
'wikis', metavar='wiki', type=str, nargs='+',
help='A wiki to generate features and labels for')
diff --git a/mjolnir/features.py b/mjolnir/features.py
index a73f368..60265fd 100644
--- a/mjolnir/features.py
+++ b/mjolnir/features.py
@@ -14,6 +14,7 @@
from pyspark.ml.linalg import Vectors
from pyspark.sql import functions as F
import random
+import re
import requests
@@ -51,6 +52,155 @@
}
}
})
+
+
+class LtrLoggingQuery(object):
+    """Model or featureset stored in a featurestore
+
+    Builds requests that combine the ``sltr`` query with the ``ltr_log``
+    search extension of the elasticsearch ltr plugin, so that the feature
+    values of the named model or featureset are returned for each hit.
+
+    Methods
+    -------
+    make_search(ids, query)
+        Build the elasticsearch search request body
+    make_msearch(row, indices)
+        Build an elasticsearch msearch request string for a single row
+    """
+
+    def __init__(self, elementType, name, store=None,
+                 queryParam='query_string'):
+        """Prepare a sltr query (requires the ltr plugin)
+
+        Parameters
+        ----------
+        elementType : string
+            Either 'featureset' or 'model'; used as the sltr key that names
+            the element to log.
+        name : string
+            Name of the model or featureset to log
+        store : string, optional
+            Name of the feature store. When None no store is sent and the
+            plugin's default store is used. (Default: None)
+        queryParam : string, optional
+            Name of the sltr param used to pass the query string.
+            (Default: 'query_string')
+        """
+        self.name = name
+        self.store = store
+        self.elementType = elementType
+        self.queryParam = queryParam
+
+    def make_search(self, ids, query):
+        """Build the elasticsearch query
+
+        Parameters
+        ----------
+        ids : list of ints
+            Set of page ids to collect features for
+        query : string
+            User provided query term, passed to the sltr query through the
+            ``queryParam`` param.
+
+        Returns
+        -------
+        dict
+            An elasticsearch search request body
+        """
+        sltr = {
+            "_name": "sltr_log",
+            "params": {
+                self.queryParam: query
+            },
+            self.elementType: self.name,
+        }
+        if self.store is not None:
+            # Without this the store parsed from "name@store" would be
+            # silently ignored and the default store queried instead.
+            sltr["store"] = self.store
+
+        return {
+            "_source": False,
+            "from": 0,
+            "size": 9999,
+            "query": {
+                "bool": {
+                    "filter": [
+                        # wrap inside the filter so we bypass score computation
+                        # during the query phase, feature scores will be
+                        # computed only once during the fetch phase
+                        {"sltr": sltr},
+                        {
+                            "ids": {
+                                # a real list (not a lazy map object) so the
+                                # body is JSON serializable on python 3 too
+                                "values": [str(pid) for pid in set(ids)]
+                            }
+                        },
+                    ]
+                }
+            },
+            "ext": {
+                "ltr_log": {
+                    "log_specs": [
+                        {
+                            "named_query": "sltr_log",
+                            "missing_as_zero": True,
+                        }
+                    ]
+                }
+            }
+        }
+
+    def make_msearch(self, row, indices):
+        """Build an elasticsearch msearch request for a single row
+
+        Parameters
+        ----------
+        row : pyspark.sql.Row
+            Row containing wikiid, query and hit_page_ids columns
+        indices : dict
+            Map from wikiid to elasticsearch index to query. Wikis missing
+            from the map use the wikiid itself as the index name.
+
+        Returns
+        -------
+        string
+            An elasticsearch msearch request encoded as a json string
+        """
+        # some duplicated code for creating a single search, this is mostly to
+        # reuse msearch request handling in cirrus.py. This possibly could be
+        # used to send multiple queries at once.
+        # Falling back to the wikiid takes advantage of the aliases for the
+        # wikiid typically used by CirrusSearch.
+        index = indices.get(row.wikiid, row.wikiid)
+        bulk_query = [
+            # json.dumps escapes the index name instead of interpolating it
+            json.dumps({"index": index}),
+            json.dumps(self.make_search(row.hit_page_ids, row.query)),
+        ]
+        # elasticsearch bulk format requires each item to be on a line and the
+        # request to finish with a \n
+        return "%s\n" % ('\n'.join(bulk_query))
+
+
+def extract_ltr_log_feature_values(response, accum):
+    """Extract feature vectors from the ltr_log search ext
+
+    Scan all hits and inspect the ltr_log field. Feature names are
+    lexically ordered to have consistent ordinals.
+
+    Parameters
+    ----------
+    response : dict
+        elasticsearch search response
+    accum : Accumulator
+        spark accumulator to collect feature names. Each hit adds a count
+        of 1 for every feature name it logged.
+
+    Yields
+    ------
+    tuple of (int, pyspark.ml.linalg.DenseVector)
+        The hit page id and its feature vector, ordered by feature name.
+    """
+    for hit in response['hits']['hits']:
+        page_id = int(hit['_id'])
+        # .items() instead of the python 2 only .iteritems()
+        logged = sorted(hit['fields']['_ltrlog'][0]['sltr_log'].items())
+        # dict keys are unique, so each logged name is counted exactly once
+        accum += {feature_name: 1 for feature_name, _ in logged}
+        yield page_id, Vectors.dense([value for _, value in logged])
class ScriptFeature(object):
@@ -446,3 +596,153 @@
df_agg
.rdd.mapPartitions(collect_partition)
.toDF(['wikiid', 'query', 'hit_page_id', 'features']))
+
+
+def _explode_ltr_model_definition(definition):
+    """
+    Parse a string describing a ltr featureset or model
+    (featureset|model):name[@storename]
+
+    Parameters
+    ----------
+    definition: string
+        the model/featureset definition
+
+    Returns
+    -------
+    tuple: 3 elements tuple: type, name, store (store is None when no
+    @storename is given)
+
+    Raises
+    ------
+    ValueError
+        When the definition does not match the expected format
+    """
+    # re.match anchors at the start so prefixes such as "xmodel:" are
+    # rejected, and the type alternation is matched exactly once (no "+")
+    # so repeated types such as "modelmodel:" are rejected too.
+    res = re.match(r'(featureset|model):([^@]+)(?:@(.+))?$', definition)
+    if res is None:
+        raise ValueError("Cannot parse ltr model definition [%s]." % (definition))
+    return res.groups()
+
+
+def collect_from_ltr_plugin(df, url_list, model, feature_names_accu,
+                            indices=None, session_factory=requests.Session):
+    """Collect feature vectors from elasticsearch and the ltr plugin
+
+    Performs queries against a remote elasticsearch instance to collect feature
+    vectors using the features defined in a named featureset or model for all
+    (query, hit_page_id) combinations in df.
+
+    Parameters
+    ----------
+    df : pyspark.sql.DataFrame
+        Source dataframe containing wikiid, query and hit_page_id fields
+        to collect feature vectors for.
+    url_list : list of str
+        List of URLs to send multi-search requests to. One will be chosen at
+        random per partition; the remaining ones are handed to the request
+        helper. The caller's list is not modified.
+    model : string
+        definition of the model/featureset: "featureset:name", "model:name" or
+        "featureset:name@storeName"
+    feature_names_accu : Accumulator
+        used to collect feature names
+    indices : dict, optional
+        map from wikiid to elasticsearch index to query. If wikiid is
+        not present the wikiid will be used as index name. (Default: None)
+    session_factory : callable, optional
+        Used to create new sessions. (Default: requests.Session)
+
+    Returns
+    -------
+    pyspark.sql.DataFrame
+        Collected feature vectors with one row per unique (wikiid, query,
+        hit_page_id). Feature values are stored in a single vector column,
+        features, ordered lexically by feature name.
+
+    Raises
+    ------
+    ValueError
+        When url_list is empty or model cannot be parsed.
+    """
+    mjolnir.spark.assert_columns(df, ['wikiid', 'query', 'hit_page_id'])
+
+    if not url_list:
+        # Fail on the driver instead of with an IndexError in every task.
+        raise ValueError("url_list must contain at least one url")
+    if indices is None:
+        indices = {}
+
+    eltType, name, store = _explode_ltr_model_definition(model)
+
+    def collect_partition(rows):
+        """Collect feature vectors for the rows of a single partition.
+
+        Yields
+        -------
+        list
+            [wikiid, query, hit_page_id, features] for a single hit
+        """
+        # Shuffle a copy: shuffling and popping url_list itself would mutate
+        # the caller's list (e.g. a shared SEARCH_CLUSTERS entry).
+        urls = list(url_list)
+        random.shuffle(urls)
+        url = urls.pop()
+        log_query = LtrLoggingQuery(eltType, name, store)
+        with session_factory() as session:
+            for row in rows:
+                req = log_query.make_msearch(row, indices)
+                url, response = mjolnir.cirrus.make_request(session, url, urls, req)
+                parsed = json.loads(response.text)
+                assert 'responses' in parsed, response.text
+                resp = parsed['responses'][0]
+
+                for hit_page_id, features in extract_ltr_log_feature_values(resp, feature_names_accu):
+                    yield [row['wikiid'], row['query'], hit_page_id, features]
+
+    return (
+        df
+        .groupBy('wikiid', 'query')
+        .agg(F.collect_set('hit_page_id').alias('hit_page_ids'))
+        .rdd.mapPartitions(collect_partition)
+        .toDF(['wikiid', 'query', 'hit_page_id', 'features']))
+
+
+def collect_from_ltr_plugin_and_kafka(df, brokers, model, feature_names_accu,
+                                      indices=None):
+    """Collect feature vectors from elasticsearch and the ltr plugin via kafka
+
+    Pushes queries into a kafka topic and retrieves results from a second
+    kafka topic. A daemon must be running on relforge to collect the queries
+    and produce results.
+
+    Parameters
+    ----------
+    df : pyspark.sql.DataFrame
+        Source dataframe containing wikiid, query and hit_page_id fields
+        to collect feature vectors for.
+    brokers : list of str
+        List of kafka brokers used to bootstrap access into the kafka cluster.
+    model : string
+        definition of the model/featureset: "featureset:name", "model:name" or
+        "featureset:name@storeName"
+    feature_names_accu : Accumulator
+        used to collect feature names
+    indices : dict, optional
+        map from wikiid to elasticsearch index to query. If wikiid is
+        not present the wikiid will be used as index name. (Default: None)
+
+    Returns
+    -------
+    pyspark.sql.DataFrame
+        Collected feature vectors with one row per unique (wikiid, query,
+        hit_page_id) and columns wikiid, query, hit_page_id and features.
+    """
+    mjolnir.spark.assert_columns(df, ['wikiid', 'query', 'hit_page_id'])
+    if indices is None:
+        indices = {}
+    eltType, name, store = _explode_ltr_model_definition(model)
+    log_query = LtrLoggingQuery(eltType, name, store)
+
+    def kafka_handle_response(record):
+        # Only the msearch response text is needed; there is no point in
+        # wrapping it in an intermediate namedtuple first.
+        parsed = json.loads(record['text'])
+        assert 'responses' in parsed, record['text']
+        response = parsed['responses'][0]
+
+        for hit_page_id, features in extract_ltr_log_feature_values(response, feature_names_accu):
+            yield [record['wikiid'], record['query'], hit_page_id, features]
+
+    df_grouped = (
+        df
+        .groupBy('wikiid', 'query')
+        .agg(F.collect_set('hit_page_id').alias('hit_page_ids')))
+
+    run_id = base64.b64encode(os.urandom(16))
+    offsets_start = mjolnir.kafka.client.get_offset_start(brokers)
+    # print(...) with a single argument behaves the same on python 2 and 3
+    print('producing queries to kafka')
+    num_end_sigils = mjolnir.kafka.client.produce_queries(
+        df_grouped,
+        brokers,
+        run_id,
+        lambda row: log_query.make_msearch(row, indices))
+    print('waiting for end run sigils')
+    offsets_end = mjolnir.kafka.client.get_offset_end(brokers, run_id, num_end_sigils)
+    print('reading results from:')
+    for p, (start, end) in enumerate(zip(offsets_start, offsets_end)):
+        print('%d : %d to %d' % (p, start, end))
+    return (
+        mjolnir.kafka.client.collect_results(
+            df._sc,
+            brokers,
+            kafka_handle_response,
+            offsets_start,
+            offsets_end,
+            run_id)
+        .toDF(['wikiid', 'query', 'hit_page_id', 'features'])
+        # We could have gotten duplicate data from kafka. Clean them up.
+        .drop_duplicates(['wikiid', 'query', 'hit_page_id']))
diff --git a/mjolnir/test/fixtures/requests/test_features.sqlite3
b/mjolnir/test/fixtures/requests/test_features.sqlite3
index e5f81c2..944ae3f 100644
--- a/mjolnir/test/fixtures/requests/test_features.sqlite3
+++ b/mjolnir/test/fixtures/requests/test_features.sqlite3
Binary files differ
diff --git a/mjolnir/test/test_features.py b/mjolnir/test/test_features.py
index 62ea7fe..4b30770 100644
--- a/mjolnir/test/test_features.py
+++ b/mjolnir/test/test_features.py
@@ -34,3 +34,39 @@
apple_856 = dict(zip(feature_names, apple_856_features.toArray()))
assert apple_856['title'] == 36.30035
assert apple_856['file_text'] == 0.0
+
+
+def test_collect_ltr_plugin(spark_context, hive_context, make_requests_session):
+    """Feature vectors logged by the ltr plugin are collected for every hit"""
+    def session_factory():
+        return make_requests_session('requests/test_features.sqlite3')
+
+    Hit = pyspark.sql.Row('wikiid', 'query', 'hit_page_id')
+    hits_by_query = {
+        'apple': [18978754, 36071326, 856],
+        'foo': [11178, 1140775, 844613],
+    }
+    rows = [
+        Hit('enwiki', query, page_id)
+        for query, page_ids in hits_by_query.items()
+        for page_id in page_ids
+    ]
+    df = spark_context.parallelize(rows).toDF()
+
+    accu = df._sc.accumulator({}, mjolnir.features.FeatureNamesAccumulator())
+    df_result = mjolnir.features.collect_from_ltr_plugin(
+        df,
+        ['http://localhost:9200'],
+        "model:enwiki_100t_v1",
+        accu,
+        indices={'enwiki': 'en-wp-ltr-0617_content_first'},
+        session_factory=session_factory)
+    collected = df_result.collect()
+
+    # every feature name must have been logged the same number of times
+    assert len(set(accu.value.values())) == 1
+    feature_names = sorted(accu.value)
+
+    wanted_ids = set(hit.hit_page_id for hit in rows)
+    got_ids = set(hit.hit_page_id for hit in collected)
+    assert wanted_ids == got_ids
+
+    matching = [
+        hit.features for hit in collected
+        if hit.query == 'apple' and hit.hit_page_id == 856
+    ]
+    apple_856 = dict(zip(feature_names, matching[0].toArray()))
+    assert apple_856['title'] == 35.9801
+    assert apple_856['auxiliary_text'] == 45.17453
+    assert apple_856['file_text'] == 0.0
--
To view, visit https://gerrit.wikimedia.org/r/364725
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Ic258f6bee8bcb0cee3ab802935fc6ab404c85c25
Gerrit-PatchSet: 7
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: DCausse <[email protected]>
Gerrit-Reviewer: DCausse <[email protected]>
Gerrit-Reviewer: EBernhardson <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits