EBernhardson has uploaded a new change for review. (
https://gerrit.wikimedia.org/r/391622 )
Change subject: Replace custom array_contains with Column.isin
......................................................................
Replace custom array_contains with Column.isin
Not sure if this is new or I just wasn't aware of it at the time,
but Spark has a native Column.isin that does the same thing as our
usage of the custom _array_contains method (checking that a column's
value is one of a provided array of values).
Change-Id: I504492070c7cde4a4d93f2ff9c104b3f127b2757
---
M mjolnir/sampling.py
M mjolnir/utilities/data_pipeline.py
2 files changed, 1 insertion(+), 32 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR
refs/changes/22/391622/1
diff --git a/mjolnir/sampling.py b/mjolnir/sampling.py
index e65281c..9ba3cf8 100644
--- a/mjolnir/sampling.py
+++ b/mjolnir/sampling.py
@@ -15,37 +15,6 @@
from pyspark.sql.column import Column, _to_java_column
-def _array_contains(array, value):
- """Generic version of pyspark.sql.functions.array_contains
-
- array_contains provided by pyspark only allow checking if a value is inside
- a column, but the value has to be a literal and not a column from the row.
- This generalizes the function to allow the value to be a column, checking
- if a column is within a provided literal array.
-
- >>> df = sc.parallelize([['foo'], ['bar']]).toDF(['id'])
- >>> df.select(_array_contains(F.array(map(F.lit, ['this', 'is', 'foo'])),
F.col('id'))).collect()
- [Row(array_contains(array(this,is,foo),id)=True),
Row(array_contains(array(this,is,foo),id)=False)]
-
- Parameters
- ----------
- array : pyspark.sql.Column
- value : pyspark.sql.Column
-
- Returns
- -------
- pyspark.sql.Column
- Column representing the array_contains expression
- """
- j_array_expr = _to_java_column(array).expr()
- j_value_expr = _to_java_column(value).expr()
-
- sql = pyspark.SparkContext._active_spark_context._jvm.org.apache.spark.sql
- j_expr = sql.catalyst.expressions.ArrayContains(j_array_expr, j_value_expr)
- jc = sql.Column(j_expr)
- return Column(jc)
-
-
def _calc_splits(df, num_buckets=100):
"""Calculate the right edge of num_session buckets
diff --git a/mjolnir/utilities/data_pipeline.py
b/mjolnir/utilities/data_pipeline.py
index 62ec121..a5c37d1 100644
--- a/mjolnir/utilities/data_pipeline.py
+++ b/mjolnir/utilities/data_pipeline.py
@@ -40,7 +40,7 @@
df_clicks = (
sqlContext.read.parquet(input_dir)
# Limit to the wikis we are working against
- .where(mjolnir.sampling._array_contains(F.array(map(F.lit, wikis)),
F.col('wikiid')))
+ .where(F.col('wikiid').isin(wikis))
# Drop requests from 'too busy' IP's. These are plausibly bots, or
maybe just proxys.
.where(F.col('q_by_ip_day') < 50)
.drop('q_by_ip_day')
--
To view, visit https://gerrit.wikimedia.org/r/391622
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I504492070c7cde4a4d93f2ff9c104b3f127b2757
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits