Hi all,

I'm working on a pipeline for collaborative filtering. Taking the movielens
example, I have a data frame with the columns 'userID', 'movieID', and
'rating'. I would like to transform the ratings before calling ALS and
denormalise after. I implemented two transformers to do this, but is
there a better way than using a global variable to hold the rowMeans of
the utility matrix, so that the normalisation vector is shared across
both transformers? The pipeline gets more complicated if the
normalisation is done over columns and clustering of the userIDs is used
prior to calling ALS. Any help is greatly appreciated.

Thank you,
Dominik


The code snippets are as follows below. I'm using the pyspark.ml APIs:

# Module-level holder for the per-user mean ratings: written by
# Normaliser._transform and read back by DeNormaliser._transform.
# NOTE(review): sharing state through a global couples the two pipeline
# stages and breaks if DeNormaliser runs before Normaliser — consider an
# Estimator/Model pair that carries the means explicitly instead.
rowMeans = None

# Transformers
class Normaliser(Transformer, HasInputCol, HasOutputCol):
    """Mean-centre ratings per user (row-wise normalisation).

    Subtracts each user's mean value of ``inputCol`` and writes the
    centred value to ``outputCol``.  Side effect: publishes the per-user
    means in the module-level ``rowMeans`` so a later DeNormaliser stage
    can undo the shift.
    """

    @keyword_only
    def __init__(self, inputCol='rating', outputCol='normalised'):
        super(Normaliser, self).__init__()
        # NOTE(review): on Spark >= 2.1 ``keyword_only`` stores the
        # kwargs on the instance — use ``self._input_kwargs`` there.
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def _transform(self, df):
        global rowMeans
        # BUG FIX: the original passed ``self.inputCol``/``self.outputCol``,
        # which are Param descriptor objects, not the column-name strings
        # F.mean/alias expect — use the getters, as DeNormaliser does.
        rowMeans = df.groupBy('userID') \
                     .agg(F.mean(self.getInputCol()).alias('mean'))
        return df.join(rowMeans, 'userID') \
                 .select(df['*'], (df[self.getInputCol()] -
                                   rowMeans['mean']).alias(self.getOutputCol()))


class DeNormaliser(Transformer, HasInputCol, HasOutputCol):
    """Undo a previous Normaliser stage.

    Adds the per-user means held in the module-level ``rowMeans`` back
    onto ``inputCol`` and writes the shifted values to ``outputCol``.
    """

    @keyword_only
    def __init__(self, inputCol='normalised', outputCol='rating'):
        super(DeNormaliser, self).__init__()
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def _transform(self, df):
        # Re-attach each user's mean via the join key, then shift the
        # input column back onto the original rating scale.
        denormalised = (df[self.getInputCol()] + rowMeans['mean']) \
            .alias(self.getOutputCol())
        return df.join(rowMeans, 'userID').select(df['*'], denormalised)


# setting up the ML pipeline
# Stage order matters: normalise ratings per user, fit ALS on the
# centred column, then shift ALS's 'prediction' output back so the
# evaluator compares against the original 'rating' scale.
rowNormaliser = Normaliser(inputCol='rating', outputCol='rowNorm')
als = ALS(userCol='userID', itemCol='movieID', ratingCol='rowNorm')
rowDeNormaliser = DeNormaliser(inputCol='prediction',
outputCol='denormPrediction')

pipeline = Pipeline(stages=[rowNormaliser, als, rowDeNormaliser])
evaluator = RegressionEvaluator(predictionCol='denormPrediction',
labelCol='rating')

Reply via email to