EBernhardson has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/358492 )
Change subject: Move some filtering steps out of sampling
......................................................................

Move some filtering steps out of sampling

A follow-up patch will add a more advanced query normalization step,
and this filtering needs to happen before the query normalization.
Pull it out to make that easier.

* Move q_by_ip_day filtering from sampling to the main data_pipeline
* Remove wikiid filtering from sampling; it was already applied in
  data_pipeline

Change-Id: Ib5691718d4d6bdd8179d19aee81753877961fe19
---
M mjolnir/cli/data_pipeline.py
M mjolnir/sampling.py
2 files changed, 7 insertions(+), 25 deletions(-)


git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR refs/changes/92/358492/1

diff --git a/mjolnir/cli/data_pipeline.py b/mjolnir/cli/data_pipeline.py
index d0d1040..8abd8cd 100644
--- a/mjolnir/cli/data_pipeline.py
+++ b/mjolnir/cli/data_pipeline.py
@@ -36,6 +36,9 @@
         sqlContext.read.parquet(input_dir)
         # Limit to the wikis we are working against
         .where(mjolnir.sampling._array_contains(F.array(map(F.lit, wikis)), F.col('wikiid')))
+        # Drop requests from 'too busy' IPs. These are plausibly bots, or maybe just proxies.
+        .where(F.col('q_by_ip_day') < 50)
+        .drop('q_by_ip_day')
         # Clicks and hits contain a bunch of useful debugging data, but we don't
         # need any of that here. Save a bunch of memory by only working with
         # lists of page ids
@@ -52,8 +55,7 @@
         df_clicks,
         wikis=wikis,
         seed=54321,
-        queries_per_wiki=queries_per_wiki,
-        min_sessions_per_query=min_sessions_per_query)
+        queries_per_wiki=queries_per_wiki)
         # Explode source into a row per displayed hit
         .select('*', F.expr("posexplode(hit_page_ids)").alias('hit_position', 'hit_page_id'))
         .drop('hit_page_ids')
@@ -130,8 +132,6 @@
             'weightedNdcgAt10': weightedNdcgAt10,
             'ndcgAt10': ndcgAt10,
         })))
-
-    df_hits_with_features.write.parquet(output_dir)

diff --git a/mjolnir/sampling.py b/mjolnir/sampling.py
index 4adb578..cade5ba 100644
--- a/mjolnir/sampling.py
+++ b/mjolnir/sampling.py
@@ -192,11 +192,6 @@
         Require each chosen query to have at least this many sessions
         per query. This is necessary to train the DBN later in the
         pipeline. (Default: 35)
-    max_queries_per_ip_day : int, optional
-        Requires each chosen query to have at most this many full text searches
-        issued from its IP on the day the query was issued. This filters out
-        high volume users which are quite possibly bots or other non-standard
-        sessions. (Default: 50)

    Returns
    -------
@@ -206,27 +201,14 @@
     """
     mjolnir.spark.assert_columns(df, ['wikiid', 'norm_query', 'session_id', 'q_by_ip_day'])

-    # Filter down the input into the wikis we care about and remove sessions
-    # from overly active users, which are presumably bots.
-    df_filtered = (
-        df
-        .where(_array_contains(F.array([F.lit(wiki) for wiki in wikis]),
-                               F.col('wikiid')))
-        .where(df.q_by_ip_day <= max_queries_per_ip_day)
-        .drop(df.q_by_ip_day))
-
     # Aggregate down into a unique set of (wikiid, norm_query) and add in a
     # count of the number of unique sessions per pair. Filter on the number
     # of sessions as we need some minimum number of sessions per query to train
     # the DBN
     df_queries_unique = (
-        df_filtered
+        df
         .groupBy('wikiid', 'norm_query')
-        # To make QuantileDiscretizer happy later on, we need
-        # to cast this to a double. Can be removed in 2.x which
-        # accepts anything numeric.
-        .agg(F.countDistinct('session_id').cast('double').alias('num_sessions'))
-        .where(F.col('num_sessions') >= min_sessions_per_query)
+        .agg(F.countDistinct('session_id').alias('num_sessions'))
         # This rdd will be used multiple times through strata generation and
         # sampling. Cache to not duplicate the filtering and aggregation work.
         # Spark will eventually throw this away in an LRU fashion.
@@ -235,4 +217,4 @@
     df_queries_sampled = _sample_queries(df_queries_unique, wikis,
                                          samples_desired=queries_per_wiki, seed=seed)
     # Select the rows chosen by sampling from the filtered df
-    return df_filtered.join(df_queries_sampled, how='inner', on=['wikiid', 'norm_query'])
+    return df.join(df_queries_sampled, how='inner', on=['wikiid', 'norm_query'])

-- 
To view, visit https://gerrit.wikimedia.org/r/358492
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib5691718d4d6bdd8179d19aee81753877961fe19
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org>
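
For context, the pipeline ordering this change sets up looks roughly like the
sketch below. This is not the project's actual code: normalize_queries is a
hypothetical placeholder for the follow-up patch's normalization step, the
input path and wiki list are invented, and Column.isin stands in for the
_array_contains helper used in the patch.

    # A minimal sketch of the intended ordering: filter first, then
    # (in the follow-up patch) normalize queries, then sample.
    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    wikis = ['enwiki', 'dewiki']  # invented for illustration

    df_clicks = (
        spark.read.parquet('hdfs://example/click_logs')  # invented path
        # Keep only the wikis being trained against; 'isin' stands in
        # for the _array_contains helper used in the patch.
        .where(F.col('wikiid').isin(wikis))
        # Drop requests from 'too busy' IPs, which are plausibly bots
        # or proxies, mirroring the hard-coded threshold above.
        .where(F.col('q_by_ip_day') < 50)
        .drop('q_by_ip_day'))

    # df_clicks = normalize_queries(df_clicks)  # hypothetical follow-up step
    # df_sampled = mjolnir.sampling.sample(
    #     df_clicks, wikis=wikis, seed=54321,
    #     queries_per_wiki=queries_per_wiki)

The design choice here is that filtering happens exactly once, up front in the
data pipeline: the sampler no longer needs a max_queries_per_ip_day parameter,
and the upcoming normalization step will see the same filtered rows the
sampler does.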