EBernhardson has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/358492 )

Change subject: Move some filtering steps out of sampling
......................................................................

Move some filtering steps out of sampling

A follow-up patch will add a more advanced query normalization step,
and this filtering needs to happen before that normalization. Pull it
out now to make that easier (see the sketch below).

* Move q_by_ip_day filtering from sampling to the main data_pipeline
* Remove wikiid filtering from sampling; this was already applied in
  data_pipeline
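
For illustration, a rough sketch of the ordering this enables. The
input path is a placeholder for the pipeline's input_dir argument, and
the future normalization step is only referenced in a comment since it
does not exist yet in this patch:

    from pyspark import SparkContext
    from pyspark.sql import SQLContext, functions as F

    sqlContext = SQLContext(SparkContext.getOrCreate())
    # 'hdfs://.../query_clicks' is a placeholder for the pipeline's input_dir.
    df_clicks = (
        sqlContext.read.parquet('hdfs://.../query_clicks')
        # Drop requests from overly busy IPs first ...
        .where(F.col('q_by_ip_day') < 50)
        .drop('q_by_ip_day'))
    # ... so that the more advanced query normalization added by the
    # follow-up patch, and the sampling that runs after it, only ever
    # see the already-filtered rows.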

Change-Id: Ib5691718d4d6bdd8179d19aee81753877961fe19
---
M mjolnir/cli/data_pipeline.py
M mjolnir/sampling.py
2 files changed, 7 insertions(+), 25 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR refs/changes/92/358492/1

diff --git a/mjolnir/cli/data_pipeline.py b/mjolnir/cli/data_pipeline.py
index d0d1040..8abd8cd 100644
--- a/mjolnir/cli/data_pipeline.py
+++ b/mjolnir/cli/data_pipeline.py
@@ -36,6 +36,9 @@
         sqlContext.read.parquet(input_dir)
         # Limit to the wikis we are working against
         .where(mjolnir.sampling._array_contains(F.array(map(F.lit, wikis)), F.col('wikiid')))
+        # Drop requests from 'too busy' IPs. These are plausibly bots, or maybe just proxies.
+        .where(F.col('q_by_ip_day') < 50)
+        .drop('q_by_ip_day')
         # Clicks and hits contains a bunch of useful debugging data, but we don't
         # need any of that here. Save a bunch of memory by only working with
         # lists of page ids
@@ -52,8 +55,7 @@
             df_clicks,
             wikis=wikis,
             seed=54321,
-            queries_per_wiki=queries_per_wiki,
-            min_sessions_per_query=min_sessions_per_query)
+            queries_per_wiki=queries_per_wiki)
         # Explode source into a row per displayed hit
         .select('*', F.expr("posexplode(hit_page_ids)").alias('hit_position', 'hit_page_id'))
         .drop('hit_page_ids')
@@ -130,8 +132,6 @@
             'weightedNdcgAt10': weightedNdcgAt10,
             'ndcgAt10': ndcgAt10,
         })))
-
-
     df_hits_with_features.write.parquet(output_dir)
 
 
diff --git a/mjolnir/sampling.py b/mjolnir/sampling.py
index 4adb578..cade5ba 100644
--- a/mjolnir/sampling.py
+++ b/mjolnir/sampling.py
@@ -192,11 +192,6 @@
         Require each chosen query to have at least this many sessions per
         query. This is necessary To train the DBN later in the pipeline.
         (Default: 35)
-    max_queries_per_ip_day : int, optional
-        Requires each chosen query to have at most this many full text searches
-        issued from it's IP on the day the query was issued. This Filters out
-        high volume users which are quite possibly bots or other non-standard
-        sessions. (Default: 50)
 
     Returns
     -------
@@ -206,27 +201,14 @@
     """
     mjolnir.spark.assert_columns(df, ['wikiid', 'norm_query', 'session_id', 'q_by_ip_day'])
 
-    # Filter down the input into the wikis we care about and remove sessions
-    # from overly active users, which are presumably bots.
-    df_filtered = (
-        df
-        .where(_array_contains(F.array([F.lit(wiki) for wiki in wikis]),
-                               F.col('wikiid')))
-        .where(df.q_by_ip_day <= max_queries_per_ip_day)
-        .drop(df.q_by_ip_day))
-
     # Aggregate down into a unique set of (wikiid, norm_query) and add in a
     # count of the number of unique sessions per pair. Filter on the number
     # of sessions as we need some minimum number of sessions per query to train
     # the DBN
     df_queries_unique = (
-        df_filtered
+        df
         .groupBy('wikiid', 'norm_query')
-        # To make QuantileDiscretizer happy later on, we need
-        # to cast this to a double. Can be removed in 2.x which
-        # accepts anything numeric.
-        .agg(F.countDistinct('session_id').cast('double').alias('num_sessions'))
-        .where(F.col('num_sessions') >= min_sessions_per_query)
+        .agg(F.countDistinct('session_id').alias('num_sessions'))
         # This rdd will be used multiple times through strata generation and
         # sampling. Cache to not duplicate the filtering and aggregation work.
         # Spark will eventually throw this away in an LRU fashion.
@@ -235,4 +217,4 @@
     df_queries_sampled = _sample_queries(df_queries_unique, wikis, samples_desired=queries_per_wiki, seed=seed)
 
     # Select the rows chosen by sampling from the filtered df
-    return df_filtered.join(df_queries_sampled, how='inner', on=['wikiid', 'norm_query'])
+    return df.join(df_queries_sampled, how='inner', on=['wikiid', 'norm_query'])

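Taken together, the caller side then reads roughly as below. This is
only a sketch: the name mjolnir.sampling.sample is assumed (the hunks
above do not show the function's name), and df_clicks, wikis and
queries_per_wiki are taken from the surrounding data_pipeline code:

    import mjolnir.sampling

    df_sampled = mjolnir.sampling.sample(
        df_clicks,
        wikis=wikis,
        seed=54321,
        queries_per_wiki=queries_per_wiki)
    # q_by_ip_day filtering now happens upstream in data_pipeline, and
    # min_sessions_per_query is no longer passed in by this change.
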
-- 
To view, visit https://gerrit.wikimedia.org/r/358492
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib5691718d4d6bdd8179d19aee81753877961fe19
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org>
