jenkins-bot has submitted this change and it was merged.

Change subject: Don't allow null values in popularity score
......................................................................
Don't allow null values in popularity score

It turns out there are a couple of null page_ids in here, which turn into a
failed request for every wiki when we transfer the data over to elasticsearch.
Additionally, this does a mild refactor to allow writing a test case.

To run the tests you need a full install of Spark; the simplest approach was
to copy the folder to stat1002 and run the test there through spark-submit.

Change-Id: I1856e9cd7434b066e5ad2dfa4174c89263a0a9cb
---
M oozie/popularity_score/popularityScore.py
A oozie/popularity_score/popularityScoreTest.py
2 files changed, 82 insertions(+), 27 deletions(-)

Approvals:
  Smalyshev: Looks good to me, approved
  jenkins-bot: Verified

diff --git a/oozie/popularity_score/popularityScore.py b/oozie/popularity_score/popularityScore.py
index 1688942..3292007 100644
--- a/oozie/popularity_score/popularityScore.py
+++ b/oozie/popularity_score/popularityScore.py
@@ -74,6 +74,39 @@
     return {item.project: item.view_count for item in data}
 
 
+
+def calcPopularityScore(sc, source):
+    filtered = source.filter(
+        source.page_id.isNotNull()
+    )
+
+    aggregated = filtered.groupBy(
+        source.page_id,
+        source.project,
+    ).agg(
+        source.project,
+        source.page_id,
+        pyspark.sql.functions.sum(source.view_count).alias("view_count"),
+    )
+
+    projectPageViews = sc.broadcast(calcProjectPageViews(filtered))
+    # This is a very naive version of the popularity score, likely it will be extended over
+    # time to be more robust. For the initial iterations this should be sufficient though.
+    popularityScore = pyspark.sql.functions.udf(
+        lambda view_count, project: view_count / float(projectPageViews.value[project]),
+        pyspark.sql.types.DoubleType(),
+    )
+
+    print("Calculating popularity score")
+    return aggregated.select(
+        aggregated.project,
+        aggregated.page_id,
+        popularityScore(
+            aggregated.view_count,
+            aggregated.project
+        ).alias('score'),
+    )
+
 if __name__ == "__main__":
     args = parser.parse_args()
     sc = pyspark.SparkContext(appName="Discovery Popularity Score")
@@ -83,33 +116,7 @@
     print("loading pageview data from:")
     print("\t" + "\n\t".join(parquetPaths) + "\n")
     dataFrame = sqlContext.parquetFile(*parquetPaths)
-
-    aggregated = dataFrame.groupBy(
-        dataFrame.page_id,
-        dataFrame.project,
-    ).agg(
-        dataFrame.project,
-        dataFrame.page_id,
-        pyspark.sql.functions.sum(dataFrame.view_count).alias("view_count"),
-    )
-
-    projectPageViews = sc.broadcast(calcProjectPageViews(dataFrame))
-    # This is a very naive version of the popularity score, likely it will be extended over
-    # time to be more robust. For the initial iterations this should be sufficient though.
-    popularityScore = pyspark.sql.functions.udf(
-        lambda view_count, project: view_count / float(projectPageViews.value[project]),
-        pyspark.sql.types.DoubleType(),
-    )
-
-    print("Calculating popularity score")
-    result = aggregated.select(
-        aggregated.project,
-        aggregated.page_id,
-        popularityScore(
-            aggregated.view_count,
-            aggregated.project
-        ).alias('score'),
-    )
+    result = calcPopularityScore(sc, dataFrame)
 
     deleteHdfsDir(args.output_dir)
     # the default spark.sql.shuffle.partitions creates 200 partitions, resulting in 3mb files.
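For readers skimming the diff: calcPopularityScore drops rows whose page_id is
null, sums view_count per (project, page_id), and divides each sum by that
project's total views. Below is a minimal plain-Python sketch of the same
arithmetic, not the Spark job itself; the calcScores name and the
list-of-dicts input shape are assumptions introduced here for illustration.

from collections import defaultdict

def calcScores(rows):
    # Illustrative only: mirrors calcPopularityScore, but over plain dicts
    # rather than a Spark DataFrame.
    rows = [r for r in rows if r['page_id'] is not None]  # drop null page_ids
    project_totals = defaultdict(int)
    page_views = defaultdict(int)
    for r in rows:
        project_totals[r['project']] += r['view_count']
        page_views[(r['project'], r['page_id'])] += r['view_count']
    # score = a page's summed views / its project's total views
    return {key: views / float(project_totals[key[0]])
            for key, views in page_views.items()}

In the real job the per-project totals are wrapped in sc.broadcast() so the
lookup table is shipped to each executor once rather than serialized with
every task; the UDF then reads it through projectPageViews.value.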
diff --git a/oozie/popularity_score/popularityScoreTest.py b/oozie/popularity_score/popularityScoreTest.py
new file mode 100644
index 0000000..3bc11ff
--- /dev/null
+++ b/oozie/popularity_score/popularityScoreTest.py
@@ -0,0 +1,48 @@
+import unittest
+import json
+import pyspark
+import pyspark.sql
+
+from popularityScore import calcPopularityScore
+
+
+class PopularityScoreTest(unittest.TestCase):
+    def setUp(self):
+        self.sc = pyspark.SparkContext()
+        self.sql = pyspark.sql.SQLContext(self.sc)
+
+    def createDataFrame(self, fixture):
+        encoded = [json.dumps(item) for item in fixture]
+        return self.sql.jsonRDD(self.sc.parallelize(encoded))
+
+    def test_foo(self):
+        fixture = self.createDataFrame([
+            {
+                'project': 'en.wikipedia',
+                'page_id': 12345,
+                'view_count': 1,
+            },
+            {
+                'project': 'en.wikipedia',
+                'page_id': 12345,
+                'view_count': 5,
+
+            },
+            {
+                'project': 'en.wikipedia',
+                'page_id': None,
+                'view_count': 5,
+            },
+        ])
+
+        result = [row.asDict() for row in calcPopularityScore(self.sc, fixture).collect()]
+        self.assertEqual(result, [
+            {
+                u'project': 'en.wikipedia',
+                u'page_id': 12345,
+                u'score': 1.0,
+            }
+        ])
+
+if __name__ == '__main__':
+    unittest.main()

--
To view, visit https://gerrit.wikimedia.org/r/270340
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I1856e9cd7434b066e5ad2dfa4174c89263a0a9cb
Gerrit-PatchSet: 4
Gerrit-Project: wikimedia/discovery/analytics
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org>
Gerrit-Reviewer: EBernhardson <ebernhard...@wikimedia.org>
Gerrit-Reviewer: Smalyshev <smalys...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot <>
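Two notes on the new test, following from the change above. First, the
expected score of 1.0 is just the arithmetic of calcPopularityScore applied to
the fixture: the row with a null page_id is filtered out, page 12345's views
sum to 1 + 5 = 6, the en.wikipedia total computed over the remaining rows is
likewise 6, and 6 / 6.0 = 1.0, hence the single expected result row. Second,
as the commit message says, running the test needs a full Spark install; one
plausible invocation (an assumption, not spelled out in the change itself) is
to copy oozie/popularity_score to a Spark-equipped host such as stat1002 and
run from that directory:

    spark-submit popularityScoreTest.py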