Hi,
I have a Spark Streaming task that basically does the following:

1. Read a batch using a custom receiver
2. Parse and apply transforms to the batch
3. Convert the raw fields to a bunch of features
4. Use a pre-built model to predict the class of each record in the batch
5. Output the result to a DB

Everything was working fine and my streaming pipeline was pretty stable. Then I realized that the results were wrong, because some of the features require cumulative data, for example the ratio of "Dest IP" count to "Total number of IPs" for a given "Source IP". These features cannot be computed correctly within a single batch, since a batch only has a micro view of the entire dataset.

So I changed the code and inserted another step, 3a, which accumulates the data for a given "Source IP" over multiple batches. So far so good. To achieve this I used a DataFrame declared as a "var" instead of a "val"; as new batches come in, I extract the "SIP"-based data and union it with the existing DataFrame, and I also do a bit of filtering so that the data does not keep growing in size over time (I keep only, say, 30 minutes' worth of data).

Now when I test the system, I see that the processing time for each batch keeps increasing continuously. I would expect it to go up until the 30-minute mark, but beyond that point, as data gets filtered out by time, the size of the SIP RDD (DF) is almost constant, yet the processing time keeps increasing. This eventually makes my streaming pipeline unstable, and the app dies of an OOM (the receiver executor gets bloated and dies).

I have been testing this for almost a week now, and this line:

srcHostsDF.filter(srcHostsDF("last_updated_time") > ejectTime)
          .union(batInterDF)
          .persist()

where "batInterDF" is the received batch and "srcHostsDF" is the DataFrame in which I keep data across batches, shows up in the Spark UI as taking longer and longer over time. The size of "srcHostsDF" is fairly constant, so why should the time taken by persist go up?
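For context, the accumulation step (3a) looks roughly like the sketch below. Only srcHostsDF, batInterDF, ejectTime and last_updated_time are from my actual code; the schema, the way ejectTime is derived, and the surrounding StreamingContext / foreachRDD wiring shown here are simplified stand-ins, not the real implementation:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// State kept across batches: declared as a var so it can be
// reassigned on every micro-batch.
var srcHostsDF: DataFrame = _  // initialized from the first batch in the real code

// Called once per micro-batch with the parsed batch as a DataFrame.
def accumulate(spark: SparkSession, batInterDF: DataFrame): Unit = {
  // Keep only ~30 minutes of history (epoch millis; assumption about units).
  val ejectTime = System.currentTimeMillis() - 30L * 60L * 1000L

  srcHostsDF =
    srcHostsDF.filter(srcHostsDF("last_updated_time") > ejectTime) // drop old rows
              .union(batInterDF)                                    // add the new batch
              .persist()                                            // cache for downstream queries
}
```

Note that because each batch builds the new srcHostsDF on top of the previous one, the lineage of this DataFrame grows by one filter/union per batch even though the row count stays roughly constant.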
The other two calls that show up as increasing in time are srcHostsDF.count() and srcHostsDF.rdd. Why should this be the case? Any clues on what is happening? I replaced the "persist" with a "repartition" and I still get similar results.

The image below shows the executor memory growth. The app starts a little after 18:20; the neon green line is the receiver executor, the others are the process executors. There are 5 executors and 1 driver in all. One tick before 19:00 is where the size of "srcHostsDF" stabilizes.

Regards
-Ravi Gurram