Anil Dasari created SPARK-48418:
-----------------------------------

Summary: Spark structured streaming: Add microbatch timestamp to foreachBatch method
Key: SPARK-48418
URL: https://issues.apache.org/jira/browse/SPARK-48418
Project: Spark
Issue Type: Improvement
Components: Structured Streaming
Affects Versions: 3.5.1
Reporter: Anil Dasari
We are on Spark 3.x using Spark DStream + Kafka and are planning to move to structured streaming + Kafka. Differences between the DStream microbatch metadata and the structured streaming microbatch metadata are making migration difficult. DStream#foreachRDD provides both the microbatch RDD and its start timestamp (as a long). However, structured streaming Dataset#foreachBatch provides only the microbatch dataset and a batchId, where batchId is a numeric identifier. The microbatch start time is used across our data pipelines and in the final result. Could you add the microbatch start timestamp to the Dataset#foreachBatch method?

Pseudo code:
{code:java}
val inputStream = sparkSession.readStream.format("rate").load

inputStream
  .writeStream
  .trigger(Trigger.ProcessingTime(10 * 1000))
  .foreachBatch { (ds: Dataset[Row], batchId: Long, batchTime: Long) =>
    // batchTime is the microbatch trigger/start timestamp
    // application logic
  }
  .start()
  .awaitTermination()
{code}

Implementation approach, where batchTime is the time at which the trigger executor fired (`currentTriggerStartTimestamp` could also serve as the batch time; the trigger executor time is the source of the microbatch and can easily be added to the query progress event as well):

# Add the trigger time to [TriggerExecutor|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala#L31]:
{code:java}
trait TriggerExecutor {
  // added batchTime (Long) argument
  def execute(batchRunner: (MicroBatchExecutionContext, Long) => Boolean): Unit
  ... // (other methods)
}
{code}
# Update ProcessingTimeExecutor and the other executors to pass the trigger time:
{code:java}
override def execute(triggerHandler: (MicroBatchExecutionContext, Long) => Boolean): Unit = {
  while (true) {
    val triggerTimeMs = clock.getTimeMillis()
    val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
    // pass triggerTimeMs to runOneBatch, which invokes triggerHandler and is
    // used in the MicroBatchExecution#runActivatedStream method
    val terminated = !runOneBatch(triggerHandler, triggerTimeMs)
    ...
  }
}
{code}
# Add an executionTime (Long) argument to the MicroBatchExecution#executeOneBatch method [here|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L330].
# Pass the execution time in [runBatch|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L380C11-L380C19] and [here|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L849].
# Finally, add the following `foreachBatch` method to `DataStreamWriter`, update the existing `foreachBatch` methods for the new argument, and also add it to the query progress event:
{code:java}
def foreachBatch(function: (Dataset[T], Long, Long) => Unit): DataStreamWriter[T] = {
  this.source = SOURCE_NAME_FOREACH_BATCH
  if (function == null) throw new IllegalArgumentException("foreachBatch function cannot be null")
  this.foreachBatchWriter = function
  this
}
{code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
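The flow proposed in the steps above can be sketched outside Spark with a minimal, self-contained stand-in. Note this is only an illustration of how the trigger timestamp would thread from the trigger executor down to a foreachBatch-style callback: `TriggerExecutorSketch`, `ProcessingTimeExecutorSketch`, and `BatchContext` are hypothetical stand-ins for Spark's `TriggerExecutor`, `ProcessingTimeExecutor`, and `MicroBatchExecutionContext`, not the actual Spark classes.

```scala
// Hypothetical stand-in for MicroBatchExecutionContext: carries the batch id.
final case class BatchContext(batchId: Long)

// Hypothetical stand-in for TriggerExecutor with the proposed extra argument:
// the batch runner now receives the trigger/start timestamp (millis) per batch.
trait TriggerExecutorSketch {
  def execute(batchRunner: (BatchContext, Long) => Boolean): Unit
}

// Hypothetical stand-in for ProcessingTimeExecutor. The injectable clock and
// maxBatches bound exist only to keep the sketch deterministic and finite.
final class ProcessingTimeExecutorSketch(
    maxBatches: Int,
    clock: () => Long = () => System.currentTimeMillis()
) extends TriggerExecutorSketch {
  override def execute(batchRunner: (BatchContext, Long) => Boolean): Unit = {
    var batchId = 0L
    var continue = true
    while (continue && batchId < maxBatches) {
      val triggerTimeMs = clock() // captured once per batch, as in the proposal
      continue = batchRunner(BatchContext(batchId), triggerTimeMs)
      batchId += 1
      // a real executor would sleep until nextBatchTime(triggerTimeMs) here
    }
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    // foreachBatch-style callback: (context, batchTime) => keep going?
    new ProcessingTimeExecutorSketch(maxBatches = 3).execute { (ctx, batchTime) =>
      println(s"batchId=${ctx.batchId} batchTime=$batchTime")
      true
    }
  }
}
```

With a fake clock injected, every batch observes a distinct, monotonically increasing trigger timestamp alongside its batch id, which is exactly the pairing the issue asks `foreachBatch` to expose.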