JevonCowell opened a new pull request, #53558:
URL: https://github.com/apache/spark/pull/53558

   ### What changes were proposed in this pull request?
   
   - Added a new streaming query event called `onQueryExecutionStart`. This 
query event is published every time a streaming query is triggered.
   - Added the necessary methods to publish this new event to all 
`StreamingQueryListener`s with backwards compatibility. 
   
   ### Why are the changes needed?
   Currently, `StreamingQueryListener`s does not notify users when a query is 
triggered; it only provides updates when a query starts, progresses, becomes 
idle, or terminates. By introducing an event that is emitted whenever a query 
is triggered, users will be informed of this occurrence. This new event allows 
`StreamingQueryListener`s to function as a "state machine." 
   
   In my use case, I plan to use this new event along with `QueryProgress` and 
`QueryIdle` to develop a Maintenance Job Orchestration System. This system will 
programmatically submit and execute Spark commands, determining the eligibility 
of maintenance job executions based on the activity status of streaming queries.
   
   Bare bone example of Maintenance Job Orchestration System:
   ```mermaid
   sequenceDiagram
       participant Scheduler as MaintenanceScheduler
       participant Orchestrator as MaintenanceJobOrchestrator
       participant Job as MaintenanceJob
       participant BackgroundExecutor as BackgroundTaskExecutor
       participant Listener as StreamingQueryListener
       Orchestrator->>Listener: Register consumer to update query status
       Scheduler->>Scheduler: scheduleJob(Job)
       alt MaintenanceJob is null
           Scheduler->>Scheduler: throw IllegalArgumentException
       else MaintenanceJob is not enabled
           Scheduler->>Scheduler: return
       else
           loop Submit to Orchestrator Queue on a schedule
               Scheduler->>Orchestrator: enqueue(MaintenanceJobRequest)
           end
       end
       loop Continuous Operation
           BackgroundExecutor->>Orchestrator: attempt to run jobs in queue
           Listener->>Orchestrator: isStreamingQueryActivelyRunning()
           Orchestrator->>Orchestrator: checkIfSafeToRunJob()
           alt A Spark job is currently executing
               Orchestrator->>Orchestrator: return
           else No Spark jobs are currently executing
               Orchestrator->>BackgroundExecutor: 
submit(job.execute(sparkSession))
               BackgroundExecutor->>Job: execute()
               Job->>Orchestrator: return result
               Orchestrator->>Orchestrator: log job
           end
       end
       Listener->>Listener: shutdown()
       BackgroundExecutor->>BackgroundExecutor: shutdown()
       Orchestrator->>Orchestrator: shutdown()
       Scheduler->>Scheduler: shutdown()
   ```
   
   I envision several other valuable use cases, such as enhanced metrics, 
improved alerting capabilities, and potential optimizations within Spark itself.
   
   Initially, I considered introducing a `QueryAwaitEvent` to complement the 
`onQueryExecutionStart`. This could provide a cleaner approach, avoiding the 
need to manage both `onQueryProgress` and `onQueryIdle`. However, I believe 
addressing the current gap with `onQueryExecutionStart` alone is more effective.
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes/No. I tried my best to match how `QueryIdle` was added to maintain 
backwards compatibility & `onQueryExecutionStart` egress is disabled by 
default. Users will have to use spark conf to opt into this feature. Existing 
implementations should **not** break as a result of this. 
   
   Users should do the following to opt-in:
   
   ```java
   import org.apache.spark.sql.SparkSession
   
   val spark = SparkSession.builder
     .appName("ExampleApp")
     .config("spark.sql.streaming.query.trigger.start.event.enabled", true)
     .config("spark.sql.streaming.query.trigger.start.event.minInterval", 2000)
     .getOrCreate()
   ``` 
   To utilize the new method they just need to override it like so:
   
   Scala:
   ```scala
   /**
    * Called when a query's microbatch trigger is started.
    * @since 4.2.0
    */
   def onQueryTriggerStart(event: onQueryExecutionStart): Unit = {
     println(s"Query trigger started: ${event.id}")
   }
   ```
   Java:
   ```java
   
   @Override
   public void onQueryTriggerStart(onQueryExecutionStart event) {
       System.out.println("Query trigger started: " + event.getId());
   }
   ```
   Python:
   ```python
   def onQueryTriggerStart(self, event):
       print(f"Query trigger started: {event.id}")
   ```
   
   <!--
   Note that it means *any* user-facing change including all aspects such as 
new features, bug fixes, or other behavior changes. Documentation-only updates 
are not considered user-facing changes.
   
   If yes, please clarify the previous behavior and the change this PR proposes 
- provide the console output, description and/or an example to show the 
behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to 
the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   [] TODO
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some 
test cases that check the changes thoroughly including negative and positive 
cases if possible.
   If it was tested in a way different from regular unit tests, please clarify 
how you tested step by step, ideally copy and paste-able, so that other 
reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why 
it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions 
for the consistent environment, and the instructions could accord to: 
https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   
   [] IN-PROGRESS
   
   ### Was this patch authored or co-authored using generative AI tooling?
   <!--
   If generative AI tooling has been used in the process of authoring this 
patch, please include the
   phrase: 'Generated-by: ' followed by the name of the tool and its version.
   If no, write 'No'.
   Please refer to the [ASF Generative Tooling 
Guidance](https://www.apache.org/legal/generative-tooling.html) for details.
   -->
   Generated-by: Claude Haiku 4.5


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to