benjreinhart commented on a change in pull request #11498:
URL: https://github.com/apache/superset/pull/11498#discussion_r602645303



##########
File path: superset-websocket/package.json
##########
@@ -0,0 +1,36 @@
+{
+  "name": "superset-websocket",
+  "version": "0.0.1",
+  "description": "Websocket sidecar application for Superset",
+  "main": "index.js",
+  "scripts": {
+    "start": "node dist/index.js start",
+    "test": "NODE_ENV=test jest -i spec",
+    "type": "tsc --noEmit",
+    "lint": "eslint . --ext .js,.jsx,.ts,.tsx && npm run type",
+    "dev-server": "ts-node src/index.ts start",
+    "build": "tsc"
+  },
+  "license": "Apache-2.0",
+  "dependencies": {
+    "cookie": "^0.4.1",
+    "ioredis": "^4.16.1",
+    "jsonwebtoken": "^8.5.1",
+    "uuid": "^8.3.2",
+    "ws": "^7.4.2"
+  },
+  "devDependencies": {

Review comment:
       This is not important now, but I think it would be good to eventually 
add `prettier` to this application.

##########
File path: superset-websocket/src/index.ts
##########
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import * as http from 'http';
+import * as net from 'net';
+import WebSocket from 'ws';
+import { v4 as uuidv4 } from 'uuid';
+
+const jwt = require('jsonwebtoken');
+const cookie = require('cookie');
+const Redis = require('ioredis');
+
+export type StreamResult = [recordId: string, record: [label: 'data', data: 
string]];
+
+// sync with superset-frontend/src/components/ErrorMessage/types
+export type ErrorLevel = 'info' | 'warning' | 'error';
+export type SupersetError<ExtraType = Record<string, any> | null> = {
+  error_type: string;
+  extra: ExtraType;
+  level: ErrorLevel;
+  message: string;
+};
+
+type ListenerFunction = (results: StreamResult[]) => void;
+interface EventValue {
+  id: string,
+  channel_id: string,
+  job_id: string,
+  user_id?: string,
+  status: string,
+  errors?: SupersetError[],
+  result_url?: string,
+}
+interface JwtPayload { channel: string }
+interface FetchRangeFromStreamParams { sessionId: string, startId: string, 
endId: string, listener: ListenerFunction }
+export interface SocketInstance { ws: WebSocket, channel: string, pongTs: 
number }
+
+interface ChannelValue {
+  sockets: Array<string>,
+}
+
+const environment = process.env.NODE_ENV;
+export const opts = {
+  port: 8080,
+  redis: {
+    port: 6379,
+    host: "127.0.0.1",
+    password: "",
+    db: 0
+  },
+  streamPrefix: "async-events-",
+  jwtSecret: "",
+  jwtCookieName: "async-token",
+  redisStreamReadCount: 100,
+  redisStreamReadBlockMs: 5000,
+  socketResponseTimeoutMs: 60 * 1000,
+  pingSocketsIntervalMs: 20 * 1000,
+  gcChannelsIntervalMs: 120 * 1000,
+}
+
+const startServer = process.argv[2] === 'start';
+const configFile = environment === 'test' ? '../config.test.json' : 
'../config.json';
+let config = {};
+try {
+  config = require(configFile);
+} catch(err) {
+  console.warn('config.json not found, using defaults');
+}
+
+Object.assign(opts, config);
+
+if(startServer && opts.jwtSecret.length < 32)
+  throw('Please provide a JWT secret at least 32 bytes long')
+
+const redis = new Redis(opts.redis);
+const httpServer = http.createServer();
+export const wss = new WebSocket.Server({ noServer: true, clientTracking: 
false });
+
+const SOCKET_ACTIVE_STATES = [WebSocket.OPEN, WebSocket.CONNECTING];
+const GLOBAL_EVENT_STREAM_NAME = `${opts.streamPrefix}full`;
+const DEFAULT_STREAM_LAST_ID = '$';
+
+export let channels: Record<string, ChannelValue> = {};
+export let sockets: Record<string, SocketInstance> = {};
+let lastFirehoseId: string = DEFAULT_STREAM_LAST_ID;
+
+
+export const setLastFirehoseId = (id: string): void => {
+  lastFirehoseId = id;
+}
+
+export const trackClient = (channel: string, socketInstance: SocketInstance): 
string => {
+  const socketId = uuidv4();
+  sockets[socketId] = socketInstance;
+
+  if(channel in channels) {
+    channels[channel].sockets.push(socketId)
+  } else {
+    channels[channel] = {sockets: [socketId]};
+  }
+
+  return socketId;
+}
+
+export const sendToChannel = (channel: string, value: EventValue): void => {
+  const strData = JSON.stringify(value);
+  if(!channels[channel]) {
+    console.debug(`channel ${channel} is unknown, skipping`);
+    return;
+  }
+  channels[channel].sockets.forEach(socketId => {
+    const socketInstance: SocketInstance = sockets[socketId];
+    if(!socketInstance) return cleanChannel(channel);
+    try {
+      socketInstance.ws.send(strData);
+    } catch(err) {
+      console.debug('Error sending to socket', err);

Review comment:
       Nit: Is this error indicative of a real problem? If so, should this be 
`console.error`?

##########
File path: superset-websocket/README.md
##########
@@ -0,0 +1,80 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+# Superset WebSocket Server
+
+A Node.js WebSocket server for sending async event data to the Superset web 
application frontend, based on 
[SIP-39](https://github.com/apache/superset/issues/9190).
+
+## Requirements
+
+- Node.js 12+ (not tested with older versions)
+- Redis 5+
+
+To use this feature, Superset needs to be configured to enable global async 
queries and to use WebSockets as the transport (see below).
+
+## Install
+
+Install dependencies:
+```
+npm install
+```
+
+## WebSocket Server Configuration
+
+Copy `config.example.json` to `config.json` and adjust the values for your 
environment.
+
+## Superset configuration
+
+Configure the Superset Flask app to enable global async queries (in 
`superset_config.py`):
+
+Enable the `GLOBAL_ASYNC_QUERIES` feature flag:
+```
+"GLOBAL_ASYNC_QUERIES": True
+```
+
+Configure the following Superset values:
+```
+GLOBAL_ASYNC_QUERIES_TRANSPORT = "ws"
+GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL = "ws://<host>:<port>/"
+```
+
+Note that the WebSocket server must be run on the same hostname (different 
port) for cookies to be shared between the Flask app and the WebSocket server.

Review comment:
       Is the JWT the only thing needed from the cookies? If so, have we 
considered passing that value explicitly? Seems like if we could do that, we 
could remove this assumption/constraint.

##########
File path: superset-websocket/src/index.ts
##########
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import * as http from 'http';
+import * as net from 'net';
+import WebSocket from 'ws';
+import { v4 as uuidv4 } from 'uuid';
+
+const jwt = require('jsonwebtoken');
+const cookie = require('cookie');
+const Redis = require('ioredis');
+
+export type StreamResult = [recordId: string, record: [label: 'data', data: 
string]];
+
+// sync with superset-frontend/src/components/ErrorMessage/types
+export type ErrorLevel = 'info' | 'warning' | 'error';
+export type SupersetError<ExtraType = Record<string, any> | null> = {
+  error_type: string;
+  extra: ExtraType;
+  level: ErrorLevel;
+  message: string;
+};
+
+type ListenerFunction = (results: StreamResult[]) => void;
+interface EventValue {
+  id: string,
+  channel_id: string,
+  job_id: string,
+  user_id?: string,
+  status: string,
+  errors?: SupersetError[],
+  result_url?: string,
+}
+interface JwtPayload { channel: string }
+interface FetchRangeFromStreamParams { sessionId: string, startId: string, 
endId: string, listener: ListenerFunction }
+export interface SocketInstance { ws: WebSocket, channel: string, pongTs: 
number }
+
+interface ChannelValue {
+  sockets: Array<string>,
+}
+
+const environment = process.env.NODE_ENV;
+export const opts = {
+  port: 8080,
+  redis: {
+    port: 6379,
+    host: "127.0.0.1",
+    password: "",
+    db: 0
+  },
+  streamPrefix: "async-events-",
+  jwtSecret: "",
+  jwtCookieName: "async-token",
+  redisStreamReadCount: 100,
+  redisStreamReadBlockMs: 5000,
+  socketResponseTimeoutMs: 60 * 1000,
+  pingSocketsIntervalMs: 20 * 1000,
+  gcChannelsIntervalMs: 120 * 1000,
+}
+
+const startServer = process.argv[2] === 'start';
+const configFile = environment === 'test' ? '../config.test.json' : 
'../config.json';
+let config = {};
+try {
+  config = require(configFile);
+} catch(err) {
+  console.warn('config.json not found, using defaults');
+}
+
+Object.assign(opts, config);
+
+if(startServer && opts.jwtSecret.length < 32)
+  throw('Please provide a JWT secret at least 32 bytes long')
+
+const redis = new Redis(opts.redis);
+const httpServer = http.createServer();
+export const wss = new WebSocket.Server({ noServer: true, clientTracking: 
false });
+
+const SOCKET_ACTIVE_STATES = [WebSocket.OPEN, WebSocket.CONNECTING];
+const GLOBAL_EVENT_STREAM_NAME = `${opts.streamPrefix}full`;
+const DEFAULT_STREAM_LAST_ID = '$';
+
+export let channels: Record<string, ChannelValue> = {};
+export let sockets: Record<string, SocketInstance> = {};
+let lastFirehoseId: string = DEFAULT_STREAM_LAST_ID;
+
+
+export const setLastFirehoseId = (id: string): void => {
+  lastFirehoseId = id;
+}
+
+export const trackClient = (channel: string, socketInstance: SocketInstance): 
string => {
+  const socketId = uuidv4();
+  sockets[socketId] = socketInstance;
+
+  if(channel in channels) {
+    channels[channel].sockets.push(socketId)
+  } else {
+    channels[channel] = {sockets: [socketId]};
+  }
+
+  return socketId;
+}
+
+export const sendToChannel = (channel: string, value: EventValue): void => {
+  const strData = JSON.stringify(value);
+  if(!channels[channel]) {

Review comment:
       What is the behavior when we pull an event off the Redis stream but, for 
one reason or another, we have no channel to send it to? Is that event lost? Or 
does the event stay in the Redis stream, and we expect the client to have the 
right state to connect to the event server and pull it in the future?

##########
File path: superset-websocket/src/index.ts
##########
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import * as http from 'http';
+import * as net from 'net';
+import WebSocket from 'ws';
+import { v4 as uuidv4 } from 'uuid';
+
+const jwt = require('jsonwebtoken');
+const cookie = require('cookie');
+const Redis = require('ioredis');
+
+export type StreamResult = [recordId: string, record: [label: 'data', data: 
string]];
+
+// sync with superset-frontend/src/components/ErrorMessage/types
+export type ErrorLevel = 'info' | 'warning' | 'error';
+export type SupersetError<ExtraType = Record<string, any> | null> = {
+  error_type: string;
+  extra: ExtraType;
+  level: ErrorLevel;
+  message: string;
+};
+
+type ListenerFunction = (results: StreamResult[]) => void;
+interface EventValue {
+  id: string,
+  channel_id: string,
+  job_id: string,
+  user_id?: string,
+  status: string,
+  errors?: SupersetError[],
+  result_url?: string,
+}
+interface JwtPayload { channel: string }
+interface FetchRangeFromStreamParams { sessionId: string, startId: string, 
endId: string, listener: ListenerFunction }
+export interface SocketInstance { ws: WebSocket, channel: string, pongTs: 
number }
+
+interface ChannelValue {
+  sockets: Array<string>,
+}
+
+const environment = process.env.NODE_ENV;
+export const opts = {
+  port: 8080,
+  redis: {
+    port: 6379,
+    host: "127.0.0.1",
+    password: "",
+    db: 0
+  },
+  streamPrefix: "async-events-",
+  jwtSecret: "",
+  jwtCookieName: "async-token",
+  redisStreamReadCount: 100,
+  redisStreamReadBlockMs: 5000,
+  socketResponseTimeoutMs: 60 * 1000,
+  pingSocketsIntervalMs: 20 * 1000,
+  gcChannelsIntervalMs: 120 * 1000,
+}
+
+const startServer = process.argv[2] === 'start';
+const configFile = environment === 'test' ? '../config.test.json' : 
'../config.json';
+let config = {};
+try {
+  config = require(configFile);
+} catch(err) {
+  console.warn('config.json not found, using defaults');
+}
+
+Object.assign(opts, config);
+
+if(startServer && opts.jwtSecret.length < 32)
+  throw('Please provide a JWT secret at least 32 bytes long')
+
+const redis = new Redis(opts.redis);
+const httpServer = http.createServer();
+export const wss = new WebSocket.Server({ noServer: true, clientTracking: 
false });
+
+const SOCKET_ACTIVE_STATES = [WebSocket.OPEN, WebSocket.CONNECTING];
+const GLOBAL_EVENT_STREAM_NAME = `${opts.streamPrefix}full`;
+const DEFAULT_STREAM_LAST_ID = '$';
+
+export let channels: Record<string, ChannelValue> = {};
+export let sockets: Record<string, SocketInstance> = {};
+let lastFirehoseId: string = DEFAULT_STREAM_LAST_ID;
+
+
+export const setLastFirehoseId = (id: string): void => {
+  lastFirehoseId = id;
+}
+
+export const trackClient = (channel: string, socketInstance: SocketInstance): 
string => {
+  const socketId = uuidv4();
+  sockets[socketId] = socketInstance;
+
+  if(channel in channels) {
+    channels[channel].sockets.push(socketId)
+  } else {
+    channels[channel] = {sockets: [socketId]};
+  }
+
+  return socketId;
+}
+
+export const sendToChannel = (channel: string, value: EventValue): void => {
+  const strData = JSON.stringify(value);
+  if(!channels[channel]) {
+    console.debug(`channel ${channel} is unknown, skipping`);
+    return;
+  }
+  channels[channel].sockets.forEach(socketId => {
+    const socketInstance: SocketInstance = sockets[socketId];
+    if(!socketInstance) return cleanChannel(channel);
+    try {
+      socketInstance.ws.send(strData);
+    } catch(err) {
+      console.debug('Error sending to socket', err);
+      cleanChannel(channel);
+    }
+  });
+}
+
+export const fetchRangeFromStream = async ({sessionId, startId, endId, 
listener}: FetchRangeFromStreamParams) => {
+  const streamName = `${opts.streamPrefix}${sessionId}`;
+  try {
+    const reply = await redis.xrange(streamName, startId, endId);
+    if (!reply || !reply.length) return;
+    listener(reply);
+  } catch(e) {
+    console.error(e);
+  }
+}
+
+export const subscribeToGlobalStream = async (stream: string, listener: 
ListenerFunction) => {
+  /*eslint no-constant-condition: ["error", { "checkLoops": false }]*/
+  while (true) {

Review comment:
       It's probably premature to worry about performance here, and I think this 
is likely not an issue, but I'm curious whether you foresee any potential 
concerns with an infinite loop calling Redis. It looks like the BLOCK option 
should prevent too many requests in too short a time, since it waits when no 
events are available, but I'm not sure what would happen if one or two events 
regularly trickle in at short intervals.

##########
File path: superset-websocket/.gitignore
##########
@@ -0,0 +1,18 @@
+#

Review comment:
       Should this ignore `node_modules`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to