Anil Dasari created SPARK-48418:
-----------------------------------

             Summary: Spark structured streaming: Add microbatch timestamp to foreachBatch method
                 Key: SPARK-48418
                 URL: https://issues.apache.org/jira/browse/SPARK-48418
             Project: Spark
          Issue Type: Improvement
          Components: Structured Streaming
    Affects Versions: 3.5.1
            Reporter: Anil Dasari


We are on Spark 3.x, using the DStream + Kafka APIs, and are planning to move to Structured Streaming + Kafka.

Differences between the DStream and Structured Streaming microbatch metadata are making the migration difficult.

DStream#foreachRDD provides both the microbatch RDD and the batch start timestamp (as a Long). However, Structured Streaming's Dataset#foreachBatch provides only the microbatch Dataset and the batch ID, which is just a numeric identifier.
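For illustration, the gap between the two APIs today (a sketch of current Spark behavior; `dstream`, `streamingDf`, and the String element type are placeholders):
{code:java}
// DStream API: foreachRDD exposes the batch time directly.
dstream.foreachRDD { (rdd: RDD[String], time: Time) =>
  val batchStartMs: Long = time.milliseconds // microbatch start timestamp
  // ...
}

// Structured Streaming today: only the batch id is exposed.
streamingDf.writeStream.foreachBatch { (ds: Dataset[Row], batchId: Long) =>
  // no trigger timestamp is available here
  ()
}
{code}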

The microbatch start time is used across our data pipelines and in the final results.

Could you add the microbatch start timestamp to the Dataset#foreachBatch method?

Pseudo code:

 
{code:java}
val inputStream = sparkSession.readStream.format("rate").load()

inputStream
  .writeStream
  .trigger(Trigger.ProcessingTime(10 * 1000))
  .foreachBatch {
    (ds: Dataset[Row], batchId: Long, batchTime: Long) =>
      // batchTime is the proposed microbatch trigger/start timestamp
      // application logic goes here
      ()
  }
  .start()
  .awaitTermination()
{code}
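As context, the closest existing workaround seems to be correlating batch ids with trigger timestamps through a StreamingQueryListener, since StreamingQueryProgress carries both `batchId` and the trigger `timestamp`. Progress events only fire after a batch completes, though, so the timestamp is not available inside that same batch's foreachBatch call, which is why a direct argument would help. A rough sketch:
{code:java}
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Maps batchId -> trigger start time (ISO-8601 string), populated after each batch.
val batchStartTimes = new ConcurrentHashMap[Long, String]()

sparkSession.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    batchStartTimes.put(event.progress.batchId, event.progress.timestamp)
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})
{code}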
 

Implementation approach, where batchTime is the trigger executor's execution time:

(`currentTriggerStartTimestamp` could be used as the batch time as well. The trigger executor time is the source of the microbatch, and it can also easily be added to the query progress event.)
 # Add the trigger time to [TriggerExecutor|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala#L31]
 
{code:java}
trait TriggerExecutor {
  // batchTime (Long) added as a second argument to the batch runner
  def execute(batchRunner: (MicroBatchExecutionContext, Long) => Boolean): Unit

  ... // (other methods)
}
{code}

 # Update ProcessingTimeExecutor and the other executors to pass the trigger time; the corresponding call-site change is sketched after the code below.
{code:java}
override def execute(triggerHandler: (MicroBatchExecutionContext, Long) => Boolean): Unit = {
  while (true) {
    val triggerTimeMs = clock.getTimeMillis()
    val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)

    // Pass triggerTimeMs to runOneBatch, which invokes triggerHandler and is
    // used in the MicroBatchExecution#runActivatedStream method.
    val terminated = !runOneBatch(triggerHandler, triggerTimeMs)

    ...
  }
}
{code}
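The handler passed in from MicroBatchExecution#runActivatedStream would then accept the extra argument; roughly (a sketch only, assuming the two-argument handler above and the `isActive` flag from StreamExecution):
{code:java}
triggerExecutor.execute { (execCtx, batchTimeMs) =>
  // thread the trigger time into per-batch execution (see the next step)
  executeOneBatch(execCtx, batchTimeMs)
  isActive // keep looping while the query is active
}
{code}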
 

 # Add an executionTime (Long) argument to the MicroBatchExecution#executeOneBatch method [here|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L330]

 # Pass the execution time into [runBatch|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L380C11-L380C19] and [here|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L849], as sketched below.
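In other words, the new Long is threaded through the internal call chain; a sketch with abbreviated parameter lists (actual signatures per the links above):
{code:java}
// Sketch only; other parameters elided.
private def executeOneBatch(execCtx: MicroBatchExecutionContext, batchTimeMs: Long): Unit = {
  ...
  runBatch(sparkSessionForStream, batchTimeMs)
}

private def runBatch(sparkSessionToRunBatch: SparkSession, batchTimeMs: Long): Unit = {
  ...
  // eventually hands (ds, batchId, batchTimeMs) to the ForeachBatchSink
}
{code}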

 # Finally, add the following `foreachBatch` method to `DataStreamWriter`, update the existing `foreachBatch` methods for the new argument, and add the timestamp to the query progress event as well.
{code:java}
def foreachBatch(function: (Dataset[T], Long, Long) => Unit): DataStreamWriter[T] = {
  this.source = SOURCE_NAME_FOREACH_BATCH
  if (function == null) {
    throw new IllegalArgumentException("foreachBatch function cannot be null")
  }
  this.foreachBatchWriter = function
  this
}
{code}
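For the timestamp to actually reach the user function, the ForeachBatchSink side needs the same treatment. A rough sketch, assuming the sink otherwise keeps its current shape (the two-Long writer and the extra addBatch parameter are the proposed changes, not existing API):
{code:java}
class ForeachBatchSink[T](
    batchWriter: (Dataset[T], Long, Long) => Unit, // (batch, batchId, batchTime)
    encoder: ExpressionEncoder[T]) extends Sink {

  // proposed shape; Sink#addBatch itself only carries (batchId, data) today
  def addBatch(batchId: Long, batchTime: Long, data: DataFrame): Unit = {
    val ds = ... // re-wrap data as Dataset[T] using the encoder, as today
    batchWriter(ds, batchId, batchTime)
  }
}
{code}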

 


