Copilot commented on code in PR #40857:
URL: https://github.com/apache/superset/pull/40857#discussion_r3375211649


##########
superset-websocket/src/config.ts:
##########
@@ -120,6 +124,8 @@ function applyEnvOverrides(config: ConfigType): ConfigType {
       (config.pingSocketsIntervalMs = toNumber(val)),
     GC_CHANNELS_INTERVAL_MS: val =>
       (config.gcChannelsIntervalMs = toNumber(val)),
+    MAX_SOCKET_BUFFER_BYTES: val =>
+      (config.maxSocketBufferBytes = toNumber(val)),

Review Comment:
   `MAX_SOCKET_BUFFER_BYTES` is parsed with `toNumber(val)` but the result is 
not validated. If the env var is unset/malformed (NaN) or negative, the cap 
will silently behave as disabled due to the `> 0` check. Recommend clamping to 
`>= 0` and handling non-finite values explicitly (e.g., ignore override and 
keep default, and/or log a warning) to avoid surprising runtime behavior from 
misconfiguration.



##########
superset-websocket/src/index.ts:
##########
@@ -194,6 +194,24 @@ export const sendToChannel = (channel: string, value: 
EventValue): void => {
   channels[channel].sockets.forEach(socketId => {
     const socketInstance: SocketInstance = sockets[socketId];
     if (!socketInstance) return cleanChannel(channel);
+    // Backpressure: if a slow or stalled client has let its outbound buffer
+    // grow past the configured cap, terminate it rather than buffering
+    // unbounded data in server memory. Opt-in: a cap of 0 disables the check.
+    const { maxSocketBufferBytes } = opts;
+    if (
+      maxSocketBufferBytes > 0 &&
+      socketInstance.ws.bufferedAmount > maxSocketBufferBytes
+    ) {
+      statsd.increment('ws_client_backpressure_disconnect');
+      logger.warn(
+        `Terminating socket on channel ${channel}: send buffer ` +
+          `(${socketInstance.ws.bufferedAmount} bytes) exceeded the ` +
+          `configured limit (${maxSocketBufferBytes} bytes)`,
+      );
+      socketInstance.ws.terminate();
+      cleanChannel(channel);
+      return;
+    }

Review Comment:
   In this termination path, the channel is cleaned but the `sockets` registry 
entry for the current `socketId` is not removed here. That can leave stale 
entries in the server’s socket registry, which can accumulate over time with 
repeated backpressure disconnects. Consider explicitly deleting the socket’s 
entry from the `sockets` map (or centralizing this into a single 
‘disconnect/cleanup’ helper used by all close/error/terminate paths) so both 
channel membership and the global socket registry stay in sync.



##########
superset-websocket/spec/index.test.ts:
##########
@@ -269,6 +269,80 @@ describe('server', () => {
     });
   });
 
+  describe('backpressure', () => {
+    const fakeEvent = {
+      id: '1615426152415-0',
+      channel_id: channelId,
+      job_id: 'c9b99965-8f1e-4ce5-aa43-d6fc94d6a510',
+      status: 'done',
+    };
+
+    afterEach(() => {
+      server.opts.maxSocketBufferBytes = 0;
+    });
+
+    test('does not terminate when cap disabled (0)', () => {
+      server.opts.maxSocketBufferBytes = 0;
+      const ws = new wsMock('localhost');
+      // simulate a large outbound buffer
+      (ws as unknown as { bufferedAmount: number }).bufferedAmount = 
10_000_000;
+      const terminateMock = jest.spyOn(ws, 'terminate');
+      const sendMock = jest.spyOn(ws, 'send');

Review Comment:
   These tests create spies on `ws.terminate` / `ws.send` (and 
`server.cleanChannel` in one test), but only `cleanChannelMock` is restored. If 
the test runner isn’t configured with automatic mock restoration, this can leak 
spies across tests and cause order-dependent failures. Consider restoring all 
spies (or adding an `afterEach(() => jest.restoreAllMocks())` in this 
`describe('backpressure')` block) to keep test isolation reliable.



##########
superset-websocket/spec/index.test.ts:
##########
@@ -269,6 +269,80 @@ describe('server', () => {
     });
   });
 
+  describe('backpressure', () => {
+    const fakeEvent = {
+      id: '1615426152415-0',
+      channel_id: channelId,
+      job_id: 'c9b99965-8f1e-4ce5-aa43-d6fc94d6a510',
+      status: 'done',
+    };
+
+    afterEach(() => {
+      server.opts.maxSocketBufferBytes = 0;
+    });
+
+    test('does not terminate when cap disabled (0)', () => {
+      server.opts.maxSocketBufferBytes = 0;
+      const ws = new wsMock('localhost');
+      // simulate a large outbound buffer
+      (ws as unknown as { bufferedAmount: number }).bufferedAmount = 
10_000_000;
+      const terminateMock = jest.spyOn(ws, 'terminate');
+      const sendMock = jest.spyOn(ws, 'send');
+      server.trackClient(channelId, {
+        ws,
+        channel: channelId,
+        pongTs: Date.now(),
+      });
+
+      server.sendToChannel(channelId, fakeEvent);
+
+      expect(terminateMock).not.toHaveBeenCalled();
+      expect(sendMock).toHaveBeenCalled();
+    });
+
+    test('terminates a slow client whose buffer exceeds the cap', () => {
+      server.opts.maxSocketBufferBytes = 1024;
+      const ws = new wsMock('localhost');
+      (ws as unknown as { bufferedAmount: number }).bufferedAmount = 2048;
+      const terminateMock = jest.spyOn(ws, 'terminate');
+      const sendMock = jest.spyOn(ws, 'send');
+      const cleanChannelMock = jest.spyOn(server, 'cleanChannel');

Review Comment:
   These tests create spies on `ws.terminate` / `ws.send` (and 
`server.cleanChannel` in one test), but only `cleanChannelMock` is restored. If 
the test runner isn’t configured with automatic mock restoration, this can leak 
spies across tests and cause order-dependent failures. Consider restoring all 
spies (or adding an `afterEach(() => jest.restoreAllMocks())` in this 
`describe('backpressure')` block) to keep test isolation reliable.



##########
superset-websocket/spec/index.test.ts:
##########
@@ -269,6 +269,80 @@ describe('server', () => {
     });
   });
 
+  describe('backpressure', () => {
+    const fakeEvent = {
+      id: '1615426152415-0',
+      channel_id: channelId,
+      job_id: 'c9b99965-8f1e-4ce5-aa43-d6fc94d6a510',
+      status: 'done',
+    };
+
+    afterEach(() => {
+      server.opts.maxSocketBufferBytes = 0;
+    });
+
+    test('does not terminate when cap disabled (0)', () => {
+      server.opts.maxSocketBufferBytes = 0;
+      const ws = new wsMock('localhost');
+      // simulate a large outbound buffer
+      (ws as unknown as { bufferedAmount: number }).bufferedAmount = 
10_000_000;
+      const terminateMock = jest.spyOn(ws, 'terminate');
+      const sendMock = jest.spyOn(ws, 'send');
+      server.trackClient(channelId, {
+        ws,
+        channel: channelId,
+        pongTs: Date.now(),
+      });
+
+      server.sendToChannel(channelId, fakeEvent);
+
+      expect(terminateMock).not.toHaveBeenCalled();
+      expect(sendMock).toHaveBeenCalled();
+    });
+
+    test('terminates a slow client whose buffer exceeds the cap', () => {
+      server.opts.maxSocketBufferBytes = 1024;
+      const ws = new wsMock('localhost');
+      (ws as unknown as { bufferedAmount: number }).bufferedAmount = 2048;
+      const terminateMock = jest.spyOn(ws, 'terminate');
+      const sendMock = jest.spyOn(ws, 'send');
+      const cleanChannelMock = jest.spyOn(server, 'cleanChannel');
+      server.trackClient(channelId, {
+        ws,
+        channel: channelId,
+        pongTs: Date.now(),
+      });
+
+      server.sendToChannel(channelId, fakeEvent);
+
+      expect(terminateMock).toHaveBeenCalled();
+      expect(sendMock).not.toHaveBeenCalled();
+      expect(statsdIncrementMock).toHaveBeenCalledWith(
+        'ws_client_backpressure_disconnect',
+      );
+      expect(cleanChannelMock).toHaveBeenCalledWith(channelId);
+      cleanChannelMock.mockRestore();

Review Comment:
   These tests create spies on `ws.terminate` / `ws.send` (and 
`server.cleanChannel` in one test), but only `cleanChannelMock` is restored. If 
the test runner isn’t configured with automatic mock restoration, this can leak 
spies across tests and cause order-dependent failures. Consider restoring all 
spies (or adding an `afterEach(() => jest.restoreAllMocks())` in this 
`describe('backpressure')` block) to keep test isolation reliable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to