Hey Jungtaek!

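Wanted to update the mailing list on my current approach in case others want something similar.
I created an asynchronous poller that iterates through all active queries, checks whether any of them is triggering (currently defined as its last progress report showing a nonzero number of input rows), and notifies subscribers with the result.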

Here’s an example code snippet: 

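```java
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryProgress;

// Wrapper class name is illustrative; the original snippet only showed the two methods.
public class StreamingActivityPoller {

    // Assumed declaration (not shown in the original snippet): subscribers are held weakly so
    // that discarded callbacks get cleaned up, and the queue is concurrent because the poller
    // thread iterates and removes entries while other threads may be subscribing.
    private static final Queue<WeakReference<Consumer<Boolean>>> subscribers =
            new ConcurrentLinkedQueue<>();

    public static void checkAndUpdateSubscribers(SparkSession sparkSession) {
        StreamingQuery[] activeQueries = sparkSession.streams().active();

        boolean anyTriggering = false;

        for (StreamingQuery query : activeQueries) {
            if (query.isActive() && query.recentProgress().length > 0) {
                // lastProgress() describes the most recently completed micro-batch, so a query
                // counts as "triggering" if that batch read at least one input row.
                StreamingQueryProgress lastProgress = query.lastProgress();
                boolean isTriggering = lastProgress != null && lastProgress.numInputRows() > 0;

                if (isTriggering) {
                    anyTriggering = true;
                    break;
                }
            }
        }

        notifySubscribers(anyTriggering);
    }

    private static void notifySubscribers(boolean isTriggering) {
        Iterator<WeakReference<Consumer<Boolean>>> iterator = subscribers.iterator();

        while (iterator.hasNext()) {
            WeakReference<Consumer<Boolean>> weakRef = iterator.next();
            Consumer<Boolean> consumer = weakRef.get();

            if (consumer != null) {
                consumer.accept(isTriggering);
            } else {
                // The subscriber has been garbage-collected; drop the stale reference.
                iterator.remove();
            }
        }
    }
}
```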
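The snippet above doesn't show how the poller itself is scheduled. Here is a minimal sketch, assuming a plain JDK `ScheduledExecutorService` on a single daemon thread; `AsyncPoller` and `start` are hypothetical names, not part of Spark. The try/catch matters: per the `ScheduledExecutorService` docs, if a periodic task throws, all of its subsequent executions are suppressed, so one failed check would otherwise silently stop polling.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that runs a polling task on a single background daemon thread. */
final class AsyncPoller {
    private AsyncPoller() {}

    static ScheduledExecutorService start(Runnable pollTask, Duration period) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "streaming-activity-poller");
            thread.setDaemon(true); // polling alone should not keep the JVM alive
            return thread;
        });

        // With a fixed delay, the next poll is scheduled only after the previous one
        // finishes, so a slow check never overlaps with itself.
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                pollTask.run();
            } catch (RuntimeException e) {
                // Without this catch, the executor would suppress every future run.
                System.err.println("Streaming activity poll failed: " + e);
            }
        }, 0, period.toMillis(), TimeUnit.MILLISECONDS);

        return scheduler; // call shutdown() on it when the application stops
    }
}
```

It could be started with something like `AsyncPoller.start(() -> checkAndUpdateSubscribers(spark), Duration.ofSeconds(30))`, where `spark` is the application's `SparkSession` and the 30-second period is an arbitrary choice. One caveat on the check itself: `lastProgress()` describes the last completed micro-batch, so it can lag behind what the query is doing right now. `StreamingQuery.status().isTriggerActive()` is documented as true while a trigger is actively firing and false while the query waits for its next trigger, so it may be a more direct signal here.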

I have yet to look at the StreamingQueryListener implementation, but I'll take a stab at it.

Regards,
Jevon C

On Mar 27, 2025, at 6:04 PM, Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:


Hi Jevon,

> From testing, I see that `onQueryIdle` does not trigger when a query is waiting for the next trigger interval.

Yeah, it's based on the trigger: if no trigger has fired, the event cannot be sent.

> I wanted to get thoughts on whether it’s worth implementing a new QueryListener method (something like `onQueryWait`) that will report when a streaming query is awaiting a new trigger.

If what you describe as "onQueryWait" is not hard to implement and doesn't cause a performance regression, I think it is actually the ideal behavior for "onQueryIdle", and you are welcome to modify the criteria of onQueryIdle rather than introduce a new event. It just needs to coordinate with the current trigger and must not produce the idle event if a microbatch has started executing in the meantime. This is tricky (since we are now talking about threading), but if there is an easy way to make it work, that would be ideal.
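To make the threading concern concrete, here is a minimal, hypothetical sketch (not Spark's actual implementation; all names are illustrative) of how an idle signal could be suppressed once a micro-batch starts. It assumes the trigger thread calls `onBatchStart()` and `onBatchEnd()`, and that some other thread (for example a timer) calls `tryEmitIdle(...)` while the query waits for its next trigger. A batch counter (epoch) plus a single lock makes "check that no batch has started, then emit" atomic with respect to a batch starting.

```java
import java.util.function.LongConsumer;

/**
 * Hypothetical sketch: decides whether an "idle" event may be emitted while a query
 * waits for its next trigger, without racing a micro-batch that is about to start.
 */
final class IdleEventGate {
    private long batchEpoch = 0;          // incremented every time a micro-batch starts
    private boolean batchRunning = false;
    private boolean idleReported = false; // at most one idle event per wait

    /** Called by the trigger thread right before it starts a micro-batch. */
    synchronized void onBatchStart() {
        batchEpoch++;
        batchRunning = true;
        idleReported = false;
    }

    /** Called by the trigger thread when a micro-batch finishes; returns the epoch to wait on. */
    synchronized long onBatchEnd() {
        batchRunning = false;
        return batchEpoch;
    }

    /**
     * Called from another thread while the query waits. Emits only if no batch is running
     * and none has started since observedEpoch was captured. Because onBatchStart() needs
     * the same lock, a batch cannot begin between the check and the emit.
     */
    synchronized boolean tryEmitIdle(long observedEpoch, LongConsumer emitter) {
        if (batchRunning || idleReported || batchEpoch != observedEpoch) {
            return false;
        }
        idleReported = true;
        // Assumed to be a cheap, non-blocking post (e.g. onto an async event queue),
        // since it runs while holding the lock.
        emitter.accept(observedEpoch);
        return true;
    }
}
```

Holding the lock while emitting is only reasonable under that assumption; a slow callback there would delay the next batch from starting.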

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Mar 27, 2025 at 5:50 AM Jevon Cowell <jcow...@atlassian.com.invalid> wrote:
Hello!

This is my first time ever utilizing a mailing list, so I apologize if I’m not conforming to any standards or rules (and please correct me where obvious). I’m looking to inquire about Spark’s StreamingQueryListener.

I currently have a Spark Streaming job with a trigger interval of 10 minutes in a cluster. I want to periodically execute maintenance jobs (OPTIMIZE, DELETE, VACUUM) in the same cluster to save on compute resources. Ideally, I don’t want all of these jobs running concurrently or when the Spark Streaming job is processing data. I want to implement a `StreamingQueryListener` to detect when any streaming queries are running and delay the execution of the maintenance jobs. From testing, I see that `onQueryIdle` does not trigger when a query is waiting for the next trigger interval. Before diving into the Apache Spark code, I wanted to get thoughts on whether it’s worth implementing a new QueryListener method (something like `onQueryWait`) that will report when a streaming query is awaiting a new trigger.

Thoughts? Is this too naive?
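For the maintenance side described above, a minimal sketch of a subscriber that plugs into the `Consumer<Boolean>` callbacks from the poller earlier in the thread. It queues maintenance jobs (for example OPTIMIZE, DELETE, VACUUM, wrapped as `Runnable`s) and starts at most one at a time, only when the poller reports that no query is triggering. `MaintenanceGate` is a hypothetical name. It cannot pause a job that is already running if the stream starts a new batch; it only avoids starting new ones.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
 * Hypothetical subscriber: runs queued maintenance jobs one at a time, and only
 * when the poller reports that no streaming query is triggering.
 */
final class MaintenanceGate implements Consumer<Boolean> {
    private final Queue<Runnable> pending = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean jobRunning = new AtomicBoolean(false);
    private final Executor runner; // in practice, a thread pool separate from the poller thread

    MaintenanceGate(Executor runner) {
        this.runner = runner;
    }

    void enqueue(Runnable job) {
        pending.add(job);
    }

    @Override
    public void accept(Boolean anyTriggering) {
        if (Boolean.TRUE.equals(anyTriggering)) {
            return; // a streaming query is busy: defer maintenance
        }
        // Claim the single "running" slot first so two polls can never start two jobs.
        if (!jobRunning.compareAndSet(false, true)) {
            return;
        }
        Runnable job = pending.poll();
        if (job == null) {
            jobRunning.set(false); // nothing queued
            return;
        }
        try {
            runner.execute(() -> {
                try {
                    job.run();
                } finally {
                    jobRunning.set(false); // free the slot even if the job fails
                }
            });
        } catch (RuntimeException e) {
            jobRunning.set(false); // e.g. the executor was shut down
            throw e;
        }
    }
}
```

Because the poller holds subscribers through `WeakReference`, the application needs to keep a strong reference to the gate (for example in a field); otherwise it can be garbage-collected and silently dropped from `subscribers`.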
---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
