jenkins-bot has submitted this change and it was merged.

Change subject: Don't allow null values in popularity score
......................................................................


Don't allow null values in popularity score

It turns out there are a couple of null page_id's in here, which turn
into a failed request for every wiki when we transfer the data over to
es.

Additionally did a mild refactor to allow writing a test case. To run
the tests you need a full install of spark; the simplest option was to copy
the folder to stat1002 and run the test there through spark-submit.
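For example, assuming the copied folder is the current working directory
(the exact path on stat1002 may differ), something along the lines of:

    spark-submit popularityScoreTest.py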

Change-Id: I1856e9cd7434b066e5ad2dfa4174c89263a0a9cb
---
M oozie/popularity_score/popularityScore.py
A oozie/popularity_score/popularityScoreTest.py
2 files changed, 82 insertions(+), 27 deletions(-)

Approvals:
  Smalyshev: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/oozie/popularity_score/popularityScore.py b/oozie/popularity_score/popularityScore.py
index 1688942..3292007 100644
--- a/oozie/popularity_score/popularityScore.py
+++ b/oozie/popularity_score/popularityScore.py
@@ -74,6 +74,39 @@
 
     return {item.project: item.view_count for item in data}
 
+
+def calcPopularityScore(sc, source):
+    filtered = source.filter(
+        source.page_id.isNotNull()
+    )
+
+    aggregated = filtered.groupBy(
+        source.page_id,
+        source.project,
+    ).agg(
+        source.project,
+        source.page_id,
+        pyspark.sql.functions.sum(source.view_count).alias("view_count"),
+    )
+
+    projectPageViews = sc.broadcast(calcProjectPageViews(filtered))
+    # This is a very naive version of the popularity score, likely it will be extended over
+    # time to be more robust. For the initial iterations this should be sufficient though.
+    popularityScore = pyspark.sql.functions.udf(
+        lambda view_count, project: view_count / float(projectPageViews.value[project]),
+        pyspark.sql.types.DoubleType(),
+    )
+
+    print("Calculating popularity score")
+    return aggregated.select(
+        aggregated.project,
+        aggregated.page_id,
+        popularityScore(
+            aggregated.view_count,
+            aggregated.project
+        ).alias('score'),
+    )
+
 if __name__ == "__main__":
     args = parser.parse_args()
     sc = pyspark.SparkContext(appName="Discovery Popularity Score")
@@ -83,33 +116,7 @@
     print("loading pageview data from:")
     print("\t" + "\n\t".join(parquetPaths) + "\n")
     dataFrame = sqlContext.parquetFile(*parquetPaths)
-
-    aggregated = dataFrame.groupBy(
-        dataFrame.page_id,
-        dataFrame.project,
-    ).agg(
-        dataFrame.project,
-        dataFrame.page_id,
-        pyspark.sql.functions.sum(dataFrame.view_count).alias("view_count"),
-    )
-
-    projectPageViews = sc.broadcast(calcProjectPageViews(dataFrame))
-    # This is a very naive version of the popularity score, likely it will be extended over
-    # time to be more robust. For the initial iterations this should be sufficient though.
-    popularityScore = pyspark.sql.functions.udf(
-        lambda view_count, project: view_count / float(projectPageViews.value[project]),
-        pyspark.sql.types.DoubleType(),
-    )
-
-    print("Calculating popularity score")
-    result = aggregated.select(
-        aggregated.project,
-        aggregated.page_id,
-        popularityScore(
-            aggregated.view_count,
-            aggregated.project
-        ).alias('score'),
-    )
+    result = calcPopularityScore(sc, dataFrame)
 
     deleteHdfsDir(args.output_dir)
     # the default spark.sql.shuffle.partitions creates 200 partitions, resulting in 3mb files.
diff --git a/oozie/popularity_score/popularityScoreTest.py b/oozie/popularity_score/popularityScoreTest.py
new file mode 100644
index 0000000..3bc11ff
--- /dev/null
+++ b/oozie/popularity_score/popularityScoreTest.py
@@ -0,0 +1,48 @@
+import unittest
+import json
+import pyspark
+import pyspark.sql
+
+from popularityScore import calcPopularityScore
+
+
+class PopularityScoreTest(unittest.TestCase):
+    def setUp(self):
+        self.sc = pyspark.SparkContext()
+        self.sql = pyspark.sql.SQLContext(self.sc)
+
+    def createDataFrame(self, fixture):
+        encoded = [json.dumps(item) for item in fixture]
+        return self.sql.jsonRDD(self.sc.parallelize(encoded))
+
+    def test_foo(self):
+        fixture = self.createDataFrame([
+            {
+                'project': 'en.wikipedia',
+                'page_id': 12345,
+                'view_count': 1,
+            },
+            {
+                'project': 'en.wikipedia',
+                'page_id': 12345,
+                'view_count': 5,
+
+            },
+            {
+                'project': 'en.wikipedia',
+                'page_id': None,
+                'view_count': 5,
+            },
+        ])
+
+        result = [row.asDict() for row in calcPopularityScore(self.sc, fixture).collect()]
+        self.assertEqual(result, [
+            {
+                u'project': 'en.wikipedia',
+                u'page_id': 12345,
+                u'score': 1.0,
+            }
+        ])
+
+if __name__ == '__main__':
+    unittest.main()

-- 
To view, visit https://gerrit.wikimedia.org/r/270340
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I1856e9cd7434b066e5ad2dfa4174c89263a0a9cb
Gerrit-PatchSet: 4
Gerrit-Project: wikimedia/discovery/analytics
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org>
Gerrit-Reviewer: EBernhardson <ebernhard...@wikimedia.org>
Gerrit-Reviewer: Smalyshev <smalys...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
