codeant-ai-for-open-source[bot] commented on code in PR #40857:
URL: https://github.com/apache/superset/pull/40857#discussion_r3374976805
##########
superset-websocket/src/index.ts:
##########
@@ -194,6 +194,24 @@ export const sendToChannel = (channel: string, value: EventValue): void => {
channels[channel].sockets.forEach(socketId => {
const socketInstance: SocketInstance = sockets[socketId];
if (!socketInstance) return cleanChannel(channel);
+ // Backpressure: if a slow or stalled client has let its outbound buffer
+ // grow past the configured cap, terminate it rather than buffering
+ // unbounded data in server memory. Opt-in: a cap of 0 disables the check.
+ const { maxSocketBufferBytes } = opts;
+ if (
+ maxSocketBufferBytes > 0 &&
+ socketInstance.ws.bufferedAmount > maxSocketBufferBytes
+ ) {
+ statsd.increment('ws_client_backpressure_disconnect');
+ logger.warn(
+ `Terminating socket on channel ${channel}: send buffer ` +
+ `(${socketInstance.ws.bufferedAmount} bytes) exceeded the ` +
+ `configured limit (${maxSocketBufferBytes} bytes)`,
+ );
+ socketInstance.ws.terminate();
+ cleanChannel(channel);
+ return;
Review Comment:
**Suggestion:** The backpressure branch terminates the socket and cleans the
channel list, but it never removes the terminated socket from the global
`sockets` registry. Because this codebase only deletes from `sockets` during
periodic `checkSockets`, a burst of slow clients can leave many dead entries
resident for up to one ping interval, causing avoidable memory growth. Remove
the current `socketId` from `sockets` immediately when terminating for
backpressure. [missing cleanup]
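The suggested cleanup can be sketched as follows. This is a minimal, self-contained illustration and not the PR's code: the `MinimalSocket` interface, the simplified `sockets`/`channels` registries, and the `terminateForBackpressure` helper are hypothetical stand-ins, and the two-argument `sendToChannel` here models only the backpressure check.

```typescript
// Hypothetical, simplified stand-ins for the types and registries in index.ts.
interface MinimalSocket {
  bufferedAmount: number;
  terminate(): void;
}

interface SocketInstance {
  ws: MinimalSocket;
  channel: string;
}

const sockets: Record<string, SocketInstance> = {};
const channels: Record<string, { sockets: string[] }> = {};

// Hypothetical helper: terminate one socket and drop it from BOTH registries
// immediately, instead of leaving the `sockets` entry for the periodic sweep.
function terminateForBackpressure(channel: string, socketId: string): void {
  const instance = sockets[socketId];
  if (!instance) return;
  instance.ws.terminate();
  delete sockets[socketId];
  const entry = channels[channel];
  if (entry) {
    entry.sockets = entry.sockets.filter(id => id !== socketId);
  }
}

// Simplified sendToChannel: only the backpressure guard is modelled here.
function sendToChannel(channel: string, maxSocketBufferBytes: number): void {
  const entry = channels[channel];
  if (!entry) return;
  // Iterate over a copy because the helper reassigns entry.sockets.
  entry.sockets.slice().forEach(socketId => {
    const instance = sockets[socketId];
    if (!instance) return;
    if (
      maxSocketBufferBytes > 0 &&
      instance.ws.bufferedAmount > maxSocketBufferBytes
    ) {
      terminateForBackpressure(channel, socketId);
    }
  });
}
```

In the actual branch this would presumably amount to adding `delete sockets[socketId];` next to the existing `terminate()` call; whether other bookkeeping in `index.ts` also needs updating is not something this sketch can confirm.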
<details>
<summary><b>Severity Level:</b> Major ⚠️</summary>
```mdx
- ⚠️ Global `sockets` registry retains dead entries until next ping.
- ⚠️ Short-term memory usage spikes during bursts of slow clients.
- ⚠️ `checkSockets` must iterate over extra stale socket entries.
- ⚠️ Metrics/logs misreport socket count between terminations and GC.
```
</details>
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. Configure a positive backpressure cap (e.g. `MAX_SOCKET_BUFFER_BYTES=1048576`) so that
   `opts.maxSocketBufferBytes > 0` in `superset-websocket/src/index.ts:80` and the guard in
   `sendToChannel` at lines 200-203 becomes active. `MAX_SOCKET_BUFFER_BYTES` is wired via
   `applyEnvOverrides` in `superset-websocket/src/config.ts:108-138`, with its setter at
   lines 127-128.

2. Establish a WebSocket connection so that `wsConnection` in
   `superset-websocket/src/index.ts:84-117` calls `trackClient(channel, socketInstance)` at
   lines 86-90. `trackClient` at lines 165-181 stores the connection in the global `sockets`
   registry (`sockets[socketId] = socketInstance;` at line 172) and records the `socketId`
   in `channels[channel].sockets`.

3. Drive load so that the server publishes many events for that channel (via Redis),
   causing `subscribeToGlobalStream` at `superset-websocket/src/index.ts:251-260` to feed
   events into `processStreamResults` at lines 28-38, which invokes
   `sendToChannel(data.channel_id, { id, ...data });` at line 34. For a sufficiently slow
   client, `socketInstance.ws.bufferedAmount` will eventually exceed
   `opts.maxSocketBufferBytes`, triggering the backpressure branch in `sendToChannel` at
   lines 201-204.

4. In that backpressure branch, `sendToChannel` executes `socketInstance.ws.terminate();`
   at `superset-websocket/src/index.ts:211` and `cleanChannel(channel);` at line 212, but
   never deletes `sockets[socketId]`. `cleanChannel` at lines 207-222 only filters
   `channels[channel].sockets` and does not touch the global `sockets` map. The only place
   that removes entries from `sockets` is `checkSockets` at lines 175-200
   (`delete sockets[socketId];` at line 196), invoked periodically from
   `setInterval(checkSockets, opts.pingSocketsIntervalMs);` at lines 240-241. A burst of
   backpressure-terminated clients therefore leaves many dead `sockets` entries (each still
   referencing its `WebSocket` instance) resident until the next ping interval completes
   the cleanup.
```
</details>
##########
superset-websocket/src/config.ts:
##########
@@ -120,6 +124,8 @@ function applyEnvOverrides(config: ConfigType): ConfigType {
(config.pingSocketsIntervalMs = toNumber(val)),
GC_CHANNELS_INTERVAL_MS: val =>
(config.gcChannelsIntervalMs = toNumber(val)),
+ MAX_SOCKET_BUFFER_BYTES: val =>
+ (config.maxSocketBufferBytes = toNumber(val)),
Review Comment:
**Suggestion:** `MAX_SOCKET_BUFFER_BYTES` is parsed with `Number()` and
assigned directly without validation, so a non-numeric env value becomes `NaN`.
In `sendToChannel`, `maxSocketBufferBytes > 0` then evaluates false, silently
disabling backpressure protection instead of enforcing the configured cap.
Validate this env var as a finite non-negative number and reject or log invalid
values. [logic error]
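One way to apply this validation is sketched below. It is an illustration under assumptions rather than the project's code: `parseNonNegativeNumber` and its `warn` callback are hypothetical names, and falling back to a default (instead of aborting startup) is just one of the two options the comment mentions.

```typescript
// Hypothetical helper, not part of the PR: parse an env value as a finite,
// non-negative number, and fall back (with a warning) when it is invalid.
function parseNonNegativeNumber(
  name: string,
  raw: string,
  fallback: number,
  warn: (message: string) => void,
): number {
  const parsed = Number(raw);
  // Number('') is 0 and Number('abc') is NaN, so reject empty input explicitly
  // and use isFinite to catch both NaN and +/-Infinity.
  if (raw.trim() === '' || !isFinite(parsed) || parsed < 0) {
    warn(`Ignoring invalid ${name}=${JSON.stringify(raw)}; using ${fallback}`);
    return fallback;
  }
  return parsed;
}

// Why the unvalidated path is risky: every comparison with NaN is false,
// so a guard of the form `cap > 0 && buffered > cap` never fires.
const unvalidatedCap = Number('abc');
const guardWouldFire = unvalidatedCap > 0 && 5000000 > unvalidatedCap;
```

A setter along the lines of `val => (config.maxSocketBufferBytes = parseNonNegativeNumber('MAX_SOCKET_BUFFER_BYTES', val, 0, msg => console.warn(msg)))` would then log the misconfiguration instead of silently storing `NaN`; the choice of `0` (check disabled) as the fallback is an assumption taken from the opt-in behaviour described in the diff.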
<details>
<summary><b>Severity Level:</b> Major ⚠️</summary>
```mdx
- ⚠️ Backpressure feature silently disabled on misconfigured deployments.
- ⚠️ Slow clients can buffer unbounded data despite configured cap.
- ⚠️ Operators get no logging about invalid MAX_SOCKET_BUFFER_BYTES.
- ⚠️ Websocket sidecar `sendToChannel` ignores intended safety guard.
```
</details>
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. Set an invalid non-numeric value for the backpressure cap before starting the websocket
   sidecar, e.g. `MAX_SOCKET_BUFFER_BYTES=abc` in the environment used to launch the process
   (config mapping lives in `superset-websocket/src/config.ts:108-138`).

2. On process startup, `buildConfig()` in `superset-websocket/src/config.ts:150-152` calls
   `defaultConfig()` and then `applyEnvOverrides(config)`, which looks up
   `process.env.MAX_SOCKET_BUFFER_BYTES` and invokes the setter
   `MAX_SOCKET_BUFFER_BYTES: val => (config.maxSocketBufferBytes = toNumber(val))` at
   `superset-websocket/src/config.ts:127-128`.

3. Because `toNumber` is just `const toNumber = Number;` at
   `superset-websocket/src/config.ts:104`, `Number('abc')` evaluates to `NaN`, so
   `config.maxSocketBufferBytes` (and thus `opts.maxSocketBufferBytes`) becomes `NaN` with
   no validation or logging. `opts` is exported from `superset-websocket/src/index.ts:80`
   as `export const opts = buildConfig();`.

4. When events are processed from Redis in `processStreamResults()` at
   `superset-websocket/src/index.ts:28-38`, it calls
   `sendToChannel(data.channel_id, { id, ...data });` at line 34. Inside `sendToChannel` at
   `superset-websocket/src/index.ts:188-223`, the body of the backpressure guard
   `if (maxSocketBufferBytes > 0 && socketInstance.ws.bufferedAmount > maxSocketBufferBytes)`
   at lines 201-203 never executes, because `maxSocketBufferBytes` is `NaN` and `NaN > 0`
   is always false. Backpressure is silently disabled even though
   `MAX_SOCKET_BUFFER_BYTES` was configured.
```
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]