Hi all, I'm working on a pipeline for collaborative filtering. Taking the MovieLens example, I have a DataFrame with the columns 'userID', 'movieID', and 'rating'. I would like to normalise the ratings before calling ALS and denormalise the predictions afterwards. I implemented two transformers to do this, but I'm wondering whether there is a better way than using a global variable to hold the row means of the utility matrix in order to share the normalisation vector across both transformers. The pipeline gets more complicated if the normalisation is done over columns and clustering of the userIDs is used prior to calling ALS. Any help is greatly appreciated.
# Thank you, Dominik
# The code snippets are as follows below. I'm using the pyspark.ml APIs:

# Global variable used to share the per-user mean ratings between the
# Normaliser and DeNormaliser pipeline stages.
# NOTE(review): module-level mutable state is fragile (not serialisable with
# the pipeline, breaks with concurrent use). A cleaner design is an Estimator
# whose fit() computes the means and returns a Model holding them, so both
# the normalising and denormalising stages read from the fitted model.
rowMeans = None


class Normaliser(Transformer, HasInputCol, HasOutputCol):
    """Subtract each user's mean rating from ``inputCol`` into ``outputCol``.

    Side effect: stores the per-user means DataFrame in the module-level
    ``rowMeans`` so that DeNormaliser can undo the shift later.
    """

    @keyword_only
    def __init__(self, inputCol='rating', outputCol='normalised'):
        super(Normaliser, self).__init__()
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def _transform(self, df):
        """Return *df* with ``outputCol`` = ``inputCol`` - user's mean."""
        global rowMeans
        # BUG FIX: the original passed `self.inputCol`, which on the
        # HasInputCol mixin is the Param descriptor object rather than the
        # configured column-name string; use the getter as DeNormaliser does.
        rowMeans = df.groupBy('userID') \
                     .agg(F.mean(self.getInputCol()).alias('mean'))
        return df.join(rowMeans, 'userID') \
                 .select(df['*'],
                         (df[self.getInputCol()] - rowMeans['mean'])
                         .alias(self.getOutputCol()))


class DeNormaliser(Transformer, HasInputCol, HasOutputCol):
    """Add the per-user mean (computed by Normaliser) back onto ``inputCol``.

    Requires that a Normaliser stage has already run in the same process so
    that the module-level ``rowMeans`` is populated; otherwise the join below
    fails on ``rowMeans`` being None.
    """

    @keyword_only
    def __init__(self, inputCol='normalised', outputCol='rating'):
        super(DeNormaliser, self).__init__()
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def _transform(self, df):
        # Reads the module-level rowMeans written by Normaliser._transform.
        return df.join(rowMeans, 'userID') \
                 .select(df['*'],
                         (df[self.getInputCol()] + rowMeans['mean'])
                         .alias(self.getOutputCol()))


# Setting up the ML pipeline: normalise -> ALS -> denormalise, then evaluate
# the denormalised predictions against the raw ratings.
rowNormaliser = Normaliser(inputCol='rating', outputCol='rowNorm')
als = ALS(userCol='userID', itemCol='movieID', ratingCol='rowNorm')
rowDeNormaliser = DeNormaliser(inputCol='prediction', outputCol='denormPrediction')
pipeline = Pipeline(stages=[rowNormaliser, als, rowDeNormaliser])
evaluator = RegressionEvaluator(predictionCol='denormPrediction', labelCol='rating')