When I use reduceByKeyAndWindow with both func and invFunc in PySpark, the size of the window keeps growing instead of staying bounded by windowDuration. I am attaching code that reproduces the issue: it prints the count() of the DStream, which goes up by 10 elements on every output and never plateaus, even though the writer produces 5 elements per second and the window is 10 seconds, so I would expect it to level off around 50.
Is this a bug in the Python port of the Scala implementation, or is this expected behavior? Here is the code:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    print 'Initializing ssc'
    ssc = StreamingContext(SparkContext(), batchDuration=1)
    ssc.checkpoint('ckpt')

    # 10-second window sliding every 2 seconds over 1-second batches
    ds = ssc.textFileStream('input') \
        .map(lambda event: (event, 1)) \
        .reduceByKeyAndWindow(
            func=lambda count1, count2: count1 + count2,
            invFunc=lambda count1, count2: count1 - count2,
            windowDuration=10,
            slideDuration=2)

    ds.pprint()
    ds.count().pprint()

    print 'Starting ssc'
    ssc.start()

    import itertools
    import time
    from distutils import dir_util

    def batch_write(batch_data, batch_file_path):
        with open(batch_file_path, 'w') as batch_file:
            for element in batch_data:
                batch_file.write(str(element) + "\n")

    def xrange_write(batch_size=5, batch_dir='input', batch_duration=1):
        '''Every batch_duration seconds, write a file with batch_size numbers,
        forever. Start at 0 and keep incrementing. Intended for testing
        Spark Streaming code.'''
        dir_util.mkpath('./input')
        for i in itertools.count():
            batch_data = xrange(batch_size * i, batch_size * (i + 1))
            file_path = batch_dir + '/' + str(i)
            batch_write(batch_data, file_path)
            time.sleep(batch_duration)

    print 'Feeding data to app'
    xrange_write()

    ssc.awaitTermination()
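In case it is relevant: the Python DStream.reduceByKeyAndWindow signature also accepts a filterFunc parameter, which is applied to the windowed (key, value) pairs and could in principle be used to drop keys whose count has been inverted back down to zero. Below is a minimal sketch of the same stream with it set; the filterFunc lambda is my guess at the intended usage, not something I have confirmed, so treat it as a sketch rather than a known fix.

    # Same pipeline as above, plus filterFunc: keep only (key, count)
    # pairs whose count is still positive, so fully-expired keys would be
    # dropped from the window state instead of lingering at count 0.
    # (lambda kv: kv[1] > 0 is my assumption about how filterFunc is called)
    ds = ssc.textFileStream('input') \
        .map(lambda event: (event, 1)) \
        .reduceByKeyAndWindow(
            func=lambda count1, count2: count1 + count2,
            invFunc=lambda count1, count2: count1 - count2,
            windowDuration=10,
            slideDuration=2,
            filterFunc=lambda kv: kv[1] > 0)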