rusackas commented on code in PR #40857:
URL: https://github.com/apache/superset/pull/40857#discussion_r3375287405


##########
superset-websocket/src/config.ts:
##########
@@ -120,6 +124,8 @@ function applyEnvOverrides(config: ConfigType): ConfigType {
       (config.pingSocketsIntervalMs = toNumber(val)),
     GC_CHANNELS_INTERVAL_MS: val =>
       (config.gcChannelsIntervalMs = toNumber(val)),
+    MAX_SOCKET_BUFFER_BYTES: val =>
+      (config.maxSocketBufferBytes = toNumber(val)),

Review Comment:
   Good catch. Fixed in ba641de: `MAX_SOCKET_BUFFER_BYTES` now goes through a 
`toNonNegativeNumber` helper that validates the value is a finite number >= 0, 
falling back to the default and logging a warning on malformed input, so a bad 
env var can no longer silently disable the backpressure guard.



##########
superset-websocket/src/index.ts:
##########
@@ -194,6 +194,24 @@ export const sendToChannel = (channel: string, value: 
EventValue): void => {
   channels[channel].sockets.forEach(socketId => {
     const socketInstance: SocketInstance = sockets[socketId];
     if (!socketInstance) return cleanChannel(channel);
+    // Backpressure: if a slow or stalled client has let its outbound buffer
+    // grow past the configured cap, terminate it rather than buffering
+    // unbounded data in server memory. Opt-in: a cap of 0 disables the check.
+    const { maxSocketBufferBytes } = opts;
+    if (
+      maxSocketBufferBytes > 0 &&
+      socketInstance.ws.bufferedAmount > maxSocketBufferBytes
+    ) {
+      statsd.increment('ws_client_backpressure_disconnect');
+      logger.warn(
+        `Terminating socket on channel ${channel}: send buffer ` +
+          `(${socketInstance.ws.bufferedAmount} bytes) exceeded the ` +
+          `configured limit (${maxSocketBufferBytes} bytes)`,
+      );
+      socketInstance.ws.terminate();
+      cleanChannel(channel);
+      return;

Review Comment:
   Agreed. Fixed in ba641de: the backpressure branch now `delete 
sockets[socketId]` immediately after terminating, so terminated sockets are 
dropped from the global registry right away instead of lingering until the next 
`checkSockets` sweep.



##########
superset-websocket/spec/index.test.ts:
##########
@@ -269,6 +269,80 @@ describe('server', () => {
     });
   });
 
+  describe('backpressure', () => {
+    const fakeEvent = {
+      id: '1615426152415-0',
+      channel_id: channelId,
+      job_id: 'c9b99965-8f1e-4ce5-aa43-d6fc94d6a510',
+      status: 'done',
+    };
+
+    afterEach(() => {
+      server.opts.maxSocketBufferBytes = 0;
+    });
+
+    test('does not terminate when cap disabled (0)', () => {
+      server.opts.maxSocketBufferBytes = 0;
+      const ws = new wsMock('localhost');
+      // simulate a large outbound buffer
+      (ws as unknown as { bufferedAmount: number }).bufferedAmount = 
10_000_000;
+      const terminateMock = jest.spyOn(ws, 'terminate');
+      const sendMock = jest.spyOn(ws, 'send');

Review Comment:
   Good point on test isolation. Fixed in ba641de by adding 
`jest.restoreAllMocks()` to the backpressure `afterEach`, which also let me 
drop the redundant manual `cleanChannelMock.mockRestore()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to