JevonCowell opened a new pull request, #53558:
URL: https://github.com/apache/spark/pull/53558
### What changes were proposed in this pull request?
- Added a new streaming query event called `onQueryExecutionStart`. This
query event is published every time a streaming query is triggered.
- Added the methods needed to publish this new event to all
`StreamingQueryListener`s while preserving backward compatibility.
### Why are the changes needed?
Currently, `StreamingQueryListener`s do not notify users when a query is
triggered; they only provide updates when a query starts, progresses, becomes
idle, or terminates. An event emitted whenever a query is triggered closes that
gap: together with the existing events, it allows a `StreamingQueryListener` to
function as a "state machine" that tracks whether a query is currently executing.
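A minimal sketch of the state-machine idea, written without any Spark dependency so it can be run on its own. The class and method names (`QueryState`, `QueryActivityTracker`, `on_query_execution_start`, and so on) are hypothetical and are not part of this PR or of Spark. The sketch also assumes that a progress or idle event means the triggered batch is no longer executing.

```python
from enum import Enum


class QueryState(Enum):
    STARTED = "started"
    EXECUTING = "executing"
    WAITING = "waiting"
    TERMINATED = "terminated"


class QueryActivityTracker:
    """Hypothetical helper: tracks per-query state from listener callbacks."""

    def __init__(self):
        self._states = {}

    def on_query_started(self, query_id):
        self._states[query_id] = QueryState.STARTED

    def on_query_execution_start(self, query_id):
        # The proposed trigger-start event marks the query as busy.
        self._states[query_id] = QueryState.EXECUTING

    def on_query_progress(self, query_id):
        # Assumption: a progress event means the triggered batch has finished.
        self._states[query_id] = QueryState.WAITING

    def on_query_idle(self, query_id):
        self._states[query_id] = QueryState.WAITING

    def on_query_terminated(self, query_id):
        self._states[query_id] = QueryState.TERMINATED

    def is_any_query_executing(self):
        return any(s is QueryState.EXECUTING for s in self._states.values())
```

In a real listener, each `StreamingQueryListener` callback would forward the query id to the matching tracker method.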
In my use case, I plan to use this new event along with `QueryProgress` and
`QueryIdle` to develop a Maintenance Job Orchestration System. This system will
programmatically submit and execute Spark commands, determining the eligibility
of maintenance job executions based on the activity status of streaming queries.
Bare-bones example of a Maintenance Job Orchestration System:
```mermaid
sequenceDiagram
participant Scheduler as MaintenanceScheduler
participant Orchestrator as MaintenanceJobOrchestrator
participant Job as MaintenanceJob
participant BackgroundExecutor as BackgroundTaskExecutor
participant Listener as StreamingQueryListener
Orchestrator->>Listener: Register consumer to update query status
Scheduler->>Scheduler: scheduleJob(Job)
alt MaintenanceJob is null
Scheduler->>Scheduler: throw IllegalArgumentException
else MaintenanceJob is not enabled
Scheduler->>Scheduler: return
else
loop Submit to Orchestrator Queue on a schedule
Scheduler->>Orchestrator: enqueue(MaintenanceJobRequest)
end
end
loop Continuous Operation
BackgroundExecutor->>Orchestrator: attempt to run jobs in queue
Listener->>Orchestrator: isStreamingQueryActivelyRunning()
Orchestrator->>Orchestrator: checkIfSafeToRunJob()
alt A Spark job is currently executing
Orchestrator->>Orchestrator: return
else No Spark jobs are currently executing
Orchestrator->>BackgroundExecutor: submit(job.execute(sparkSession))
BackgroundExecutor->>Job: execute()
Job->>Orchestrator: return result
Orchestrator->>Orchestrator: log job
end
end
Listener->>Listener: shutdown()
BackgroundExecutor->>BackgroundExecutor: shutdown()
Orchestrator->>Orchestrator: shutdown()
Scheduler->>Scheduler: shutdown()
```
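The "Continuous Operation" loop in the diagram could be sketched roughly as follows. This is a simplified, single-threaded illustration and not the actual system: the class shape, the `run_next_if_safe` method, and the use of zero-argument callables in place of `job.execute(sparkSession)` are all assumptions made for the example.

```python
from collections import deque


class MaintenanceJobOrchestrator:
    """Hypothetical sketch: runs queued jobs only while no query is executing."""

    def __init__(self, is_streaming_query_running):
        # Callable that reports whether any streaming query is mid-trigger,
        # e.g. backed by state kept up to date by a StreamingQueryListener.
        self._is_running = is_streaming_query_running
        self._queue = deque()
        self.log = []

    def enqueue(self, job):
        if job is None:
            raise ValueError("maintenance job must not be None")
        self._queue.append(job)

    def run_next_if_safe(self):
        # Skip this attempt if there is nothing queued or a query is busy.
        if not self._queue or self._is_running():
            return False
        job = self._queue.popleft()
        # Stand-in for job.execute(sparkSession); the result is logged.
        self.log.append(job())
        return True
```

A background executor would call `run_next_if_safe()` repeatedly; a job that is skipped stays in the queue until the next attempt.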
I envision several other valuable use cases, such as enhanced metrics,
improved alerting capabilities, and potential optimizations within Spark itself.
Initially, I considered introducing a `QueryAwaitEvent` to complement the
`onQueryExecutionStart`. This could provide a cleaner approach, avoiding the
need to manage both `onQueryProgress` and `onQueryIdle`. However, I believe
addressing the current gap with `onQueryExecutionStart` alone is more effective.
### Does this PR introduce _any_ user-facing change?
Yes, but only for users who opt in. I tried my best to match how `QueryIdle`
was added in order to maintain backward compatibility, and
`onQueryExecutionStart` emission is disabled by default. Users have to opt into
this feature through Spark configuration. Existing implementations should
**not** break as a result of this.
Users should do the following to opt-in:
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("ExampleApp")
  .config("spark.sql.streaming.query.trigger.start.event.enabled", true)
  .config("spark.sql.streaming.query.trigger.start.event.minInterval", 2000)
  .getOrCreate()
```
To use the new method, users only need to override it in their listener:
Scala:
```scala
/**
 * Called when a query's microbatch trigger is started.
 * @since 4.2.0
 */
def onQueryTriggerStart(event: onQueryExecutionStart): Unit = {
  println(s"Query trigger started: ${event.id}")
}
```
Java:
```java
@Override
public void onQueryTriggerStart(onQueryExecutionStart event) {
    System.out.println("Query trigger started: " + event.getId());
}
```
Python:
```python
def onQueryTriggerStart(self, event):
    print(f"Query trigger started: {event.id}")
```
[] TODO
### How was this patch tested?
[] IN-PROGRESS
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Haiku 4.5
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]