codeant-ai-for-open-source[bot] commented on code in PR #40866:
URL: https://github.com/apache/superset/pull/40866#discussion_r3375514675


##########
superset-websocket/src/index.ts:
##########
@@ -264,19 +264,31 @@ export const subscribeToGlobalStream = async (
 };
 
 /**
- * Callback function to process events received from a Redis Stream
+ * Callback function to process events received from a Redis Stream.
+ *
+ * For large batches the loop periodically yields to the Node.js event loop
+ * (via setImmediate) so that connection management, health checks and
+ * ping/pong handling are not starved while a burst of events is processed.
+ * The yield cadence is controlled by `eventYieldBatchSize` (0 disables it).
  */
-export const processStreamResults = (results: StreamResult[]): void => {
+export const processStreamResults = async (
+  results: StreamResult[],
+): Promise<void> => {
   logger.debug(`events received: ${results}`);
-  results.forEach(item => {
+  const { eventYieldBatchSize } = opts;
+  for (let i = 0; i < results.length; i += 1) {
+    if (eventYieldBatchSize > 0 && i > 0 && i % eventYieldBatchSize === 0) {
+      await new Promise(resolve => setImmediate(resolve));

Review Comment:
   **Suggestion:** `processStreamResults` is now async and yields mid-batch,
   but stream consumers still invoke listeners in fire-and-forget mode. Under
   backlog, multiple batches can therefore run concurrently and interleave
   event delivery, which breaks the previous in-order batch processing and can
   cause out-of-order websocket sends across batches. Make the listener
   contract async-aware and await listener completion in the stream consumers
   before advancing to the next batch. [api mismatch]
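
A minimal sketch of the suggested fix, not the actual patch: the consumer awaits the listener before moving to the next batch. The names `AsyncListener`, `consumeBatches`, `makeListener` and `demoAwaited` are hypothetical, the `StreamResult` shape is simplified, an in-memory array of batches replaces the `redis.xread` loop, and `setTimeout(..., 0)` stands in for the `setImmediate` yield.

```typescript
// Simplified, hypothetical shape; the real StreamResult is defined in index.ts.
type StreamResult = [string, string[]];

// Hypothetical async-aware listener contract.
type AsyncListener = (results: StreamResult[]) => void | Promise<void>;

// Stand-in for the event-loop yield (the PR uses setImmediate).
const yieldToEventLoop = (): Promise<void> =>
  new Promise<void>(resolve => setTimeout(resolve, 0));

// Hypothetical consumer: `batches` replaces successive redis.xread results.
async function consumeBatches(
  batches: StreamResult[][],
  listener: AsyncListener,
): Promise<void> {
  for (const batch of batches) {
    // Awaiting keeps batch N+1 from starting before batch N has finished.
    await listener(batch);
  }
}

// Listener that yields every `yieldEvery` items and records delivery order.
function makeListener(sent: string[], yieldEvery: number): AsyncListener {
  return async results => {
    for (let i = 0; i < results.length; i += 1) {
      if (yieldEvery > 0 && i > 0 && i % yieldEvery === 0) {
        await yieldToEventLoop();
      }
      sent.push(results[i][0]);
    }
  };
}

async function demoAwaited(): Promise<string[]> {
  const sent: string[] = [];
  const batches: StreamResult[][] = [
    [["1-0", []], ["1-1", []], ["1-2", []], ["1-3", []]],
    [["2-0", []], ["2-1", []]],
  ];
  await consumeBatches(batches, makeListener(sent, 2));
  return sent;
}
```

With the `await` in place, `demoAwaited()` resolves to the IDs in stream order even though the first batch yields partway through. The trade-off is that the next read does not start until the current batch has been fully delivered.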
   
   <details>
   <summary><b>Severity Level:</b> Major ⚠️</summary>
   
   ```mdx
   - ⚠️ Global event stream websocket clients may see out-of-order updates.
   - ⚠️ Job progress UIs relying on ordering may misbehave.
   ```
   </details>
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. Configure the websocket sidecar to process large Redis stream batches
   with intra-batch yielding, e.g. set `REDIS_STREAM_READ_COUNT=1000` and
   `EVENT_YIELD_BATCH_SIZE=100` so `redisStreamReadCount` (config in
   `superset-websocket/src/config.ts:24–26`) is greater than
   `eventYieldBatchSize` (config in `superset-websocket/src/config.ts:35–37`).
   
   2. Start the websocket sidecar so that
   `subscribeToGlobalStream(GLOBAL_EVENT_STREAM_NAME, processStreamResults)` is
   invoked at `superset-websocket/src/index.ts:539`, establishing a continuous
   reader on the global Redis stream `GLOBAL_EVENT_STREAM_NAME` defined at
   `superset-websocket/src/index.ts:150`.
   
   3. Under backlog, Redis returns a large batch from `redis.xread` in
   `subscribeToGlobalStream` (`superset-websocket/src/index.ts:233–239`), and
   the loop calls `listener(results as StreamResult[])` at line 38 (the
   `listener` parameter typed as
   `ListenerFunction = (results: StreamResult[]) => void` at
   `superset-websocket/src/index.ts:8`) with `processStreamResults` as the
   listener, without awaiting the returned `Promise<void>`.
   
   4. In the first invocation of `processStreamResults`
   (`superset-websocket/src/index.ts:55–72`), the function synchronously
   processes events up to the first
   `await new Promise(resolve => setImmediate(resolve));` at line 61 (after
   `eventYieldBatchSize` items) and only then yields; by this point
   `subscribeToGlobalStream` has already updated `lastFirehoseId` at line 39
   and re-entered its `while (true)` loop, so as soon as Redis has more data a
   second `xread` returns a newer batch and a second `processStreamResults`
   invocation begins sending events via `sendToChannel(...)` at line 68 before
   the first invocation has finished its remaining items, causing out-of-order
   websocket message delivery across batches for clients on the affected
   channels.
   ```
   </details>
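
The interleaving described in step 4 can be reproduced in isolation. This is a hedged sketch rather than the sidecar code: event IDs are plain strings, the `sent` array replaces `sendToChannel(...)`, the two batches are hard-coded instead of coming from `redis.xread`, `setTimeout(..., 0)` stands in for the `setImmediate` yield, and `demoFireAndForget` is a hypothetical name.

```typescript
// setTimeout(0) stands in for the setImmediate yield used in the PR.
const yieldToEventLoop = (): Promise<void> =>
  new Promise<void>(resolve => setTimeout(resolve, 0));

async function demoFireAndForget(): Promise<string[]> {
  const sent: string[] = []; // stand-in for websocket sends
  const yieldEvery = 2; // plays the role of eventYieldBatchSize

  // Async listener that yields mid-batch, like the new processStreamResults.
  const listener = async (results: string[]): Promise<void> => {
    for (let i = 0; i < results.length; i += 1) {
      if (i > 0 && i % yieldEvery === 0) {
        await yieldToEventLoop();
      }
      sent.push(results[i]);
    }
  };

  const batches = [
    ["a0", "a1", "a2", "a3"],
    ["b0", "b1", "b2", "b3"],
  ];
  const pending: Promise<void>[] = [];
  for (const batch of batches) {
    // Not awaited: the next batch starts as soon as this one yields.
    pending.push(listener(batch));
  }
  // Collected only so the demo can inspect the final order.
  await Promise.all(pending);
  return sent;
}
```

Here `demoFireAndForget()` resolves to `a0, a1, b0, b1, a2, a3, b2, b3`: the second batch starts sending while the first is parked at its yield point, which is the cross-batch reordering the comment describes.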
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


