EBernhardson has submitted this change and it was merged. ( 
https://gerrit.wikimedia.org/r/364725 )

Change subject: Extract feature vectors from the ltr plugin
......................................................................


Extract feature vectors from the ltr plugin

Allows using features defined in the ltr plugin's feature store to
record feature vectors.
I am not a big fan of the way I integrated this feature; it certainly
needs some cleanup to reuse a bit more code.
Also, I thought that I would not use _msearch, but I found that it
allowed me to reuse a bit more code.

Bug: T168813
Change-Id: Ic258f6bee8bcb0cee3ab802935fc6ab404c85c25
---
M mjolnir/cli/data_pipeline.py
M mjolnir/features.py
M mjolnir/test/fixtures/requests/test_features.sqlite3
M mjolnir/test/test_features.py
4 files changed, 378 insertions(+), 20 deletions(-)

Approvals:
  EBernhardson: Verified; Looks good to me, approved



diff --git a/mjolnir/cli/data_pipeline.py b/mjolnir/cli/data_pipeline.py
index 81b82a6..28248aa 100644
--- a/mjolnir/cli/data_pipeline.py
+++ b/mjolnir/cli/data_pipeline.py
@@ -28,7 +28,7 @@
 
 
 def main(sc, sqlContext, input_dir, output_dir, wikis, samples_per_wiki,
-         min_sessions_per_query, search_cluster, brokers):
+         min_sessions_per_query, search_cluster, brokers, 
ltr_feature_definitions):
     # TODO: Should this jar have to be provided on the command line instead?
     sqlContext.sql("ADD JAR 
/mnt/hdfs/wmf/refinery/current/artifacts/refinery-hive.jar")
     sqlContext.sql("CREATE TEMPORARY FUNCTION stemmer AS 
'org.wikimedia.analytics.refinery.hive.StemmerUDF'")
@@ -129,26 +129,44 @@
     # Collect features for all known queries. Note that this intentionally
     # uses query and NOT norm_query_id. Merge those back into the source hits.
     fnames_accu = df_hits._sc.accumulator({}, 
mjolnir.features.FeatureNamesAccumulator())
-    if brokers:
-        df_features = mjolnir.features.collect_kafka(
-            df_hits,
-            brokers=brokers,
-            indices={wiki: '%s_content' % (wiki) for wiki in wikis},
-            feature_definitions=mjolnir.features.enwiki_features(),
-            feature_names_accu=fnames_accu)
+    if ltr_feature_definitions:
+        if brokers:
+            df_features = mjolnir.features.collect_from_ltr_plugin_and_kafka(
+                df_hits,
+                brokers=brokers,
+                indices={wiki: '%s_content' % (wiki) for wiki in wikis},
+                model=ltr_feature_definitions,
+                feature_names_accu=fnames_accu)
+        else:
+            df_features = mjolnir.features.collect_from_ltr_plugin(
+                df_hits,
+                url_list=SEARCH_CLUSTERS[search_cluster],
+                # TODO: While this works for now, at some point we might want 
to handle
+                # things like multimedia search from commons, and non-main 
namespace searches.
+                indices={wiki: '%s_content' % (wiki) for wiki in wikis},
+                model=ltr_feature_definitions,
+                feature_names_accu=fnames_accu)
     else:
-        df_features = mjolnir.features.collect_es(
-            df_hits,
-            url_list=SEARCH_CLUSTERS[search_cluster],
-            # TODO: While this works for now, at some point we might want to 
handle
-            # things like multimedia search from commons, and non-main 
namespace searches.
-            indices={wiki: '%s_content' % (wiki) for wiki in wikis},
-            # TODO: If we are going to do multiple wikis, this probably needs 
different features
-            # per wiki? At a minimum trying to include useful templates as 
features will need
-            # to vary per-wiki. Varied features per wiki would also mean they 
can't be trained
-            # together, which is perhaps a good thing anyways.
-            feature_definitions=mjolnir.features.enwiki_features(),
-            feature_names_accu=fnames_accu)
+        if brokers:
+            df_features = mjolnir.features.collect_kafka(
+                df_hits,
+                brokers=brokers,
+                indices={wiki: '%s_content' % (wiki) for wiki in wikis},
+                feature_definitions=mjolnir.features.enwiki_features(),
+                feature_names_accu=fnames_accu)
+        else:
+            df_features = mjolnir.features.collect_es(
+                df_hits,
+                url_list=SEARCH_CLUSTERS[search_cluster],
+                # TODO: While this works for now, at some point we might want 
to handle
+                # things like multimedia search from commons, and non-main 
namespace searches.
+                indices={wiki: '%s_content' % (wiki) for wiki in wikis},
+                # TODO: If we are going to do multiple wikis, this probably 
needs different features
+                # per wiki? At a minimum trying to include useful templates as 
features will need
+                # to vary per-wiki. Varied features per wiki would also mean 
they can't be trained
+                # together, which is perhaps a good thing anyways.
+                feature_definitions=mjolnir.features.enwiki_features(),
+                feature_names_accu=fnames_accu)
 
     # collect the accumulator
     df_features.cache().count()
@@ -199,6 +217,10 @@
         '-vv', '--very-verbose', dest='very_verbose', default=False, 
action='store_true',
         help='Increase logging to DEBUG')
     parser.add_argument(
+        '-f', '--feature-definitions', dest='ltr_feature_definitions', 
type=str, required=False,
+        help='Name of the LTR plugin feature definitions 
(featureset:name[@store] or '
+             + 'model:name[@store])')
+    parser.add_argument(
         'wikis', metavar='wiki', type=str, nargs='+',
         help='A wiki to generate features and labels for')
 
diff --git a/mjolnir/features.py b/mjolnir/features.py
index a73f368..60265fd 100644
--- a/mjolnir/features.py
+++ b/mjolnir/features.py
@@ -14,6 +14,7 @@
 from pyspark.ml.linalg import Vectors
 from pyspark.sql import functions as F
 import random
+import re
 import requests
 
 
@@ -51,6 +52,155 @@
             }
         }
     })
+
+
+class LtrLoggingQuery(object):
+    """Model or featureset stored in a featurestore
+
+    ...
+
+    Methods
+    -------
+    make_query(query)
+        Build the elasticsearch query
+    """
+
+    def __init__(self, elementType, name, store=None, 
queryParam='query_string'):
+        """Prepare a sltr query (requires the ltr plugin)
+
+        Parameters
+        ----------
+        name : string
+            Name of the model or featureset to log
+        elementType: string
+            Either set or model
+        queryParam:
+            name of the param to pass the query string
+        store:
+            name of the feature store
+        """
+        self.name = name
+        self.store = store
+        self.elementType = elementType
+        self.queryParam = queryParam
+
+    def make_search(self, ids, query):
+        """Build the elasticsearch query
+
+        Parameters
+        ----------
+        ids : list of ints
+            Set of page ids to collect features for
+        query : string
+            User provided query term (unused)
+
+        Returns
+        -------
+        An elasticsearch search request object
+        """
+
+        query = {
+            "_source": False,
+            "from": 0,
+            "size": 9999,
+            "query": {
+                "bool": {
+                    "filter": [
+                        # wrap inside the filter so we bypass score 
computation during the query phase
+                        # feature scores will be computed only once during the 
fetch phase
+                        {
+                            "sltr": {
+                                "_name": "sltr_log",
+                                "params": {
+                                    self.queryParam: query
+                                },
+                                self.elementType: self.name
+                            }
+                        },
+                        {
+                            'ids': {
+                                'values': map(str, set(ids))
+                            }
+                        }
+                    ]
+                }
+            },
+            "ext": {
+                "ltr_log": {
+                    "log_specs": [
+                        {
+                            "named_query": "sltr_log",
+                            "missing_as_zero": True,
+                        }
+                    ]
+                }
+            }
+        }
+        return query
+
+    def make_msearch(self, row, indices):
+        """Build the elasticsearch query
+
+        Parameters
+        ----------
+        query : string
+            User provided query term (unused)
+        row : pyspark.sql.Row
+            Row containing wikiid, query and hit_page_ids columns
+        indices : dict
+            Map from wikiid to elasticsearch index to query
+
+        Returns
+        -------
+        string
+            An elasticsearch msearch request encoded as a json string
+        """
+
+        # some duplicated code for creating a single search, this is mostly to
+        # reuse msearch request handling in cirrus.py. This possibly could be
+        # used to send multiple queries once.
+        bulk_query = []
+        if row.wikiid in indices:
+            index = indices[row.wikiid]
+        else:
+            # Takes advantage of aliases for the wikiid typically used by
+            # CirrusSearch
+            index = row.wikiid
+        bulk_query.append('{"index": "%s"}' % (index))
+        bulk_query.append(json.dumps(self.make_search(row.hit_page_ids, 
row.query)))
+
+        # elasticsearch bulk format requires each item to be on a line and the
+        # request to finish with a \n
+        return "%s\n" % ('\n'.join(bulk_query))
+
+
+def extract_ltr_log_feature_values(response, accum):
+    """Extract feature vector from ltr_log search ext
+    Scan all hits and inspect the ltr_log field. Feature names are then
+    lexically ordered to have consistent ordinals
+
+    Parameters
+    ----------
+    response : dict
+        elasticsearch search response
+    accum : Accumulator
+        spark accumulator to collect feature names
+
+    Yields
+    ------
+    pyspark.sql.Row
+        Collected feature vectors for a single (wiki, query, hit_page_id)
+    """
+
+    for hit in response['hits']['hits']:
+        page_id = int(hit['_id'])
+        features = []
+        counts = {}
+        for n, v in 
sorted(hit['fields']['_ltrlog'][0]['sltr_log'].iteritems()):
+            features.append(v)
+            counts[n] = counts.get(n, 0) + 1
+        accum += counts
+        yield page_id, Vectors.dense(features)
 
 
 class ScriptFeature(object):
@@ -446,3 +596,153 @@
         df_agg
         .rdd.mapPartitions(collect_partition)
         .toDF(['wikiid', 'query', 'hit_page_id', 'features']))
+
+
+def _explode_ltr_model_definition(definition):
+    """
+    Parse a string describing a ltr featureset or model
+    (featureset|model):name[@storename]
+
+    Parameters
+    ----------
+    definition: string
+        the model/featureset definition
+
+    Returns
+    -------
+        list: 3 elements list: type, name, store
+    """
+    res = re.search('(featureset|model)+:([^@]+)(?:[@](.+))?$', definition)
+    if res is None:
+        raise ValueError("Cannot parse ltr model definition [%s]." % 
(definition))
+    return res.groups()
+
+
+def collect_from_ltr_plugin(df, url_list, model, feature_names_accu, 
indices=None, session_factory=requests.Session):
+    """Collect feature vectors from elasticsearch and the ltr plugin
+
+    Performs queries against a remote elasticsearch instance to collect feature
+    vectors using the features defined in a named featureset or model for all
+    (query, hit_page_id) combinations in df.
+
+    Parameters
+    ----------
+    df : pyspark.sql.DataFrame
+        Source dataframe containing wikiid, query and hit_page_id fields
+        to collect feature vectors for.
+    url_list : list of str
+        List of URLs to send multi-search requests to. One will be chosen at
+        random per partition.
+    model : string
+        definition of the model/featureset: "featureset:name", "model:name" or 
"featureset:name@storeName"
+    indices : dict, optional
+        map from wikiid to elasticsearch index to query. If wikiid is
+        not present the wikiid will be used as index name. (Default: None)
+    session_factory : callable, optional
+        Used to create new sessions. (Default: requests.Session)
+
+    Returns
+    -------
+    pyspark.sql.DataFrame
+        Collected feature vectors with one row per unique (wikiid, query, 
hit_page_id). All
+        feature columns are prefixed with feature_.
+    """
+
+    mjolnir.spark.assert_columns(df, ['wikiid', 'query', 'hit_page_id'])
+
+    if indices is None:
+        indices = {}
+
+    eltType, name, store = _explode_ltr_model_definition(model)
+
+    def collect_partition(rows):
+        """Generate a function that will collect feature vectors for each 
partition.
+
+        Yields
+        -------
+        pyspark.sql.Row
+            Collected feature vectors for a single (wiki, query, hit_page_id)
+        """
+        random.shuffle(url_list)
+        url = url_list.pop()
+        log_query = LtrLoggingQuery(eltType, name, store)
+        with session_factory() as session:
+            for row in rows:
+                req = log_query.make_msearch(row, indices)
+                url, response = mjolnir.cirrus.make_request(session, url, 
url_list, req)
+                parsed = json.loads(response.text)
+                assert 'responses' in parsed, response.text
+                resp = parsed['responses'][0]
+
+                for hit_page_id, features in 
extract_ltr_log_feature_values(resp, feature_names_accu):
+                    yield [row['wikiid'], row['query'], hit_page_id, features]
+
+    return (
+        df
+        .groupBy('wikiid', 'query')
+        .agg(F.collect_set('hit_page_id').alias('hit_page_ids'))
+        .rdd.mapPartitions(collect_partition)
+        .toDF(['wikiid', 'query', 'hit_page_id', 'features']))
+
+
+def collect_from_ltr_plugin_and_kafka(df, brokers, model, feature_names_accu, 
indices=None):
+    """Collect feature vectors from elasticsearch via kafka
+
+    Pushes queries into a kafka topic and retrieves results from a second 
kafka topic.
+    A daemon must be running on relforge to collect the queries and produce 
results.
+
+    Parameters
+    ----------
+    df : pyspark.sql.DataFrame
+        Source dataframe containing wikiid, query and hit_page_id fields
+        to collect feature vectors for.
+    brokers : list of str
+        List of kafka brokers used to bootstrap access into the kafka cluster.
+    model : string
+        definition of the model/featureset: "featureset:name", "model:name" or 
"featureset:name@storeName"
+    feature_names_accu : Accumulator
+        used to collect feature names
+    indices : dict, optional
+        map from wikiid to elasticsearch index to query. If wikiid is
+        not present the wikiid will be used as index name. (Default: None)
+    """
+    mjolnir.spark.assert_columns(df, ['wikiid', 'query', 'hit_page_id'])
+    if indices is None:
+        indices = {}
+    eltType, name, store = _explode_ltr_model_definition(model)
+    log_query = LtrLoggingQuery(eltType, name, store)
+    Response = namedtuple('Response', ['status_code', 'text'])
+
+    def kafka_handle_response(record):
+        response = Response(record['status_code'], record['text'])
+        parsed = json.loads(response.text)
+        assert 'responses' in parsed, response.text
+        response = parsed['responses'][0]
+
+        for hit_page_id, features in extract_ltr_log_feature_values(response, 
feature_names_accu):
+            yield [record['wikiid'], record['query'], hit_page_id, features]
+
+    run_id = base64.b64encode(os.urandom(16))
+    offsets_start = mjolnir.kafka.client.get_offset_start(brokers)
+    print 'producing queries to kafka'
+    num_end_sigils = mjolnir.kafka.client.produce_queries(
+        df.groupBy('wikiid', 
'query').agg(F.collect_set('hit_page_id').alias('hit_page_ids')),
+        brokers,
+        run_id,
+        lambda row: log_query.make_msearch(row, indices))
+    print 'waiting for end run sigils'
+    offsets_end = mjolnir.kafka.client.get_offset_end(brokers, run_id, 
num_end_sigils)
+    print 'reading results from:'
+    for p, (start, end) in enumerate(zip(offsets_start, offsets_end)):
+        print '%d : %d to %d' % (p, start, end)
+    return (
+        mjolnir.kafka.client.collect_results(
+            df._sc,
+            brokers,
+            kafka_handle_response,
+            offsets_start,
+            offsets_end,
+            run_id)
+        .toDF(['wikiid', 'query', 'hit_page_id', 'features'])
+        # We could have gotten duplicate data from kafka. Clean them up.
+        .drop_duplicates(['wikiid', 'query', 'hit_page_id']))
diff --git a/mjolnir/test/fixtures/requests/test_features.sqlite3 
b/mjolnir/test/fixtures/requests/test_features.sqlite3
index e5f81c2..944ae3f 100644
--- a/mjolnir/test/fixtures/requests/test_features.sqlite3
+++ b/mjolnir/test/fixtures/requests/test_features.sqlite3
Binary files differ
diff --git a/mjolnir/test/test_features.py b/mjolnir/test/test_features.py
index 62ea7fe..4b30770 100644
--- a/mjolnir/test/test_features.py
+++ b/mjolnir/test/test_features.py
@@ -34,3 +34,39 @@
     apple_856 = dict(zip(feature_names, apple_856_features.toArray()))
     assert apple_856['title'] == 36.30035
     assert apple_856['file_text'] == 0.0
+
+
+def test_collect_ltr_plugin(spark_context, hive_context, 
make_requests_session):
+    def session_factory():
+        return make_requests_session('requests/test_features.sqlite3')
+
+    r = pyspark.sql.Row('wikiid', 'query', 'hit_page_id')
+    source_data = {
+        'apple': [18978754, 36071326, 856],
+        'foo': [11178, 1140775, 844613]
+    }
+    rows = [r('enwiki', query, page_id) for query, ids in source_data.items() 
for page_id in ids]
+    df = spark_context.parallelize(rows).toDF()
+
+    accu = df._sc.accumulator({}, mjolnir.features.FeatureNamesAccumulator())
+    df_result = mjolnir.features.collect_from_ltr_plugin(df, 
['http://localhost:9200'],
+                                                         
"model:enwiki_100t_v1",
+                                                         accu,
+                                                         {'enwiki': 
'en-wp-ltr-0617_content_first'},
+                                                         
session_factory=session_factory)
+
+    result = df_result.collect()
+
+    # all features must have been logged
+    assert len(set(accu.value.values())) == 1
+    feature_names = sorted(accu.value)
+
+    expected_page_ids = set([row.hit_page_id for row in rows])
+    result_page_ids = set([row.hit_page_id for row in result])
+    assert expected_page_ids == result_page_ids
+
+    apple_856_features = [row.features for row in result if row.query == 
'apple' and row.hit_page_id == 856][0]
+    apple_856 = dict(zip(feature_names, apple_856_features.toArray()))
+    assert apple_856['title'] == 35.9801
+    assert apple_856['auxiliary_text'] == 45.17453
+    assert apple_856['file_text'] == 0.0

-- 
To view, visit https://gerrit.wikimedia.org/r/364725
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ic258f6bee8bcb0cee3ab802935fc6ab404c85c25
Gerrit-PatchSet: 7
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: DCausse <[email protected]>
Gerrit-Reviewer: DCausse <[email protected]>
Gerrit-Reviewer: EBernhardson <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to