Hi Team,
I'm developing a streaming project that obtains tweets in real time and, after
applying some ML models and transformations, generates a treemap of the data. The
problem I'm facing is that memory isn't released after each batch is processed,
even once the batch's time window has passed.
I've been able to extend the time before the job runs out of memory to about 9
hours by changing the Memory fraction to 0.7 (default 0.5), but regardless of
how many tweets arrive, memory usage starts at 2GB, rises to 4GB-6GB with the
first batches, and then slowly oscillates between 6.98GB and 7.24GB until the
job fails after several hours (on an EC2 instance with 7.7GB of RAM).
The aggregated number of updated state rows evolves as expected with the input
rows, but the aggregated state memory used (in bytes) is released only three
times during a nine-hour run, and on the following batches it climbs higher
than it was before the release.
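The state figures above can be read from the query's progress report. The helper below is only a sketch: it assumes `progress` is the dict returned by `query.lastProgress` on a running StreamingQuery, with the usual `stateOperators` entries, and the name `state_metrics` is made up for illustration.

```python
def state_metrics(progress):
    """Sum the state store counters over all stateful operators of one batch.

    `progress` is expected to look like the dict from StreamingQuery.lastProgress;
    it is None before the first batch has completed, which is handled here.
    """
    totals = {"numRowsTotal": 0, "numRowsUpdated": 0, "memoryUsedBytes": 0}
    for op in (progress or {}).get("stateOperators", []):
        for key in totals:
            # Missing counters are treated as zero rather than raising.
            totals[key] += op.get(key, 0)
    return totals


# Hypothetical usage against a running query:
# print(state_metrics(query.lastProgress))
```

Logging these three values after every batch makes it easy to see whether `numRowsTotal` shrinks when old windows expire while `memoryUsedBytes` stays flat.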
For reference, this is the code I use to add the timestamp column and, after
some transformations, apply the window and watermark to avoid processing late
tweets.
from pyspark.sql.functions import avg, count, current_timestamp, window

timeStampCol = tweets.withColumn('timestamp', current_timestamp())
# apply some transformations to timeStampCol, resulting in transformationsDF
# watermark of 10 minutes and tumbling window of 5 minutes.
# entLab == entities (obtained with SparkNLP NER pretrained pipeline)
# sentNum == sentiment of the tweet (obtained with SparkNLP sentiment
# pretrained pipeline)
finalDF = transformationsDF.withWatermark("timestamp", "10 minutes").\
    groupBy(window("timestamp", "5 minutes"), "entLab").\
    agg(avg("sentNum").alias("avgSent"), count("sentNum").alias("countEnt"))
query = finalDF.writeStream.queryName('treemapResult').\
    foreachBatch(processBatch).outputMode("update").\
    option("checkpointLocation", "/tmp/checkpoints").start()
# the processBatch function writes the dataframe to a .csv file
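For illustration, a foreachBatch sink of this kind can be sketched as follows. The output directory, the per-batch subfolder naming and the writer options are assumptions for the example, not the exact values from the job.

```python
import os

# Hypothetical output location; not the path used in the real job.
OUTPUT_DIR = "/tmp/treemap_output"


def processBatch(batch_df, batch_id):
    """Write one micro-batch of aggregated rows to its own CSV directory."""
    target = os.path.join(OUTPUT_DIR, "batch_{}".format(batch_id))
    # Overwriting per batch id keeps a retried batch from duplicating rows.
    batch_df.write.mode("overwrite").option("header", "true").csv(target)
    # foreachBatch ignores the return value; it is returned only for inspection.
    return target
```

Nothing in this sketch caches or collects the batch DataFrame, so the sink itself should not hold on to data between batches.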
I've also tried using append mode, but the memory consumption is very similar.
Is there something wrong with the declaration of the window/watermark? What
could be causing the data to keep accumulating even after the 10-minute
watermark has passed and the batch has been processed?
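My understanding of when the state for a window should be dropped is sketched below in plain Python (no Spark involved). It assumes a window becomes eligible for cleanup once its end is at or before the watermark, where the watermark is the maximum event time seen so far minus the 10-minute delay. The function names are mine and purely illustrative.

```python
from datetime import datetime, timedelta

WATERMARK_DELAY = timedelta(minutes=10)  # matches withWatermark("timestamp", "10 minutes")
WINDOW_LENGTH = timedelta(minutes=5)     # matches window("timestamp", "5 minutes")
EPOCH = datetime(1970, 1, 1)


def window_for(event_time):
    """Return (start, end) of the 5-minute tumbling window containing event_time."""
    offset = (event_time - EPOCH) % WINDOW_LENGTH
    start = event_time - offset
    return start, start + WINDOW_LENGTH


def is_evictable(window_end, max_event_time):
    """True once the watermark has reached the end of the window."""
    watermark = max_event_time - WATERMARK_DELAY
    return window_end <= watermark


start, end = window_for(datetime(2020, 1, 1, 12, 7, 30))
print(start, end)
print(is_evictable(end, datetime(2020, 1, 1, 12, 19)))
print(is_evictable(end, datetime(2020, 1, 1, 12, 20)))
```

Under this reading, a window ending at 12:10 should stop occupying state once tweets timestamped 12:20 or later have been seen, possibly one trigger later because the watermark is advanced between batches.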
If there's any additional information you need, or that you think might help
to better understand the problem, I'll be happy to provide it.
You have all been able to help in the past, so thank you in advance.