LuciferYang opened a new pull request, #56742:
URL: https://github.com/apache/spark/pull/56742

   ### What changes were proposed in this pull request?
   
   `PipelineEventSender` streams pipeline events to a Connect client through a single background
   thread backed by a bounded queue (sized by the event-queue capacity conf). When that queue is
   full it intentionally drops non-terminal `FlowProgress` and other non-`RunProgress` events to
   avoid blocking execution; terminal `FlowProgress` events and all `RunProgress` events are always
   enqueued. Until now those drops were completely silent.
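
The routing rule described above can be sketched roughly as follows. This is an illustration only, not the code in this PR: the `Event` hierarchy, `EventBuffer`, and `offer` are hypothetical names, and using an unbounded queue with an explicit capacity check is an assumption about how "always enqueued" could be achieved.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical event model for illustration; the real sender works on Spark's own event types.
sealed trait Event
final case class FlowProgress(flow: String, terminal: Boolean) extends Event
final case class RunProgress(state: String) extends Event
final case class Other(message: String) extends Event

final class EventBuffer(capacity: Int) {
  // Assumption: the queue itself is unbounded and the capacity is enforced by hand,
  // so that must-deliver events can still be accepted when the buffer is "full".
  private val queue = new LinkedBlockingQueue[Event]()

  // Terminal FlowProgress events and all RunProgress events bypass the capacity check.
  private def mustDeliver(event: Event): Boolean = event match {
    case _: RunProgress => true
    case FlowProgress(_, terminal) => terminal
    case _ => false
  }

  /** Returns true if the event was enqueued, false if it was dropped. */
  def offer(event: Event): Boolean = synchronized {
    if (mustDeliver(event) || queue.size() < capacity) {
      queue.add(event)
      true
    } else {
      false
    }
  }

  def size: Int = queue.size()
}
```

With a capacity of 1, a second non-terminal `FlowProgress` is rejected while a terminal `FlowProgress` or any `RunProgress` is still accepted.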
   
   This PR makes them observable:
   
   - a running count of dropped events, exposed as `numDroppedEvents`;
   - a warning logged when an event is dropped, throttled to at most once per minute (the first drop
     always logs) so a persistently full queue does not flood the logs;
   - a summary warning at shutdown reporting the total, so drops that were suppressed by the throttle
     window are still surfaced.
   
   The throttling follows the approach `AsyncEventQueue` already uses for the same "queue full, drop
   events" situation.
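
A minimal sketch of the three behaviours listed above (running counter, throttled per-drop warning, shutdown summary) might look like this. It is not the PR's implementation: `DropTracker`, `onDrop`, `onShutdown`, the injected `clock` and `warn` parameters, and the message text are all hypothetical, and skipping the summary when nothing was dropped is an assumption. Only the name `numDroppedEvents` comes from the description.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper; clock and warn are injected so the throttle can be tested without sleeping.
final class DropTracker(throttleMs: Long, clock: () => Long, warn: String => Unit) {
  private val dropped = new AtomicLong(0L)
  // None until the first warning, so the first drop always logs.
  private var lastWarnMs: Option[Long] = None

  def numDroppedEvents: Long = dropped.get()

  /** Count the drop and warn unless a warning was already logged within the throttle window. */
  def onDrop(): Unit = {
    val total = dropped.incrementAndGet()
    val now = clock()
    val shouldWarn = synchronized {
      if (lastWarnMs.forall(now - _ >= throttleMs)) {
        lastWarnMs = Some(now)
        true
      } else {
        false
      }
    }
    if (shouldWarn) warn(s"Event queue is full; $total event(s) dropped so far.")
  }

  /** Report the total so drops suppressed by the throttle are still surfaced. */
  def onShutdown(): Unit = {
    val total = dropped.get()
    // Assumption: no summary is logged when nothing was dropped.
    if (total > 0) warn(s"Dropped $total event(s) in total.")
  }
}
```

Constructed with `throttleMs = 60000`, two drops inside the same minute produce one warning, a third drop after the interval produces a second, and `onShutdown()` adds the summary line.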
   
   ### Why are the changes needed?
   
   A dropped event is lost progress reporting to the client. Previously a drop produced no log line
   and no counter, so an operator who noticed gaps in pipeline progress had no signal that events were
   being discarded, let alone how many.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. The counter is internal; the only outward change is the additional WARN log lines emitted when
   events are dropped.
   
   ### How was this patch tested?
   
   New unit tests in `PipelineEventSenderSuite`:
   
   - a sender that never overflows its queue reports zero dropped events;
   - events dropped at capacity are counted (`numDroppedEvents`);
   - the per-drop warning is throttled (logged once for two in-window drops) and the shutdown summary
     is emitted;
   - the warning is re-logged once the throttle interval has elapsed.
   
   The drop scenarios use a `CountDownLatch` handshake to park the worker deterministically rather
   than relying on timing. The existing capacity test continues to cover the enqueue/drop routing.
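
For readers unfamiliar with the pattern, a latch handshake of this kind can be sketched as below. This is a generic illustration, not the code in `PipelineEventSenderSuite`: `queuedWhileParked` is a hypothetical helper, and a plain `ThreadPoolExecutor` stands in for the sender's worker.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

// Hypothetical helper: park a single worker thread, submit extra tasks behind it,
// and return how many of them are sitting in the queue while the worker is parked.
def queuedWhileParked(extraTasks: Int): Int = {
  val parked = new CountDownLatch(1)   // worker -> test: "I am inside the blocking task"
  val release = new CountDownLatch(1)  // test -> worker: "you may continue"
  val pool = new ThreadPoolExecutor(
    1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable]())
  try {
    pool.execute(() => {
      parked.countDown()
      release.await()
    })
    // Wait for the handshake instead of sleeping, so the test does not depend on timing.
    require(parked.await(10, TimeUnit.SECONDS), "worker did not start")
    // The only worker is parked, so every task submitted now stays in the queue.
    (1 to extraTasks).foreach(_ => pool.execute(() => ()))
    pool.getQueue.size()
  } finally {
    release.countDown()
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

Because the test thread proceeds only after `parked` has been counted down, the queue depth it observes is determined by the number of submissions rather than by thread scheduling.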
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Code (Claude Opus 4.8)
   


