codeant-ai-for-open-source[bot] commented on code in PR #40866:
URL: https://github.com/apache/superset/pull/40866#discussion_r3375514675
##########
superset-websocket/src/index.ts:
##########
@@ -264,19 +264,31 @@ export const subscribeToGlobalStream = async (
};
/**
- * Callback function to process events received from a Redis Stream
+ * Callback function to process events received from a Redis Stream.
+ *
+ * For large batches the loop periodically yields to the Node.js event loop
+ * (via setImmediate) so that connection management, health checks and
+ * ping/pong handling are not starved while a burst of events is processed.
+ * The yield cadence is controlled by `eventYieldBatchSize` (0 disables it).
*/
-export const processStreamResults = (results: StreamResult[]): void => {
+export const processStreamResults = async (
+ results: StreamResult[],
+): Promise<void> => {
logger.debug(`events received: ${results}`);
- results.forEach(item => {
+ const { eventYieldBatchSize } = opts;
+ for (let i = 0; i < results.length; i += 1) {
+ if (eventYieldBatchSize > 0 && i > 0 && i % eventYieldBatchSize === 0) {
+ await new Promise(resolve => setImmediate(resolve));
Review Comment:
**Suggestion:** `processStreamResults` was changed to async and now yields
mid-batch, but existing listeners are still invoked in fire-and-forget mode, so
multiple batches can run concurrently and interleave event delivery under
backlog. This breaks the previous in-order batch processing behavior and can
cause out-of-order websocket sends across batches. Make listener contracts
async-aware and await listener completion in stream consumers before advancing
to the next batch. [api mismatch]
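Below is a minimal sketch of the suggested contract change. It is not the sidecar's actual code. It uses simplified stand-ins, all hypothetical: the `StreamResult` shape, the `ReadBatch` reader (which plays the role of `redis.xread`), `consumeStream`, and the `maxBatches` bound. The listener type is widened to allow a returned Promise, and the consumer awaits that Promise before it advances to the next read.

```typescript
// Simplified, hypothetical shape of a stream entry: [entry id, field list].
type StreamResult = [id: string, fields: string[]];

// Async-aware listener contract: a listener may return a Promise, which the
// consumer awaits before it reads the next batch.
type ListenerFunction = (results: StreamResult[]) => void | Promise<void>;

// Hypothetical reader standing in for redis.xread; resolves to null when no
// entries are available.
type ReadBatch = (lastId: string) => Promise<StreamResult[] | null>;

const consumeStream = async (
  readBatch: ReadBatch,
  listener: ListenerFunction,
  maxBatches: number, // illustration only; the sidecar loops with while (true)
): Promise<void> => {
  // '$' follows XREAD semantics: only entries added after the first read.
  let lastId = '$';
  for (let n = 0; n < maxBatches; n += 1) {
    const results = await readBatch(lastId);
    if (!results || results.length === 0) {
      continue;
    }
    lastId = results[results.length - 1][0];
    // Awaiting serializes batches: the next read starts only after every
    // event in this batch has been delivered, including any setImmediate
    // yields inside the listener.
    await listener(results);
  }
};
```

The `setImmediate` yields inside a batch still let ping/pong and connection handling run. No new batch is read until the current one is fully delivered, so read throughput is bounded by delivery time, which acts as back-pressure.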
<details>
<summary><b>Severity Level:</b> Major ⚠️</summary>
```mdx
- ⚠️ Global event stream websocket clients may see out-of-order updates.
- ⚠️ Job progress UIs relying on ordering may misbehave.
```
</details>
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. Configure the websocket sidecar to process large Redis stream batches with
intra-batch yielding, e.g. set `REDIS_STREAM_READ_COUNT=1000` and
`EVENT_YIELD_BATCH_SIZE=100` so that `redisStreamReadCount` (config in
`superset-websocket/src/config.ts:24–26`) is greater than `eventYieldBatchSize`
(config in `superset-websocket/src/config.ts:35–37`).
2. Start the websocket sidecar so that
`subscribeToGlobalStream(GLOBAL_EVENT_STREAM_NAME, processStreamResults)` is
invoked at `superset-websocket/src/index.ts:539`. This establishes a continuous
reader on the global Redis stream `GLOBAL_EVENT_STREAM_NAME`, defined at
`superset-websocket/src/index.ts:150`.
3. Under backlog, Redis returns a large batch from `redis.xread` in
`subscribeToGlobalStream` (`superset-websocket/src/index.ts:233–239`). The loop
then calls `listener(results as StreamResult[])` at line 38, with
`processStreamResults` as the listener, without awaiting the returned
`Promise<void>`. The `listener` parameter is typed as
`ListenerFunction = (results: StreamResult[]) => void` at
`superset-websocket/src/index.ts:8`.
4. The first invocation of `processStreamResults`
(`superset-websocket/src/index.ts:55–72`) synchronously processes events up to
the first `await new Promise(resolve => setImmediate(resolve));` at line 61
(after `eventYieldBatchSize` items) and only then yields. By this point
`subscribeToGlobalStream` has already updated `lastFirehoseId` at line 39 and
re-entered its `while (true)` loop. As soon as Redis has more data, a second
`xread` returns a newer batch. A second `processStreamResults` invocation then
begins sending events via `sendToChannel(...)` at line 68 before the first
invocation has finished its remaining items. Clients on the affected channels
therefore receive websocket messages out of order across batches.
```
</details>
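The interleaving described in step 4 can be reproduced in isolation with a small simulation. This is a sketch under stated assumptions. `processBatch` mirrors only the yield logic from the diff, not the real `processStreamResults`. `await Promise.resolve()` is a hypothetical stand-in for the `redis.xread` round trip between batches. The reported scenario assumes that the next read completes while the previous batch is still yielding.

```typescript
// Simplified event record; the real sidecar works on Redis stream entries.
type StreamEvent = { batch: number; index: number };

// Same yield primitive as the diff: resume in a later event-loop check phase.
const yieldToEventLoop = (): Promise<void> =>
  new Promise<void>(resolve => setImmediate(() => resolve()));

// Mirrors only the yield logic of the PR's processStreamResults: "send" events
// in order, yielding every `yieldEvery` items.
const processBatch = async (
  events: StreamEvent[],
  yieldEvery: number,
  sent: string[],
): Promise<void> => {
  for (let i = 0; i < events.length; i += 1) {
    if (yieldEvery > 0 && i > 0 && i % yieldEvery === 0) {
      await yieldToEventLoop();
    }
    sent.push(`b${events[i].batch}:e${events[i].index}`);
  }
};

const makeBatch = (batch: number, size: number): StreamEvent[] =>
  Array.from({ length: size }, (_, index) => ({ batch, index }));

// Feeds two consecutive batches to processBatch. With awaitListener = false
// the returned promise is dropped, as in the current consumer loop.
const simulate = async (awaitListener: boolean): Promise<string[]> => {
  const sent: string[] = [];
  const pending: Promise<void>[] = [];
  for (const batch of [makeBatch(1, 4), makeBatch(2, 4)]) {
    // Hypothetical stand-in for awaiting the next redis.xread result.
    await Promise.resolve();
    const delivery = processBatch(batch, 2, sent);
    if (awaitListener) {
      await delivery;
    } else {
      pending.push(delivery);
    }
  }
  await Promise.all(pending);
  return sent;
};

simulate(false).then(order => console.log(order.join(' ')));
// prints: b1:e0 b1:e1 b2:e0 b2:e1 b1:e2 b1:e3 b2:e2 b2:e3
```

With `awaitListener = false`, batch 2's first events are sent before batch 1 has finished. With `awaitListener = true`, both batches are delivered strictly in order.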
<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>
```mdx
This is a comment left during a code review.
**Path:** superset-websocket/src/index.ts
**Line:** 274:281
**Comment:**
*Api Mismatch: `processStreamResults` was changed to async and now
yields mid-batch, but existing listeners are still invoked in fire-and-forget
mode, so multiple batches can run concurrently and interleave event delivery
under backlog. This breaks the previous in-order batch processing behavior and
can cause out-of-order websocket sends across batches. Make listener contracts
async-aware and await listener completion in stream consumers before advancing
to the next batch.
Validate the correctness of the flagged issue. If it is correct, how can I
resolve it? If you propose a fix, implement it and keep it concise.
Once the fix is implemented, also check the other comments on the same PR and
ask the user whether they want to fix the rest of the comments as well. If they
say yes, fetch all the comments, validate their correctness, and implement a
minimal fix.
```
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]