suddjian commented on a change in pull request #11498:
URL: https://github.com/apache/superset/pull/11498#discussion_r607372397



##########
File path: superset-websocket/README.md
##########
@@ -0,0 +1,106 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+# Superset WebSocket Server
+
+A Node.js WebSocket server for sending async event data to the Superset web application frontend.
+
+## Requirements
+
+- Node.js 12+ (not tested with older versions)
+- Redis 5+
+
+To use this feature, Superset needs to be configured to enable global async queries and to use WebSockets as the transport (see below).
+
+## Architecture
+
+This implementation is based on the architecture defined in [SIP-39](https://github.com/apache/superset/issues/9190).
+
+### Streams
+
+Async events are pushed to [Redis Streams](https://redis.io/topics/streams-intro) from the [Superset Flask app](https://github.com/preset-io/superset/blob/master/superset/utils/async_query_manager.py). An event for a particular user is published to two streams: 1) the global event stream that includes events for all users, and 2) a channel/session-specific stream only for the user. This approach provides a good balance of performance (reading off of a single global stream) and fault tolerance (dropped connections can "catch up" by reading from the channel-specific stream).
+
+Note that Redis Stream [consumer groups](https://redis.io/topics/streams-intro#consumer-groups) are not used here due to the fact that each group receives a subset of the data for a stream, and WebSocket clients have a persistent connection to each app instance, requiring access to all data in a stream. Horizontal scaling of the WebSocket app requires having multiple WebSocket servers, each with full access to the Redis Stream data.
+
+### Connection
+
+When a user's browser initially connects to the WebSocket server, it does so over HTTP, which includes the JWT authentication cookie, set by the Flask app, in the request. _Note that due to the cookie-based authentication method, the WebSocket server must be run on the same host as the web application._ The server validates the JWT token by using the shared secret (config: `jwtSecret`), and if valid, proceeds to upgrade the connection to a WebSocket. The user's session-based "channel" ID is contained in the JWT, and serves as the basis for sending received events to the user's connected socket(s).
+
+A user may have multiple WebSocket connections under a single channel (session) ID. This would be the case if the user has multiple browser tabs open, for example. In this scenario, **all events received for a specific channel are sent to all connected sockets**, leaving it to the consumer to decide which events are relevant to the current application context.
+
+### Reconnection
+
+It is expected that a user's WebSocket connection may be dropped or interrupted due to fluctuating network conditions. The Superset frontend code keeps track of the last received async event ID, and attempts to reconnect to the WebSocket server with a `last_id` query parameter in the initial HTTP request. If a connection includes a valid `last_id` value, events that may have already been received and sent unsuccessfully are read from the channel-based Redis Stream and re-sent to the new WebSocket connection. The global event stream flow then assumes responsibility for sending subsequent events to the connected socket(s).
+
+### Connection Management
+
+The server utilizes the standard WebSocket [ping/pong functionality](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#pings_and_pongs_the_heartbeat_of_websockets) to determine if active WebSocket connections are still alive. Active sockets are sent a _ping_ regularly (config: `pingSocketsIntervalMs`), and the internal _sockets_ registry is updated with a timestamp when a _pong_ response is received. If a _pong_ response has not been received before the timeout period (config: `socketResponseTimeoutMs`), the socket is terminated and removed from the internal registry.
+
+In addition to periodic socket connection cleanup, the internal _channels_ registry is regularly "cleaned" (config: `gcChannelsIntervalMs`) to remove stale references and prevent excessive memory consumption over time.
+
+## Install
+
+Install dependencies:
+```
+npm install
+```
+
+## WebSocket Server Configuration
+
+Copy `config.example.json` to `config.json` and adjust the values for your environment.
+
+## Superset Configuration
+
+Configure the Superset Flask app to enable global async queries (in `superset_config.py`):
+
+Enable the `GLOBAL_ASYNC_QUERIES` feature flag:
+```
+"GLOBAL_ASYNC_QUERIES": True
+```
+
+Configure the following Superset values:
+```
+GLOBAL_ASYNC_QUERIES_TRANSPORT = "ws"
+GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL = "ws://<host>:<port>/"
+```
+
+Note that the WebSocket server must be run on the same hostname (different port) for cookies to be shared between the Flask app and the WebSocket server.
+
+The following config values must contain the same values in both the Flask app config and `config.json`:
+```
+GLOBAL_ASYNC_QUERIES_REDIS_CONFIG
+GLOBAL_ASYNC_QUERIES_REDIS_STREAM_PREFIX
+GLOBAL_ASYNC_QUERIES_JWT_COOKIE_NAME
+GLOBAL_ASYNC_QUERIES_JWT_SECRET

Review comment:
       idea, non-blocking: If these are not configured correctly, can we have the service detect it and provide hints to help devs/admins get it working?
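
One possible shape for this is a startup check that collects readable hints instead of failing on the first problem. The sketch below is illustrative only: `ServerConfig`, `validateConfig`, and the hint wording are hypothetical names, not part of this PR. The 32-character minimum mirrors the existing `jwtSecret` length check in `src/index.ts`; the other checks are assumptions. The server cannot see the Flask config, so it can only flag values that look wrong locally and point at the Superset setting they need to match.

```typescript
// Hypothetical startup check; names and messages are illustrative only.
interface ServerConfig {
  jwtSecret: string;
  jwtCookieName: string;
  streamPrefix: string;
  redis: { host: string; port: number; db: number };
}

// Returns a list of hints; an empty list means nothing suspicious was found.
const validateConfig = (config: ServerConfig): string[] => {
  const hints: string[] = [];
  if (config.jwtSecret.length < 32) {
    hints.push(
      'jwtSecret must be at least 32 characters and match GLOBAL_ASYNC_QUERIES_JWT_SECRET',
    );
  }
  if (!config.jwtCookieName) {
    hints.push(
      'jwtCookieName is empty; it must match GLOBAL_ASYNC_QUERIES_JWT_COOKIE_NAME',
    );
  }
  if (!config.streamPrefix) {
    hints.push(
      'streamPrefix is empty; it must match GLOBAL_ASYNC_QUERIES_REDIS_STREAM_PREFIX',
    );
  }
  if (!config.redis.host || !(config.redis.port > 0)) {
    hints.push(
      'redis.host or redis.port looks invalid; compare with GLOBAL_ASYNC_QUERIES_REDIS_CONFIG',
    );
  }
  return hints;
};

// Example: log every hint at startup so admins see all problems at once.
validateConfig({
  jwtSecret: 'too-short',
  jwtCookieName: 'async-token',
  streamPrefix: 'async-events-',
  redis: { host: '127.0.0.1', port: 6379, db: 0 },
}).forEach(hint => console.warn(`config hint: ${hint}`));
```

Returning the hints as data keeps the check easy to unit test and leaves the decision to warn or abort to the caller.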

##########
File path: superset-websocket/config.test.json
##########
@@ -0,0 +1,12 @@
+{
+  "redis": {
+    "port": 6379,
+    "host": "127.0.0.1",
+    "password": "",
+    "db": 10,
+    "ssl": false
+  },
+  "streamPrefix": "test-async-events-",

Review comment:
       change request: Can we make this more specific? It is not clear what `streamPrefix` is used for.
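
For context, going by `src/index.ts` in this PR, the prefix is prepended to the Redis Stream keys: the global stream is `${opts.streamPrefix}full` and each channel stream is `${opts.streamPrefix}${sessionId}`. A small sketch of that naming follows; the helper names and the sample channel ID are hypothetical.

```typescript
// Hypothetical helpers mirroring how index.ts in this PR builds stream keys.
const globalStreamName = (streamPrefix: string): string =>
  `${streamPrefix}full`;

const channelStreamName = (streamPrefix: string, channelId: string): string =>
  `${streamPrefix}${channelId}`;

const testPrefix = 'test-async-events-';
console.log(globalStreamName(testPrefix)); // test-async-events-full
console.log(channelStreamName(testPrefix, 'abc123')); // test-async-events-abc123
```

Given that usage, a name such as `redisStreamPrefix` would be one way to make the purpose clearer.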

##########
File path: superset-websocket/src/index.ts
##########
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import * as http from 'http';
+import * as net from 'net';
+import WebSocket from 'ws';
+import { v4 as uuidv4 } from 'uuid';
+
+const jwt = require('jsonwebtoken');
+const cookie = require('cookie');
+const Redis = require('ioredis');
+
+export type StreamResult = [recordId: string, record: [label: 'data', data: string]];
+
+// sync with superset-frontend/src/components/ErrorMessage/types
+export type ErrorLevel = 'info' | 'warning' | 'error';
+export type SupersetError<ExtraType = Record<string, any> | null> = {
+  error_type: string;
+  extra: ExtraType;
+  level: ErrorLevel;
+  message: string;
+};
+
+type ListenerFunction = (results: StreamResult[]) => void;
+interface EventValue {
+  id: string,
+  channel_id: string,
+  job_id: string,
+  user_id?: string,
+  status: string,
+  errors?: SupersetError[],
+  result_url?: string,
+}
+interface JwtPayload { channel: string }
+interface FetchRangeFromStreamParams { sessionId: string, startId: string, endId: string, listener: ListenerFunction }
+export interface SocketInstance { ws: WebSocket, channel: string, pongTs: number }
+
+interface ChannelValue {
+  sockets: Array<string>,
+}
+
+const environment = process.env.NODE_ENV;
+export const opts = {
+  port: 8080,
+  redis: {
+    port: 6379,
+    host: "127.0.0.1",
+    password: "",
+    db: 0
+  },
+  streamPrefix: "async-events-",
+  jwtSecret: "",
+  jwtCookieName: "async-token",
+  redisStreamReadCount: 100,
+  redisStreamReadBlockMs: 5000,
+  socketResponseTimeoutMs: 60 * 1000,
+  pingSocketsIntervalMs: 20 * 1000,
+  gcChannelsIntervalMs: 120 * 1000,
+}
+
+const startServer = process.argv[2] === 'start';
+const configFile = environment === 'test' ? '../config.test.json' : '../config.json';
+let config = {};
+try {
+  config = require(configFile);
+} catch(err) {
+  console.warn('config.json not found, using defaults');
+}
+
+Object.assign(opts, config);
+
+if(startServer && opts.jwtSecret.length < 32)
+  throw('Please provide a JWT secret at least 32 bytes long')
+
+const redis = new Redis(opts.redis);
+const httpServer = http.createServer();
+export const wss = new WebSocket.Server({ noServer: true, clientTracking: false });
+
+const SOCKET_ACTIVE_STATES = [WebSocket.OPEN, WebSocket.CONNECTING];
+const GLOBAL_EVENT_STREAM_NAME = `${opts.streamPrefix}full`;
+const DEFAULT_STREAM_LAST_ID = '$';
+
+export let channels: Record<string, ChannelValue> = {};
+export let sockets: Record<string, SocketInstance> = {};
+let lastFirehoseId: string = DEFAULT_STREAM_LAST_ID;
+
+
+export const setLastFirehoseId = (id: string): void => {
+  lastFirehoseId = id;
+}
+
+export const trackClient = (channel: string, socketInstance: SocketInstance): string => {
+  const socketId = uuidv4();
+  sockets[socketId] = socketInstance;
+
+  if(channel in channels) {
+    channels[channel].sockets.push(socketId)
+  } else {
+    channels[channel] = {sockets: [socketId]};
+  }
+
+  return socketId;
+}
+
+export const sendToChannel = (channel: string, value: EventValue): void => {
+  const strData = JSON.stringify(value);
+  if(!channels[channel]) {
+    console.debug(`channel ${channel} is unknown, skipping`);
+    return;
+  }
+  channels[channel].sockets.forEach(socketId => {
+    const socketInstance: SocketInstance = sockets[socketId];
+    if(!socketInstance) return cleanChannel(channel);
+    try {
+      socketInstance.ws.send(strData);
+    } catch(err) {
+      console.debug('Error sending to socket', err);
+      cleanChannel(channel);
+    }
+  });
+}
+
+export const fetchRangeFromStream = async ({sessionId, startId, endId, listener}: FetchRangeFromStreamParams) => {
+  const streamName = `${opts.streamPrefix}${sessionId}`;
+  try {
+    const reply = await redis.xrange(streamName, startId, endId);
+    if (!reply || !reply.length) return;
+    listener(reply);
+  } catch(e) {
+    console.error(e);
+  }
+}
+
+export const subscribeToGlobalStream = async (stream: string, listener: ListenerFunction) => {
+  /*eslint no-constant-condition: ["error", { "checkLoops": false }]*/
+  while (true) {

Review comment:
       "Block" could be a confusing term here. In typical Node parlance, 
"block" means synchronous code, which this is not. It might be helpful to add a 
comment clarifying what "blocking" means in this context.
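
A possible wording for such a comment, attached here to a small helper that builds the `XREAD` arguments. `buildXreadArgs` and `XreadArgs` are hypothetical names; the argument order follows the `redis.xread(...)` call in `subscribeToGlobalStream`.

```typescript
type XreadArgs = ['BLOCK', number, 'COUNT', number, 'STREAMS', string, string];

/**
 * Builds the arguments for a Redis XREAD call.
 *
 * "BLOCK" is a Redis option, not a statement about Node: the Redis server
 * holds the request open for up to `blockMs` milliseconds while it waits for
 * new stream entries. The Node.js event loop is not blocked; the client call
 * returns a promise and other callbacks keep running while it is pending.
 * The Redis connection itself is occupied for that time, so other commands
 * sent on the same connection wait until the read returns.
 */
const buildXreadArgs = (
  stream: string,
  lastId: string,
  blockMs: number,
  count: number,
): XreadArgs => ['BLOCK', blockMs, 'COUNT', count, 'STREAMS', stream, lastId];

// Example with the default option values from this PR.
console.log(buildXreadArgs('async-events-full', '$', 5000, 100).join(' '));
// BLOCK 5000 COUNT 100 STREAMS async-events-full $
```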

##########
File path: superset-websocket/tsconfig.json
##########
@@ -0,0 +1,12 @@
+{
+    "compilerOptions": {
+      "outDir": "dist",
+      "target": "es6",

Review comment:
       ```suggestion
         "target": "es2019",
   ```
   
   Node 12 supports ES2019 syntax.

##########
File path: superset-websocket/src/index.ts
##########
@@ -0,0 +1,465 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import * as http from 'http';
+import * as net from 'net';
+import WebSocket from 'ws';
+import { v4 as uuidv4 } from 'uuid';
+
+const winston = require('winston');
+const jwt = require('jsonwebtoken');
+const cookie = require('cookie');
+const Redis = require('ioredis');
+
+export type StreamResult = [
+  recordId: string,
+  record: [label: 'data', data: string],
+];
+
+// sync with superset-frontend/src/components/ErrorMessage/types
+export type ErrorLevel = 'info' | 'warning' | 'error';
+export type SupersetError<ExtraType = Record<string, any> | null> = {
+  error_type: string;
+  extra: ExtraType;
+  level: ErrorLevel;
+  message: string;
+};
+
+type ListenerFunction = (results: StreamResult[]) => void;
+interface EventValue {

Review comment:
       If this server might eventually deliver multiple kinds of events, it 
would be desirable to separate the `id` and `channel_id`, which are used by the 
server, from the schema for the other fields, which are specific to the async 
query event type.
   
   So, an interface like:
   
   ```ts
   interface EventValue<T extends object> {
     id: string;
     channel_id: string;
     data: T; // everything else
   }
   ```

##########
File path: superset/config.py
##########
@@ -1124,6 +1125,7 @@ class CeleryConfig:  # pylint: disable=too-few-public-methods
 GLOBAL_ASYNC_QUERIES_JWT_SECRET = "test-secret-change-me"
 GLOBAL_ASYNC_QUERIES_TRANSPORT = "polling"
 GLOBAL_ASYNC_QUERIES_POLLING_DELAY = 500
+GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL = "ws://127.0.0.1:8080/"

Review comment:
       Is there potential for the websocket server to be used for purposes 
other than async queries in the future? If so, this config should be given a 
more general name.

##########
File path: superset-websocket/src/index.ts
##########
@@ -0,0 +1,465 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import * as http from 'http';
+import * as net from 'net';
+import WebSocket from 'ws';
+import { v4 as uuidv4 } from 'uuid';
+
+const winston = require('winston');
+const jwt = require('jsonwebtoken');
+const cookie = require('cookie');
+const Redis = require('ioredis');
+
+export type StreamResult = [
+  recordId: string,
+  record: [label: 'data', data: string],
+];
+
+// sync with superset-frontend/src/components/ErrorMessage/types
+export type ErrorLevel = 'info' | 'warning' | 'error';
+export type SupersetError<ExtraType = Record<string, any> | null> = {
+  error_type: string;
+  extra: ExtraType;
+  level: ErrorLevel;
+  message: string;
+};
+
+type ListenerFunction = (results: StreamResult[]) => void;
+interface EventValue {
+  id: string;
+  channel_id: string;
+  job_id: string;
+  user_id?: string;
+  status: string;
+  errors?: SupersetError[];
+  result_url?: string;
+}
+interface JwtPayload {
+  channel: string;
+}
+interface FetchRangeFromStreamParams {
+  sessionId: string;
+  startId: string;
+  endId: string;
+  listener: ListenerFunction;
+}
+export interface SocketInstance {
+  ws: WebSocket;
+  channel: string;
+  pongTs: number;
+}
+interface RedisConfig {
+  port: number;
+  host: string;
+  password?: string | null;
+  db: number;
+  ssl: boolean;
+}
+
+interface ChannelValue {
+  sockets: Array<string>;
+}
+
+const environment = process.env.NODE_ENV;
+
+// default options
+export const opts = {
+  port: 8080,
+  logLevel: 'info',
+  logToFile: false,
+  logFilename: 'app.log',
+  redis: {
+    port: 6379,
+    host: '127.0.0.1',
+    password: '',
+    db: 0,
+    ssl: false,
+  },
+  streamPrefix: 'async-events-',
+  jwtSecret: '',
+  jwtCookieName: 'async-token',
+  redisStreamReadCount: 100,
+  redisStreamReadBlockMs: 5000,
+  socketResponseTimeoutMs: 60 * 1000,
+  pingSocketsIntervalMs: 20 * 1000,
+  gcChannelsIntervalMs: 120 * 1000,
+};
+
+const startServer = process.argv[2] === 'start';
+const configFile =
+  environment === 'test' ? '../config.test.json' : '../config.json';
+let config = {};
+try {
+  config = require(configFile);
+} catch (err) {
+  console.error('config.json not found, using defaults');
+}
+// apply config overrides
+Object.assign(opts, config);
+
+// init logger
+const logTransports = [
+  new winston.transports.Console({ handleExceptions: true }),
+];
+if (opts.logToFile && opts.logFilename) {
+  logTransports.push(
+    new winston.transports.File({
+      filename: opts.logFilename,
+      handleExceptions: true,
+    }),
+  );
+}
+const logger = winston.createLogger({
+  level: opts.logLevel,
+  transports: logTransports,
+});
+
+// enforce JWT secret length
+if (startServer && opts.jwtSecret.length < 32)
+  throw new Error('Please provide a JWT secret at least 32 bytes long');
+
+export const redisUrlFromConfig = (redisConfig: RedisConfig): string => {
+  let url = redisConfig.ssl ? 'rediss://' : 'redis://';
+  if (redisConfig.password) url += `:${redisConfig.password}@`;
+  url += `${redisConfig.host}:${redisConfig.port}/${redisConfig.db}`;
+  return url;
+};
+
+// initialize servers
+const redis = new Redis(redisUrlFromConfig(opts.redis));
+const httpServer = http.createServer();
+export const wss = new WebSocket.Server({
+  noServer: true,
+  clientTracking: false,
+});
+
+const SOCKET_ACTIVE_STATES = [WebSocket.OPEN, WebSocket.CONNECTING];
+const GLOBAL_EVENT_STREAM_NAME = `${opts.streamPrefix}full`;
+const DEFAULT_STREAM_LAST_ID = '$';
+
+// initialize internal registries
+export let channels: Record<string, ChannelValue> = {};
+export let sockets: Record<string, SocketInstance> = {};
+let lastFirehoseId: string = DEFAULT_STREAM_LAST_ID;
+
+export const setLastFirehoseId = (id: string): void => {
+  lastFirehoseId = id;
+};
+
+/**
+ * Adds the passed channel and socket instance to the internal registries.
+ */
+export const trackClient = (
+  channel: string,
+  socketInstance: SocketInstance,
+): string => {
+  const socketId = uuidv4();
+  sockets[socketId] = socketInstance;
+
+  if (channel in channels) {
+    channels[channel].sockets.push(socketId);
+  } else {
+    channels[channel] = { sockets: [socketId] };
+  }
+
+  return socketId;
+};
+
+/**
+ * Sends a single async event payload to a single channel.
+ * A channel may have multiple connected sockets, this emits
+ * the event to all connected sockets within a channel.
+ */
+export const sendToChannel = (channel: string, value: EventValue): void => {
+  const strData = JSON.stringify(value);
+  if (!channels[channel]) {
+    logger.debug(`channel ${channel} is unknown, skipping`);
+    return;
+  }
+  channels[channel].sockets.forEach(socketId => {
+    const socketInstance: SocketInstance = sockets[socketId];
+    if (!socketInstance) return cleanChannel(channel);
+    try {
+      socketInstance.ws.send(strData);
+    } catch (err) {
+      logger.debug(`Error sending to socket: ${err}`);
+      // check that the connection is still active
+      cleanChannel(channel);
+    }
+  });
+};
+
+/**
+ * Reads a range of events from a channel-specific Redis event stream.
+ * Invoked in the client re-connection flow.
+ */
+export const fetchRangeFromStream = async ({
+  sessionId,
+  startId,
+  endId,
+  listener,
+}: FetchRangeFromStreamParams) => {
+  const streamName = `${opts.streamPrefix}${sessionId}`;
+  try {
+    const reply = await redis.xrange(streamName, startId, endId);
+    if (!reply || !reply.length) return;
+    listener(reply);
+  } catch (e) {
+    logger.error(e);
+  }
+};
+
+/**
+ * Reads from the global Redis event stream continuously.
+ * Utilizes a blocking connection to Redis to wait for data to
+ * be returned from the stream.
+ */
+export const subscribeToGlobalStream = async (
+  stream: string,
+  listener: ListenerFunction,
+) => {
+  /*eslint no-constant-condition: ["error", { "checkLoops": false }]*/
+  while (true) {
+    try {
+      const reply = await redis.xread(
+        'BLOCK',
+        opts.redisStreamReadBlockMs,
+        'COUNT',
+        opts.redisStreamReadCount,
+        'STREAMS',
+        stream,
+        lastFirehoseId,
+      );
+      if (!reply) {
+        continue;
+      }
+      const results = reply[0][1];
+      const { length } = results;
+      if (!results.length) {
+        continue;
+      }
+      listener(results);
+      setLastFirehoseId(results[length - 1][0]);
+    } catch (e) {
+      logger.error(e);
+      continue;
+    }
+  }
+};
+
+/**
+ * Callback function to process events received from a Redis Stream
+ */
+export const processStreamResults = (results: StreamResult[]): void => {
+  logger.debug(`events received: ${results}`);
+  results.forEach(item => {
+    try {
+      const id = item[0];
+      const data = JSON.parse(item[1][1]);
+      sendToChannel(data.channel_id, { id, ...data });
+    } catch (err) {
+      logger.error(err);
+    }
+  });
+};
+
+/**
+ * Verify and parse a JWT cookie from an HTTP request.
+ * Returns the JWT payload or throws an error on invalid token.
+ */
+const getJwtPayload = (request: http.IncomingMessage): JwtPayload => {
+  const cookies = cookie.parse(request.headers.cookie);
+  const token = cookies[opts.jwtCookieName];
+
+  if (!token) throw new Error('JWT not present');
+  return jwt.verify(token, opts.jwtSecret);
+};
+
+/**
+ * Extracts the `last_id` query param value from an HTTP request
+ */
+const getLastId = (request: http.IncomingMessage): string | null => {
+  const url = new URL(String(request.url), 'http://0.0.0.0');
+  const queryParams = url.searchParams;
+  return queryParams.get('last_id');
+};
+
+/**
+ * Increments a Redis Stream ID
+ */
+export const incrementId = (id: string): string => {
+  // redis stream IDs are in this format: '1607477697866-0'
+  const parts = id.split('-');
+  if (parts.length < 2) return id;
+  return parts[0] + '-' + (Number(parts[1]) + 1);
+};
+
+/**
+ * WebSocket `connection` event handler, called via wss
+ */
+export const wsConnection = (ws: WebSocket, request: http.IncomingMessage) => {
+  const jwtPayload: JwtPayload = getJwtPayload(request);
+  const channel: string = jwtPayload.channel;
+  const socketInstance: SocketInstance = { ws, channel, pongTs: Date.now() };
+
+  // add this ws instance to the internal registry
+  const socketId = trackClient(channel, socketInstance);
+  logger.debug(`socket ${socketId} connected on channel ${channel}`);
+
+  // reconnection logic
+  const lastId = getLastId(request);
+  if (lastId) {
+    // fetch range of events from lastId to most recent event received on
+    // via global event stream
+    const endId =
+      lastFirehoseId === DEFAULT_STREAM_LAST_ID ? '+' : lastFirehoseId;
+    fetchRangeFromStream({
+      sessionId: channel,
+      startId: incrementId(lastId), // inclusive
+      endId, // inclusive
+      listener: processStreamResults,
+    });
+  }
+
+  // init event handler for `pong` events (connection management)
+  ws.on('pong', function pong(data: Buffer) {
+    const socketId = data.toString();
+    const socketInstance = sockets[socketId];
+    if (!socketInstance) {
+      logger.warn(`pong received for nonexistent socket ${socketId}`);
+    } else {
+      socketInstance.pongTs = Date.now();
+    }
+  });
+};
+
+/**
+ * HTTP `upgrade` event handler, called via httpServer
+ */
+export const httpUpgrade = (
+  request: http.IncomingMessage,
+  socket: net.Socket,
+  head: Buffer,
+) => {
+  try {
+    const jwtPayload: JwtPayload = getJwtPayload(request);
+    if (!jwtPayload.channel) throw new Error('Channel ID not present');
+  } catch (err) {
+    // JWT invalid, do not establish a WebSocket connection
+    logger.error(err);
+    socket.destroy();
+    return;
+  }
+
+  // upgrade the HTTP request into a WebSocket connection
+  wss.handleUpgrade(
+    request,
+    socket,
+    head,
+    function cb(ws: WebSocket, request: http.IncomingMessage) {
+      wss.emit('connection', ws, request);
+    },
+  );
+};
+
+// Connection cleanup and garbage collection
+
+/**
+ * Iterate over all tracked sockets, terminating and removing references to
+ * connections that have not responded with a _pong_ within the timeout window.
+ * Sends a _ping_ to all active connections.
+ */
+export const checkSockets = () => {
+  logger.debug(`channel count: ${Object.keys(channels).length}`);
+  logger.debug(`socket count: ${Object.keys(sockets).length}`);
+  for (const socketId in sockets) {
+    const socketInstance = sockets[socketId];
+    const timeout = Date.now() - socketInstance.pongTs;
+    let isActive = true;
+
+    if (timeout >= opts.socketResponseTimeoutMs) {
+      logger.debug(
+        `terminating unresponsive socket: ${socketId}, channel: ${socketInstance.channel}`,
+      );
+      socketInstance.ws.terminate();
+      isActive = false;
+    } else if (!SOCKET_ACTIVE_STATES.includes(socketInstance.ws.readyState)) {
+      isActive = false;
+    }
+
+    if (isActive) {
+      socketInstance.ws.ping(socketId);
+    } else {
+      delete sockets[socketId];
+      logger.debug(`forgetting socket ${socketId}`);
+    }
+  }
+};
+
+/**
+ * Iterate over all sockets within a channel, removing references to
+ * inactive connections, ultimately removing the channel from the
+ * _channels_ registry.

Review comment:
       ```suggestion
    * _channels_ registry if no active connections remain.
   ```
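
To illustrate the behavior the suggested wording describes, here is a rough sketch of such a cleanup. It is not the PR's implementation, which is not shown in this excerpt: the registry shapes are simplified stand-ins, and `TrackedSocket`, `WS_OPEN`, and `WS_CONNECTING` are hypothetical names (the numeric values are the standard WebSocket `readyState` constants).

```typescript
// Simplified stand-ins for the registries in index.ts (assumed shapes).
interface TrackedSocket {
  readyState: number;
}
const WS_CONNECTING = 0; // same value as WebSocket.CONNECTING
const WS_OPEN = 1; // same value as WebSocket.OPEN
const ACTIVE_STATES = [WS_OPEN, WS_CONNECTING];

const channels: Record<string, { sockets: string[] }> = {};
const sockets: Record<string, TrackedSocket> = {};

// Drops references to inactive sockets; removes the channel only if
// no active connections remain.
const cleanChannel = (channel: string): void => {
  const entry = channels[channel];
  if (!entry) return;
  const active = entry.sockets.filter(socketId => {
    const socket = sockets[socketId];
    return socket !== undefined && ACTIVE_STATES.indexOf(socket.readyState) !== -1;
  });
  if (active.length === 0) {
    delete channels[channel];
  } else {
    entry.sockets = active;
  }
};
```

A channel with at least one open socket keeps its entry with the stale IDs filtered out, which is the case the original comment's "ultimately removing the channel" glosses over.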

##########
File path: superset-websocket/package.json
##########
@@ -0,0 +1,41 @@
+{
+  "name": "superset-websocket",
+  "version": "0.0.1",
+  "description": "Websocket sidecar application for Superset",
+  "main": "index.js",
+  "scripts": {
+    "start": "node dist/index.js start",
+    "test": "NODE_ENV=test jest -i spec",
+    "type": "tsc --noEmit",
+    "lint": "eslint . --ext .js,.jsx,.ts,.tsx && npm run type",
+    "dev-server": "ts-node src/index.ts start",
+    "build": "tsc",
+    "prettier-check": "prettier --check .",

Review comment:
       change request: Could this be named `format-check` instead?

##########
File path: superset-websocket/src/index.ts
##########
@@ -0,0 +1,465 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import * as http from 'http';
+import * as net from 'net';
+import WebSocket from 'ws';
+import { v4 as uuidv4 } from 'uuid';
+
+const winston = require('winston');
+const jwt = require('jsonwebtoken');
+const cookie = require('cookie');
+const Redis = require('ioredis');
+
+export type StreamResult = [
+  recordId: string,
+  record: [label: 'data', data: string],
+];
+
+// sync with superset-frontend/src/components/ErrorMessage/types
+export type ErrorLevel = 'info' | 'warning' | 'error';
+export type SupersetError<ExtraType = Record<string, any> | null> = {
+  error_type: string;
+  extra: ExtraType;
+  level: ErrorLevel;
+  message: string;
+};
+
+type ListenerFunction = (results: StreamResult[]) => void;
+interface EventValue {
+  id: string;
+  channel_id: string;
+  job_id: string;
+  user_id?: string;
+  status: string;
+  errors?: SupersetError[];
+  result_url?: string;
+}
+interface JwtPayload {
+  channel: string;
+}
+interface FetchRangeFromStreamParams {
+  sessionId: string;
+  startId: string;
+  endId: string;
+  listener: ListenerFunction;
+}
+export interface SocketInstance {
+  ws: WebSocket;
+  channel: string;
+  pongTs: number;
+}
+interface RedisConfig {
+  port: number;
+  host: string;
+  password?: string | null;
+  db: number;
+  ssl: boolean;
+}
+
+interface ChannelValue {
+  sockets: Array<string>;
+}
+
+const environment = process.env.NODE_ENV;
+
+// default options
+export const opts = {
+  port: 8080,
+  logLevel: 'info',
+  logToFile: false,
+  logFilename: 'app.log',
+  redis: {
+    port: 6379,
+    host: '127.0.0.1',
+    password: '',
+    db: 0,
+    ssl: false,
+  },
+  streamPrefix: 'async-events-',
+  jwtSecret: '',
+  jwtCookieName: 'async-token',
+  redisStreamReadCount: 100,
+  redisStreamReadBlockMs: 5000,
+  socketResponseTimeoutMs: 60 * 1000,
+  pingSocketsIntervalMs: 20 * 1000,
+  gcChannelsIntervalMs: 120 * 1000,
+};
+
+const startServer = process.argv[2] === 'start';
+const configFile =
+  environment === 'test' ? '../config.test.json' : '../config.json';
+let config = {};
+try {
+  config = require(configFile);
+} catch (err) {
+  console.error('config.json not found, using defaults');
+}
+// apply config overrides
+Object.assign(opts, config);
+
+// init logger
+const logTransports = [
+  new winston.transports.Console({ handleExceptions: true }),
+];
+if (opts.logToFile && opts.logFilename) {
+  logTransports.push(
+    new winston.transports.File({
+      filename: opts.logFilename,
+      handleExceptions: true,
+    }),
+  );
+}
+const logger = winston.createLogger({
+  level: opts.logLevel,
+  transports: logTransports,
+});
+
+// enforce JWT secret length
+if (startServer && opts.jwtSecret.length < 32)
+  throw new Error('Please provide a JWT secret at least 32 bytes long');
+
+export const redisUrlFromConfig = (redisConfig: RedisConfig): string => {
+  let url = redisConfig.ssl ? 'rediss://' : 'redis://';
+  if (redisConfig.password) url += `:${redisConfig.password}@`;
+  url += `${redisConfig.host}:${redisConfig.port}/${redisConfig.db}`;
+  return url;
+};
+
+// initialize servers
+const redis = new Redis(redisUrlFromConfig(opts.redis));
+const httpServer = http.createServer();
+export const wss = new WebSocket.Server({
+  noServer: true,
+  clientTracking: false,
+});
+
+const SOCKET_ACTIVE_STATES = [WebSocket.OPEN, WebSocket.CONNECTING];
+const GLOBAL_EVENT_STREAM_NAME = `${opts.streamPrefix}full`;
+const DEFAULT_STREAM_LAST_ID = '$';
+
+// initialize internal registries
+export let channels: Record<string, ChannelValue> = {};
+export let sockets: Record<string, SocketInstance> = {};
+let lastFirehoseId: string = DEFAULT_STREAM_LAST_ID;
+
+export const setLastFirehoseId = (id: string): void => {
+  lastFirehoseId = id;
+};
+
+/**
+ * Adds the passed channel and socket instance to the internal registries.
+ */
+export const trackClient = (
+  channel: string,
+  socketInstance: SocketInstance,
+): string => {
+  const socketId = uuidv4();
+  sockets[socketId] = socketInstance;
+
+  if (channel in channels) {
+    channels[channel].sockets.push(socketId);
+  } else {
+    channels[channel] = { sockets: [socketId] };
+  }
+
+  return socketId;
+};
+
+/**
+ * Sends a single async event payload to a single channel.
+ * A channel may have multiple connected sockets, this emits
+ * the event to all connected sockets within a channel.
+ */
+export const sendToChannel = (channel: string, value: EventValue): void => {
+  const strData = JSON.stringify(value);
+  if (!channels[channel]) {
+    logger.debug(`channel ${channel} is unknown, skipping`);
+    return;
+  }
+  channels[channel].sockets.forEach(socketId => {
+    const socketInstance: SocketInstance = sockets[socketId];
+    if (!socketInstance) return cleanChannel(channel);
+    try {
+      socketInstance.ws.send(strData);
+    } catch (err) {
+      logger.debug(`Error sending to socket: ${err}`);
+      // check that the connection is still active
+      cleanChannel(channel);
+    }
+  });
+};
+
+/**
+ * Reads a range of events from a channel-specific Redis event stream.
+ * Invoked in the client re-connection flow.
+ */
+export const fetchRangeFromStream = async ({

Review comment:
       Should this return the reply in a promise instead of using a listener?
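
A sketch of what the promise-returning variant could look like. This is an assumption about one possible refactor, not the PR's code: `StreamReader` is a hypothetical interface covering only the `xrange` call used in the PR, and the client and prefix are passed in so the example runs without a Redis connection.

```typescript
// [recordId, [label, data]], same layout as StreamResult in the PR.
type StreamResult = [string, [string, string]];

// Hypothetical minimal client interface; assumed to be satisfied by the
// `redis.xrange` call used in the PR.
interface StreamReader {
  xrange(key: string, start: string, end: string): Promise<StreamResult[]>;
}

// Resolves with the events in the range (possibly empty). Errors propagate
// to the caller instead of being logged and swallowed here.
const fetchRangeFromStream = async (
  client: StreamReader,
  streamPrefix: string,
  sessionId: string,
  startId: string,
  endId: string,
): Promise<StreamResult[]> => {
  const reply = await client.xrange(`${streamPrefix}${sessionId}`, startId, endId);
  return reply || [];
};

// Usage with an in-memory fake client and made-up data.
const sampleEvents: StreamResult[] = [
  ['1607477697866-0', ['data', '{"channel_id":"abc"}']],
];
const fakeClient: StreamReader = {
  xrange: async () => sampleEvents,
};
fetchRangeFromStream(fakeClient, 'async-events-', 'abc', '-', '+').then(results =>
  console.log(`fetched ${results.length} event(s)`),
);
```

With this shape the reconnection flow would `await` the result and pass it to `processStreamResults` itself, and the caller decides how to handle a failed read.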

##########
File path: superset/utils/async_query_manager.py
##########
@@ -111,13 +111,14 @@ def init_app(self, app: Flask) -> None:
         def validate_session(  # pylint: disable=unused-variable
             response: Response,
         ) -> Response:
-            reset_token = False
             user_id = session["user_id"] if "user_id" in session else None
 
-            if "async_channel_id" not in session or "async_user_id" not in session:
-                reset_token = True
-            elif user_id != session["async_user_id"]:
-                reset_token = True
+            reset_token = (

Review comment:
       Small change but I like how much easier to read this is 🎉 



