Hi all,

Sorry if this isn't the right place to ask basic questions, but I'm at the end of my rope here. If there's a better place to get help, please point me to it.
I'm trying to continuously read from a Kafka topic and send the number of rows Spark has received to a metric-tracking service. The input is an unbounded stream, so I need to send the count periodically and sum the values inside the metric-tracking service. Counting per DataFrame, or over non-overlapping windows of time (sketched in the P.S. below), both seemed reasonable, but I haven't had luck with either. Whatever approach I take, I inevitably end up calling count(), which triggers Spark to execute the DAG and then terminate the application (presumably because the count() action has completed). What I really need is for my Spark application to receive data indefinitely, count the rows periodically, and send the count(s) to the metric tracker.

My current program looks something like this:

    import org.apache.spark.sql.streaming.Trigger
    import spark.implicits._

    val df = spark
      .readStream
      .format("kafka")
      .<other options>
      .load()
      .select($"partition", $"timestamp", $"offset", $"value" cast "string")

    val metricTracker = new metricTracker()

    // Track the metric; the second parameter needs to be of type Long
    metricTracker.track("numberOfRows", df.count())

    // Output data to the console
    val query = df
      .writeStream
      .outputMode("append")
      .format("console")
      .trigger(Trigger.Continuous("5 seconds"))
      .start()

If I remove the metricTracker lines, the query receives data indefinitely and prints it to the console. As soon as I add the call to df.count(), the program executes and terminates very quickly. Any ideas on how I can send the number of rows Spark is receiving/processing from a stream with no end?

Thanks,
Basil
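P.S. To clarify what I meant by counting over non-overlapping periods of time, the windowed variant I tried looked roughly like the sketch below (the 1-minute window width is just a placeholder):

    import org.apache.spark.sql.functions.window

    // Group the stream into tumbling (non-overlapping) event-time windows
    // and count the rows that fall into each window
    val windowedCounts = df
      .groupBy(window($"timestamp", "1 minute"))
      .count()

but I still haven't had luck getting the per-window counts out as a plain Long to pass to metricTracker.track().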