codeant-ai-for-open-source[bot] commented on code in PR #40857:
URL: https://github.com/apache/superset/pull/40857#discussion_r3374976805


##########
superset-websocket/src/index.ts:
##########
@@ -194,6 +194,24 @@ export const sendToChannel = (channel: string, value: EventValue): void => {
   channels[channel].sockets.forEach(socketId => {
     const socketInstance: SocketInstance = sockets[socketId];
     if (!socketInstance) return cleanChannel(channel);
+    // Backpressure: if a slow or stalled client has let its outbound buffer
+    // grow past the configured cap, terminate it rather than buffering
+    // unbounded data in server memory. Opt-in: a cap of 0 disables the check.
+    const { maxSocketBufferBytes } = opts;
+    if (
+      maxSocketBufferBytes > 0 &&
+      socketInstance.ws.bufferedAmount > maxSocketBufferBytes
+    ) {
+      statsd.increment('ws_client_backpressure_disconnect');
+      logger.warn(
+        `Terminating socket on channel ${channel}: send buffer ` +
+          `(${socketInstance.ws.bufferedAmount} bytes) exceeded the ` +
+          `configured limit (${maxSocketBufferBytes} bytes)`,
+      );
+      socketInstance.ws.terminate();
+      cleanChannel(channel);
+      return;

Review Comment:
   **Suggestion:** The backpressure branch terminates the socket and cleans the 
channel list, but it never removes the terminated socket from the global 
`sockets` registry. Because this codebase only deletes from `sockets` during 
periodic `checkSockets`, a burst of slow clients can leave many dead entries 
resident for up to one ping interval, causing avoidable memory growth. Remove 
the current `socketId` from `sockets` immediately when terminating for 
backpressure. [missing cleanup]
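Below is a minimal sketch of the suggested change. It is written against simplified stand-ins for the sidecar's state. The `SocketLike`/`SocketInstance` shapes and the `cleanChannel` body are assumptions for illustration, not the real definitions in `superset-websocket/src/index.ts`. The only substantive change is the `delete sockets[socketId];` line placed before `cleanChannel(channel)`.

```typescript
// Simplified stand-ins (assumption): only the fields this branch touches.
interface SocketLike {
  bufferedAmount: number;
  send(data: string): void;
  terminate(): void;
}

interface SocketInstance {
  ws: SocketLike;
}

const sockets: Record<string, SocketInstance> = {};
const channels: Record<string, { sockets: string[] }> = {};

// Simplified cleanChannel (assumption): keep only IDs still present in
// `sockets`, and drop the channel once it has no sockets left.
function cleanChannel(channel: string): void {
  const entry = channels[channel];
  if (!entry) return;
  entry.sockets = entry.sockets.filter(id => id in sockets);
  if (entry.sockets.length === 0) delete channels[channel];
}

function sendToChannel(
  channel: string,
  value: string,
  maxSocketBufferBytes: number,
): void {
  const entry = channels[channel];
  if (!entry) return;
  // Iterate over a snapshot so cleanup inside the loop cannot affect it.
  entry.sockets.slice().forEach(socketId => {
    const socketInstance = sockets[socketId];
    if (!socketInstance) return cleanChannel(channel);
    if (
      maxSocketBufferBytes > 0 &&
      socketInstance.ws.bufferedAmount > maxSocketBufferBytes
    ) {
      socketInstance.ws.terminate();
      // Suggested fix: drop the registry entry now instead of waiting
      // for the next periodic checkSockets() pass.
      delete sockets[socketId];
      cleanChannel(channel);
      return;
    }
    socketInstance.ws.send(value);
  });
}
```

One caveat is not modeled here. A terminated `ws` still emits its close event later. If the real close handling or `checkSockets` expects the entry to still exist, that path would need checking before adopting the change.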
   
   <details>
   <summary><b>Severity Level:</b> Major ⚠️</summary>
   
   ```mdx
   - ⚠️ Global `sockets` registry retains dead entries until next ping.
   - ⚠️ Short-term memory usage spikes during bursts of slow clients.
   - ⚠️ `checkSockets` must iterate over extra stale socket entries.
   - ⚠️ Metrics/logs misreport socket count between terminations and GC.
   ```
   </details>
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. Configure a positive backpressure cap (e.g. `MAX_SOCKET_BUFFER_BYTES=1048576`) so that
   `opts.maxSocketBufferBytes > 0` in `superset-websocket/src/index.ts:80` and
   `sendToChannel`'s guard at lines 200-203 becomes active; `MAX_SOCKET_BUFFER_BYTES` is
   wired via `applyEnvOverrides` in `superset-websocket/src/config.ts:108-138` with the
   `MAX_SOCKET_BUFFER_BYTES` setter at lines 127-128.
   
   2. Establish a WebSocket connection so that `wsConnection` in
   `superset-websocket/src/index.ts:84-117` calls `trackClient(channel, socketInstance)` at
   lines 86-90; `trackClient` at lines 165-181 stores the connection in the global `sockets`
   registry (`sockets[socketId] = socketInstance;` at line 172) and records the `socketId` in
   `channels[channel].sockets`.
   
   3. Drive load so that the server publishes many events for that channel (via Redis),
   causing `subscribeToGlobalStream` at `superset-websocket/src/index.ts:251-260` to feed
   events into `processStreamResults` at lines 28-38, which invokes
   `sendToChannel(data.channel_id, { id, ...data });` at line 34. For a sufficiently slow
   client, `socketInstance.ws.bufferedAmount` eventually exceeds
   `opts.maxSocketBufferBytes`, triggering the backpressure branch in `sendToChannel` at
   lines 201-204.
   
   4. In that backpressure branch, `sendToChannel` executes `socketInstance.ws.terminate();`
   at `superset-websocket/src/index.ts:211` and `cleanChannel(channel);` at line 212, but
   never deletes `sockets[socketId]`. `cleanChannel` at lines 207-222 only filters
   `channels[channel].sockets` and does not touch the global `sockets` map. The only place
   that removes entries from `sockets` is `checkSockets` at lines 175-200
   (`delete sockets[socketId];` at line 196), invoked periodically via
   `setInterval(checkSockets, opts.pingSocketsIntervalMs);` at lines 240-241. A burst of
   backpressure-terminated clients therefore leaves many dead `sockets` entries (each still
   referencing its `WebSocket` instance) resident until the next ping interval completes
   the cleanup.
   ```
   </details>
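The stale-entry window described in step 4 can be illustrated with a small model. This model is deliberately simplified and hypothetical. `track`, `backpressureTerminate` and `sweep` are illustrative stand-ins for `trackClient`, the PR's backpressure branch and `checkSockets`, and a `terminated` flag replaces the real liveness checks. It only shows that entries pile up in `sockets` until the periodic pass runs.

```typescript
// Hypothetical model of the two registries; not the sidecar's real code.
interface ModelSocket {
  terminated: boolean;
}

const sockets: Record<string, ModelSocket> = {};
const channels: Record<string, string[]> = {};

// Like trackClient: register globally and on the channel.
function track(channel: string, socketId: string): void {
  sockets[socketId] = { terminated: false };
  if (!channels[channel]) channels[channel] = [];
  channels[channel].push(socketId);
}

// Like the PR's backpressure branch: terminate and clean the channel list,
// but leave the global `sockets` entry in place.
function backpressureTerminate(channel: string, socketId: string): void {
  sockets[socketId].terminated = true;
  channels[channel] = channels[channel].filter(id => id !== socketId);
}

// Stand-in for checkSockets: the only place entries leave `sockets`.
function sweep(): void {
  for (const id of Object.keys(sockets)) {
    if (sockets[id].terminated) delete sockets[id];
  }
}

// A burst of 500 slow clients on one channel.
for (let i = 0; i < 500; i++) track('dashboard', `s${i}`);
for (let i = 0; i < 500; i++) backpressureTerminate('dashboard', `s${i}`);

console.log(channels['dashboard'].length); // 0
console.log(Object.keys(sockets).length); // 500 stale entries
sweep();
console.log(Object.keys(sockets).length); // 0
```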
   



##########
superset-websocket/src/config.ts:
##########
@@ -120,6 +124,8 @@ function applyEnvOverrides(config: ConfigType): ConfigType {
       (config.pingSocketsIntervalMs = toNumber(val)),
     GC_CHANNELS_INTERVAL_MS: val =>
       (config.gcChannelsIntervalMs = toNumber(val)),
+    MAX_SOCKET_BUFFER_BYTES: val =>
+      (config.maxSocketBufferBytes = toNumber(val)),

Review Comment:
   **Suggestion:** `MAX_SOCKET_BUFFER_BYTES` is parsed with `Number()` and 
assigned directly without validation, so a non-numeric env value becomes `NaN`. 
In `sendToChannel`, `maxSocketBufferBytes > 0` then evaluates false, silently 
disabling backpressure protection instead of enforcing the configured cap. 
Validate this env var as a finite non-negative number and reject or log invalid 
values. [logic error]
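Here is one way to act on this suggestion, sketched under several assumptions. `parseMaxSocketBufferBytes` is a hypothetical helper that is not part of `config.ts`. Invalid values fall back to a caller-supplied default. The warning goes to `console.warn` rather than the sidecar's logger. The helper also treats a blank value as invalid, because `Number('')` is `0` and would otherwise turn the check off without any message.

```typescript
// Hypothetical helper: parse MAX_SOCKET_BUFFER_BYTES as a finite,
// non-negative byte count. Invalid input is reported via `warn` and the
// caller-supplied fallback is returned instead of NaN or a negative cap.
function parseMaxSocketBufferBytes(
  raw: string,
  fallback: number,
  warn: (message: string) => void = console.warn,
): number {
  const trimmed = raw.trim();
  // Number('') and Number('   ') are 0, so treat blank input as invalid
  // rather than silently disabling the check.
  const parsed = trimmed === '' ? NaN : Number(trimmed);
  if (!Number.isFinite(parsed) || parsed < 0) {
    warn(
      `Ignoring invalid MAX_SOCKET_BUFFER_BYTES=${JSON.stringify(raw)}; ` +
        `keeping ${fallback}`,
    );
    return fallback;
  }
  return parsed;
}
```

The helper could then replace `toNumber` in the existing mapping. The example below assumes `config.maxSocketBufferBytes` already holds the default from `defaultConfig()`: `MAX_SOCKET_BUFFER_BYTES: val => (config.maxSocketBufferBytes = parseMaxSocketBufferBytes(val, config.maxSocketBufferBytes))`. Throwing instead of falling back would make a typo fail fast at startup, which may be preferable for a safety limit.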
   
   <details>
   <summary><b>Severity Level:</b> Major ⚠️</summary>
   
   ```mdx
   - ⚠️ Backpressure feature silently disabled on misconfigured deployments.
   - ⚠️ Slow clients can buffer unbounded data despite configured cap.
   - ⚠️ Operators get no logging about invalid MAX_SOCKET_BUFFER_BYTES.
   - ⚠️ Websocket sidecar `sendToChannel` ignores intended safety guard.
   ```
   </details>
   <details>
   <summary><b>Steps of Reproduction ✅ </b></summary>
   
   ```mdx
   1. Set an invalid non-numeric value for the backpressure cap before starting the websocket
   sidecar, e.g. `MAX_SOCKET_BUFFER_BYTES=abc` in the environment used to launch the process
   (config mapping lives in `superset-websocket/src/config.ts:108-138`).
   
   2. On process startup, `buildConfig()` in `superset-websocket/src/config.ts:150-152` calls
   `defaultConfig()` and then `applyEnvOverrides(config)`, which looks up
   `process.env.MAX_SOCKET_BUFFER_BYTES` and invokes the setter
   `MAX_SOCKET_BUFFER_BYTES: val => (config.maxSocketBufferBytes = toNumber(val))` at
   `superset-websocket/src/config.ts:127-128`.
   
   3. Because `toNumber` is just `const toNumber = Number;` at
   `superset-websocket/src/config.ts:104`, `Number('abc')` evaluates to `NaN`, so
   `config.maxSocketBufferBytes` (and thus `opts.maxSocketBufferBytes`) becomes `NaN` with no
   validation or logging; `opts` is exported from `superset-websocket/src/index.ts:80` as
   `export const opts = buildConfig();`.
   
   4. When events are processed from Redis in `processStreamResults()` at
   `superset-websocket/src/index.ts:28-38`, it calls
   `sendToChannel(data.channel_id, { id, ...data });` at line 34. Inside `sendToChannel` at
   `superset-websocket/src/index.ts:188-223`, the body of the backpressure guard
   `if (maxSocketBufferBytes > 0 && socketInstance.ws.bufferedAmount > maxSocketBufferBytes)`
   at lines 201-203 never runs, because `maxSocketBufferBytes` is `NaN` and `NaN > 0` is
   always false. Backpressure is silently disabled even though `MAX_SOCKET_BUFFER_BYTES` was
   configured.
   ```
   </details>
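Steps 3 and 4 depend on how `Number()` coerces strings. The snippet below reuses the same `const toNumber = Number;` alias and tests a few plausible env values against the `> 0` condition from `sendToChannel`. `guardEnabled` is an illustrative name, not a function in the codebase. Note that non-numeric input is not the only problem: a negative value also disables the guard without any warning.

```typescript
// Same alias as `const toNumber = Number;` in config.ts.
const toNumber = Number;

// The backpressure branch in sendToChannel can only fire when the cap is > 0.
const guardEnabled = (raw: string): boolean => toNumber(raw) > 0;

// [env value, is the guard enabled?]
const cases: Array<[string, boolean]> = [
  ['1048576', true], // 1048576
  ['1e6', true], // 1000000: exponent notation is accepted
  ['abc', false], // NaN, and NaN > 0 is false
  ['1MB', false], // NaN: unit suffixes are not understood
  ['-1', false], // -1: a negative cap also disables the guard
  ['', false], // 0: a blank value also reads as "disabled"
];

for (const [raw] of cases) {
  const state = guardEnabled(raw) ? 'on' : 'off';
  console.log(`${JSON.stringify(raw)} -> ${toNumber(raw)}, guard ${state}`);
}
```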
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
For additional commands, e-mail: [email protected]
