EBernhardson has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/349143 )

Change subject: Collect feature vectors from elasticsearch
......................................................................

Collect feature vectors from elasticsearch

Simple and straightforward collection of feature vectors from
elasticsearch. For the moment this skips the kafka middleman that is
eventually planned for shipping data between the analytics and prod
networks. That can be added later, but it seems best to start with
something simple and obvious.
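
Roughly, the intended usage looks like the sketch below (mirroring the
new test; the localhost URL and the enwiki index mapping are only
illustrative):

    import mjolnir.features

    # df is a pyspark.sql.DataFrame with wikiid, query and hit_page_id columns
    vectors = mjolnir.features.collect(
        df,
        url='http://localhost:9200/_msearch',
        feature_definitions=mjolnir.features.enwiki_features(),
        indices={'enwiki': 'enwiki_content'})
    # one row per unique (wikiid, query, hit_page_id), with feature_* columns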

This includes a relatively straightforward way of defining features,
but hopefully, as work progresses on the elasticsearch plugin, we can
remove that and instead provide elasticsearch with only the name of a
feature set to collect information about.
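
A feature is any object with a name and a make_query(query) method; for
illustration, a small custom feature set could be defined as in this
sketch (field names copied from enwiki_features):

    from mjolnir.features import MultiMatchFeature, ScriptFeature

    features = [
        # multi_match over the title fields
        MultiMatchFeature('title', ["title.plain^1", "title^3"]),
        # script_score over a numeric doc value
        ScriptFeature('incoming_links',
                      "pow(doc['incoming_links'].value, 0.7) / "
                      "(pow(doc['incoming_links'].value, 0.7) + pow(30, 0.7))"),
    ]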

Bug: T163407
Change-Id: Iaf3d1eab15728397c8f197c9410477430cdba8a0
---
M .gitignore
A mjolnir/features.py
A mjolnir/test/fixtures/requests/test_features.sqlite3
A mjolnir/test/test_features.py
M setup.py
5 files changed, 408 insertions(+), 1 deletion(-)


  git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR refs/changes/43/349143/1

diff --git a/.gitignore b/.gitignore
index 4b7c536..1e56238 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,6 +5,7 @@
 
 # Distribution / packaging
 venv/
+build/
 *.egg-info/
 *.egg
 *.log
diff --git a/mjolnir/features.py b/mjolnir/features.py
new file mode 100644
index 0000000..b5504e7
--- /dev/null
+++ b/mjolnir/features.py
@@ -0,0 +1,326 @@
+"""
+Integration for collecting feature vectors from elasticsearch
+"""
+
+from collections import defaultdict
+import json
+import mjolnir.spark
+import pyspark.sql
+from pyspark.sql import functions as F
+import requests
+
+
+def _wrap_with_page_ids(hit_page_ids, should):
+    """Wrap an elasticsearch query with an ids filter.
+
+    Parameters
+    ----------
+    hit_page_ids : list of ints
+        Set of page ids to collect features for
+    should : dict or list of dict
+        Elasticsearch query for a single feature
+
+    Returns
+    -------
+    string
+        JSON encoded elasticsearch query
+    """
+    assert len(hit_page_ids) < 10000
+    if type(should) is not list:
+        should = [should]
+    return json.dumps({
+        "_source": False,
+        "from": 0,
+        "size": 9999,
+        "query": {
+            "bool": {
+                "filter": {
+                    'ids': {
+                        'values': map(str, set(hit_page_ids)),
+                    }
+                },
+                "should": should,
+                "disable_coord": True,
+            }
+        }
+    })
+
+
+class ScriptFeature(object):
+    """
+    Query feature using elasticsearch script_score
+
+    ...
+
+    Methods
+    -------
+    make_query(query)
+        Build the elasticsearch query
+    """
+
+    def __init__(self, name, script, lang='expression'):
+        self.name = name
+        self.script = script
+        self.lang = lang
+
+    def make_query(self, query):
+        """Build the elasticsearch query
+
+        Parameters
+        ----------
+        query : string
+            User provided query term (unused)
+        """
+        return {
+            "function_score": {
+                "score_mode": "sum",
+                "boost_mode": "sum",
+                "functions": [
+                    {
+                        "script_score": {
+                            "script": {
+                                "inline": self.script,
+                                "lang": self.lang,
+                            }
+                        }
+                    }
+                ]
+            }
+        }
+
+
+class MultiMatchFeature(object):
+    """
+    Query feature using elasticsearch multi_match
+
+    ...
+
+    Methods
+    -------
+    make_query(query)
+        Build the elasticsearch query
+    """
+    def __init__(self, name, fields, minimum_should_match=1, match_type="most_fields"):
+        """
+
+        Parameters
+        ----------
+        name : string
+            Name of the feature
+        fields : list
+            Fields to perform multi_match against
+        minimum_should_match: int, optional
+            Minimum number of fields that should match. (Default: 1)
+        match_type : string, optional
+            Type of match to perform. (Default: most_fields)
+        """
+        self.name = name
+        assert len(fields) > 0
+        self.fields = fields
+        self.minimum_should_match = minimum_should_match
+        self.match_type = match_type
+
+    def make_query(self, query):
+        """Build the elasticsearch query
+
+        Parameters
+        ----------
+        query : string
+            User provided query term
+        """
+        return {
+            "multi_match": {
+                "query": query,
+                "minimum_should_match": self.minimum_should_match,
+                "type": self.match_type,
+                "fields": self.fields,
+            }
+        }
+
+
+class DisMaxFeature(object):
+    """
+    Query feature using elasticsearch dis_max
+
+    ...
+
+    Methods
+    -------
+    make_query(query)
+        Build the elasticsearch query
+    """
+
+    def __init__(self, name, features):
+        """
+
+        Parameters
+        ----------
+        name : string
+            Name of the feature
+        features : list
+            List of queries to use with dismax
+        """
+        self.name = name
+        assert len(features) > 0
+        self.features = features
+
+    def make_query(self, query):
+        """Build the elasticsearch query
+
+        Parameters
+        ----------
+        query : string
+            User provided query term
+        """
+        return {
+            "dis_max": {
+                "queries": [f.make_query(query) for f in self.features]
+            }
+        }
+
+
+def _create_bulk_query(row, indices, feature_definitions):
+    """Create an elasticsearch bulk query for provided row.
+
+    Parameters
+    ----------
+    row : pyspark.sql.Row
+        Row containing wikiid, query and hit_page_ids columns
+    indices : dict
+        Map from wikiid to elasticsearch index to query
+    feature_definitions : list
+        list of feature objects
+
+    Returns
+    -------
+    string
+    """
+    bulk_query = []
+    if row.wikiid in indices:
+        index = indices[row.wikiid]
+    else:
+        # Takes advantage of aliases for the wikiid typically used by
+        # CirrusSearch
+        index = row.wikiid
+    index_line = '{"index": "%s"}' % (index)
+
+    for feature in feature_definitions:
+        bulk_query.append(index_line)
+        bulk_query.append(_wrap_with_page_ids(row.hit_page_ids,
+                                              feature.make_query(row.query)))
+    # elasticsearch bulk format requires each item to be on a line and the
+    # request to finish with a \n
+    return "%s\n" % ('\n'.join(bulk_query))
+
+
+def _handle_response(response):
+    """Parse an elasticsearch response from requests into a feature vector.
+
+    Returns
+    -------
+    list of tuples
+        List of two-item tuples, each with hit_page_id first
+        and the list of features collected second.
+    """
+    # TODO: retries? something else?
+    assert response.status_code == 200
+    parsed = json.loads(response.text)
+    assert 'responses' in parsed, response.text
+
+    features = defaultdict(list)
+    for response in parsed['responses']:
+        # Again, retries? something else?
+        assert response['status'] == 200
+        for hit in response['hits']['hits']:
+            page_id = int(hit['_id'])
+            features[page_id].append(hit['_score'])
+    return features.items()
+
+
+def enwiki_features():
+    """Default set of features to collect.
+
+    Returns
+    -------
+    list
+    """
+    return [
+        MultiMatchFeature('title', ["title.plain^1", "title^3"]),
+        MultiMatchFeature('category', ["category.plain^1", "category^3"]),
+        MultiMatchFeature('heading', ["heading.plain^1", "heading^3"]),
+        MultiMatchFeature('auxiliary_text', ["auxiliary_text.plain^1", "auxiliary_text^3"]),
+        MultiMatchFeature('file_text', ["file_text.plain^1", "file_text^3"]),
+        DisMaxFeature('redirect_or_suggest_dismax', [
+            MultiMatchFeature(None, ["redirect.title.plain^1", 
"redirect.title^3"]),
+            MultiMatchFeature(None, ["suggest"]),
+        ]),
+        DisMaxFeature('text_or_opening_text_dismax', [
+            MultiMatchFeature(None, ["text.plain^1", "text^3"]),
+            MultiMatchFeature(None, ["opening_text.plain^1", 
"opening_text^3"]),
+        ]),
+        MultiMatchFeature('all_near_match', ["all_near_match^2"]),
+        ScriptFeature("popularity_score",
+                      "pow(doc['popularity_score'].value , 0.8) / " +
+                      "( pow(doc['popularity_score'].value, 0.8) + 
pow(8.0E-6,0.8))"),
+        ScriptFeature("incoming_links",
+                      "pow(doc['incoming_links'].value , 0.7) / " +
+                      "( pow(doc['incoming_links'].value, 0.7) + 
pow(30,0.7))"),
+    ]
+
+
+def collect(df, url, feature_definitions, indices=None, session_factory=requests.Session):
+    """Collect feature vectors from elasticsearch.
+
+    Parameters
+    ----------
+    df : pyspark.sql.DataFrame
+        Source dataframe containing wikiid, query and hit_page_id fields
+        to collect feature vectors for.
+    url : string
+        URL to send multi-search requests to
+    feature_definitions : list
+        list of feature objects
+    indices : dict, optional
+        map from wikiid to elasticsearch index to query. If wikiid is
+        not present the wikiid will be used as index name. (Default: None)
+    session_factory : callable, optional
+        Used to create new sessions. (Default: requests.Session)
+
+    Returns
+    -------
+    pyspark.sql.DataFrame
+        Collected feature vectors with one row per unique (wikiid, query, hit_page_id)
+    """
+
+    mjolnir.spark.assert_columns(df, ['wikiid', 'query', 'hit_page_id'])
+
+    if indices is None:
+        indices = {}
+
+    def collect_partition(rows):
+        """Generate a function that will collect feature vectors for each 
partition.
+
+        Yields
+        -------
+        pyspark.sql.Row
+            Collected feature vectors for a single (wiki, query, hit_page_id)
+        """
+        vector = pyspark.sql.Row(*(['wikiid', 'query', 'hit_page_id'] +
+                                   ["feature_%s" % (f.name) for f in 
feature_definitions]))
+        expected_count = len(feature_definitions)
+        with session_factory() as session:
+            for row in rows:
+                bulk_query = _create_bulk_query(row, indices, feature_definitions)
+                response = session.get(url, data=bulk_query)
+                for hit_page_id, features in _handle_response(response):
+                    assert len(features) == expected_count
+                    yield vector(*([row.wikiid, row.query, hit_page_id] + features))
+
+    return (
+        df
+        .groupBy('wikiid', 'query')
+        .agg(F.collect_set('hit_page_id').alias('hit_page_ids'))
+        .mapPartitions(collect_partition)
+        .toDF())
diff --git a/mjolnir/test/fixtures/requests/test_features.sqlite3 b/mjolnir/test/fixtures/requests/test_features.sqlite3
new file mode 100644
index 0000000..cd707eb
--- /dev/null
+++ b/mjolnir/test/fixtures/requests/test_features.sqlite3
Binary files differ
diff --git a/mjolnir/test/test_features.py b/mjolnir/test/test_features.py
new file mode 100644
index 0000000..9bf9bb6
--- /dev/null
+++ b/mjolnir/test/test_features.py
@@ -0,0 +1,79 @@
+import hashlib
+import mjolnir.features
+import os
+import pyspark.sql
+import requests
+import sqlite3
+
+
+class MockSession(object):
+    def __init__(self, fixture_file):
+        self._session = None
+        if fixture_file[0] != '/':
+            dir_path = os.path.dirname(os.path.realpath(__file__))
+            fixture_file = os.path.join(dir_path, fixture_file)
+        # Use sqlite for storage so we don't have to figure out how
+        # multiple pyspark executors write to the same file
+        self.sqlite = sqlite3.connect(fixture_file)
+        self.sqlite.execute(
+            "CREATE TABLE IF NOT EXISTS requests " +
+            "(digest text PRIMARY KEY, status_code int, content text)")
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        pass
+
+    def get(self, url, data=None):
+        md5 = hashlib.md5()
+        md5.update(url)
+        md5.update(data)
+        digest = md5.hexdigest()
+
+        for row in self.sqlite.execute("SELECT status_code, content from 
requests WHERE digest=?", [digest]):
+            return MockResponse(row[0], row[1])
+
+        r = requests.get(url, data=data)
+
+        try:
+            self.sqlite.execute("INSERT INTO requests VALUES (?,?,?)", 
[digest, r.status_code, r.text])
+            self.sqlite.commit()
+        except sqlite3.IntegrityError:
+            # inserted elsewhere? no big deal
+            pass
+
+        return MockResponse(r.status_code, r.text)
+
+
+class MockResponse(object):
+    def __init__(self, status_code, text):
+        self.status_code = status_code
+        self.text = text
+
+
+def session_factory():
+    return MockSession('fixtures/requests/test_features.sqlite3')
+
+
+def test_collect(spark_context, hive_context):
+    r = pyspark.sql.Row('wikiid', 'query', 'hit_page_id')
+    source_data = {
+        'apple': [18978754, 36071326, 856],
+        'foo': [11178, 1140775, 844613]
+    }
+    rows = [r('enwiki', query, page_id) for query, ids in source_data.items() for page_id in ids]
+    df = spark_context.parallelize(rows).toDF()
+
+    result = mjolnir.features.collect(df, 'http://localhost:9200/_msearch',
+                                      mjolnir.features.enwiki_features(),
+                                      {'enwiki': 'enwiki_content'},
+                                      session_factory=session_factory).collect()
+
+    expected_page_ids = set([row.hit_page_id for row in rows])
+    result_page_ids = set([row.hit_page_id for row in result])
+    assert expected_page_ids == result_page_ids
+
+    apple_856 = [row for row in result if row.query == 'apple' and row.hit_page_id == 856][0]
+    assert apple_856.feature_title == 36.30035
+    assert apple_856.feature_file_text == 0.0
diff --git a/setup.py b/setup.py
index 30c3736..d8645c7 100644
--- a/setup.py
+++ b/setup.py
@@ -5,15 +5,16 @@
 requirements = [
     # mjolnir requirements
     'clickmodels',
+    'requests',
     # pyspark requirements
     'py4j',
     'numpy',
 ]
 
 test_requirements = [
-    'pytest',
     'findspark',
     'flake8',
+    'pytest',
     'tox',
 ]
 

-- 
To view, visit https://gerrit.wikimedia.org/r/349143
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Iaf3d1eab15728397c8f197c9410477430cdba8a0
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
