villebro commented on a change in pull request #13696:
URL: https://github.com/apache/superset/pull/13696#discussion_r598253146



##########
File path: superset-frontend/spec/javascripts/middleware/asyncEvent_spec.ts
##########
@@ -17,249 +17,203 @@
  * under the License.
  */
 import fetchMock from 'fetch-mock';
+import WS from 'jest-websocket-mock';
 import sinon from 'sinon';
 import * as featureFlags from 'src/featureFlags';
-import initAsyncEvents from 'src/middleware/asyncEvent';
-
-jest.useFakeTimers();
+import { parseErrorJson } from 'src/utils/getClientErrorObject';
+import * as asyncEvent from 'src/middleware/asyncEvent';
 
 describe('asyncEvent middleware', () => {
-  const next = sinon.spy();
-  const state = {
-    charts: {
-      123: {
-        id: 123,
-        status: 'loading',
-        asyncJobId: 'foo123',
-      },
-      345: {
-        id: 345,
-        status: 'loading',
-        asyncJobId: 'foo345',
-      },
-    },
+  const asyncPendingEvent = {
+    status: 'pending',
+    result_url: null,
+    job_id: 'foo123',
+    channel_id: '999',
+    errors: [],
+  };
+  const asyncDoneEvent = {
+    id: '1518951480106-0',
+    status: 'done',
+    result_url: '/api/v1/chart/data/cache-key-1',
+    job_id: 'foo123',
+    channel_id: '999',
+    errors: [],
   };
-  const events = [
-    {
-      status: 'done',
-      result_url: '/api/v1/chart/data/cache-key-1',
-      job_id: 'foo123',
-      channel_id: '999',
-      errors: [],
-    },
-    {
-      status: 'done',
-      result_url: '/api/v1/chart/data/cache-key-2',
-      job_id: 'foo345',
-      channel_id: '999',
-      errors: [],
-    },
-  ];
-  const mockStore = {
-    getState: () => state,
-    dispatch: sinon.stub(),
+  const asyncErrorEvent = {
+    id: '1518951480107-0',
+    status: 'error',
+    result_url: null,
+    job_id: 'foo123',
+    channel_id: '999',
+    errors: [{ message: "Error: relation 'foo' does not exist" }],
   };
-  const action = {
-    type: 'GENERIC_ACTION',
+  const chartData = {
+    some: 'data',

Review comment:
       nit: This comes down to personal preference, but for me it's always 
easier to digest tests/mocks if they resemble real requests/responses as much 
as possible. In this case I believe `chartData` should be of type 
`ChartDataResponseResult[]`: 
https://github.com/apache-superset/superset-ui/blob/a3ec94f7a72a3dbb6cb6563e7ea7d3582661ea43/packages/superset-ui-core/src/query/types/QueryResponse.ts#L46-L72

##########
File path: superset-frontend/src/middleware/asyncEvent.ts
##########
@@ -25,178 +24,240 @@ import {
   parseErrorJson,
 } from '../utils/getClientErrorObject';
 
-export type AsyncEvent = {
-  id: string;
+type AsyncEvent = {
+  id?: string | null;
   channel_id: string;
   job_id: string;
-  user_id: string;
+  user_id?: string;
   status: string;
-  errors: SupersetError[];
-  result_url: string;
-};
-
-type AsyncEventOptions = {
-  config: {
-    GLOBAL_ASYNC_QUERIES_TRANSPORT: string;
-    GLOBAL_ASYNC_QUERIES_POLLING_DELAY: number;
-  };
-  getPendingComponents: (state: any) => any[];
-  successAction: (componentId: number, componentData: any) => { type: string };
-  errorAction: (componentId: number, response: any) => { type: string };
-  processEventsCallback?: (events: AsyncEvent[]) => void; // this is currently used only for tests
+  errors?: SupersetError[];
+  result_url: string | null;
 };
 
 type CachedDataResponse = {
-  componentId: number;
   status: string;
   data: any;
 };
+type AppConfig = Record<string, any>;
+type ListenerFn = (asyncEvent: AsyncEvent) => Promise<any>;

Review comment:
       Bycatch: I believe the correct type for `data` in `CachedDataResponse` 
should be `LegacyQueryData | ChartDataResponseResult[]`.

##########
File path: superset-frontend/src/middleware/asyncEvent.ts
##########
@@ -25,178 +24,240 @@ import {
   parseErrorJson,
 } from '../utils/getClientErrorObject';
 
-export type AsyncEvent = {
-  id: string;
+type AsyncEvent = {
+  id?: string | null;
   channel_id: string;
   job_id: string;
-  user_id: string;
+  user_id?: string;
   status: string;
-  errors: SupersetError[];
-  result_url: string;
-};
-
-type AsyncEventOptions = {
-  config: {
-    GLOBAL_ASYNC_QUERIES_TRANSPORT: string;
-    GLOBAL_ASYNC_QUERIES_POLLING_DELAY: number;
-  };
-  getPendingComponents: (state: any) => any[];
-  successAction: (componentId: number, componentData: any) => { type: string };
-  errorAction: (componentId: number, response: any) => { type: string };
-  processEventsCallback?: (events: AsyncEvent[]) => void; // this is currently used only for tests
+  errors?: SupersetError[];
+  result_url: string | null;
 };
 
 type CachedDataResponse = {
-  componentId: number;
   status: string;
   data: any;
 };
+type AppConfig = Record<string, any>;
+type ListenerFn = (asyncEvent: AsyncEvent) => Promise<any>;
 
-const initAsyncEvents = (options: AsyncEventOptions) => {
-  // TODO: implement websocket support
-  const TRANSPORT_POLLING = 'polling';
-  const {
-    config,
-    getPendingComponents,
-    successAction,
-    errorAction,
-    processEventsCallback,
-  } = options;
-  const transport = config.GLOBAL_ASYNC_QUERIES_TRANSPORT || TRANSPORT_POLLING;
-  const polling_delay = config.GLOBAL_ASYNC_QUERIES_POLLING_DELAY || 500;
-
-  const middleware: Middleware = (store: MiddlewareAPI) => (next: Dispatch) => {
-    const JOB_STATUS = {
-      PENDING: 'pending',
-      RUNNING: 'running',
-      ERROR: 'error',
-      DONE: 'done',
-    };
-    const LOCALSTORAGE_KEY = 'last_async_event_id';
-    const POLLING_URL = '/api/v1/async_event/';
-    let lastReceivedEventId: string | null;
+const TRANSPORT_POLLING = 'polling';
+const TRANSPORT_WS = 'ws';
+const JOB_STATUS = {
+  PENDING: 'pending',
+  RUNNING: 'running',
+  ERROR: 'error',
+  DONE: 'done',
+};
+const LOCALSTORAGE_KEY = 'last_async_event_id';
+const POLLING_URL = '/api/v1/async_event/';
+const MAX_RETRIES = 6;
+const RETRY_DELAY = 100;
 
-    try {
-      lastReceivedEventId = localStorage.getItem(LOCALSTORAGE_KEY);
-    } catch (err) {
-      console.warn('Failed to fetch last event Id from localStorage');
+let config: AppConfig;
+let transport: string;
+let polling_delay: number;
+let listenersByJobId: Record<string, ListenerFn>;
+let retriesByJobId: Record<string, number>;
+let lastReceivedEventId: string | null | undefined;
+
+export const init = (appConfig?: AppConfig) => {
+  if (!isFeatureEnabled(FeatureFlag.GLOBAL_ASYNC_QUERIES)) return;
+
+  listenersByJobId = {};
+  retriesByJobId = {};
+  lastReceivedEventId = null;
+
+  if (appConfig) {
+    config = appConfig;
+  } else {
+    // load bootstrap data from DOM
+    const appContainer = document.getElementById('app');
+    if (appContainer) {
+      const bootstrapData = JSON.parse(
+        appContainer?.getAttribute('data-bootstrap') || '{}',
+      );
+      config = bootstrapData?.common?.conf;
+    } else {
+      config = {};
+      console.warn('asyncEvent: app config data not found');
     }
+  }
+  transport = config.GLOBAL_ASYNC_QUERIES_TRANSPORT || TRANSPORT_POLLING;
+  polling_delay = config.GLOBAL_ASYNC_QUERIES_POLLING_DELAY || 500;
 
-    const fetchEvents = makeApi<
-      { last_id?: string | null },
-      { result: AsyncEvent[] }
-    >({
-      method: 'GET',
-      endpoint: POLLING_URL,
-    });
+  try {
+    lastReceivedEventId = localStorage.getItem(LOCALSTORAGE_KEY);
+  } catch (err) {
+    console.warn('Failed to fetch last event Id from localStorage');
+  }
 
-    const fetchCachedData = async (
-      asyncEvent: AsyncEvent,
-      componentId: number,
-    ): Promise<CachedDataResponse> => {
-      let status = 'success';
-      let data;
-      try {
-        const { json } = await SupersetClient.get({
-          endpoint: asyncEvent.result_url,
-        });
-        data = 'result' in json ? json.result : json;
-      } catch (response) {
-        status = 'error';
-        data = await getClientErrorObject(response);
-      }
+  if (transport === TRANSPORT_POLLING) {
+    loadEventsFromApi();
+  }
+  if (transport === TRANSPORT_WS) {
+    wsConnect();
+  }
+};
 
-      return { componentId, status, data };
-    };
+const addListener = (id: string, fn: any) => {
+  listenersByJobId[id] = fn;
+};
 
-    const setLastId = (asyncEvent: AsyncEvent) => {
-      lastReceivedEventId = asyncEvent.id;
-      try {
-        localStorage.setItem(LOCALSTORAGE_KEY, lastReceivedEventId as string);
-      } catch (err) {
-        console.warn('Error saving event Id to localStorage', err);
-      }
-    };
+const removeListener = (id: string) => {
+  if (!listenersByJobId[id]) return;
+  delete listenersByJobId[id];
+};
 
-    const processEvents = async () => {
-      let queuedComponents = getPendingComponents(store.getState());
-      const eventArgs = lastReceivedEventId
-        ? { last_id: lastReceivedEventId }
-        : {};
-      const events: AsyncEvent[] = [];
-      if (queuedComponents && queuedComponents.length) {
-        try {
-          const { result: events } = await fetchEvents(eventArgs);
-          // refetch queuedComponents due to race condition where results are available
-          // before component state is updated with asyncJobId
-          queuedComponents = getPendingComponents(store.getState());
-          if (events && events.length) {
-            const componentsByJobId = queuedComponents.reduce((acc, item) => {
-              acc[item.asyncJobId] = item;
-              return acc;
-            }, {});
-            const fetchDataEvents: Promise<CachedDataResponse>[] = [];
-            events.forEach((asyncEvent: AsyncEvent) => {
-              const component = componentsByJobId[asyncEvent.job_id];
-              if (!component) {
-                console.warn(
-                  'Component not found for job_id',
-                  asyncEvent.job_id,
-                );
-                return setLastId(asyncEvent);
-              }
-              const componentId = component.id;
-              switch (asyncEvent.status) {
-                case JOB_STATUS.DONE:
-                  fetchDataEvents.push(
-                    fetchCachedData(asyncEvent, componentId),
-                  );
-                  break;
-                case JOB_STATUS.ERROR:
-                  store.dispatch(
-                    errorAction(componentId, [parseErrorJson(asyncEvent)]),
-                  );
-                  break;
-                default:
-                  console.warn('Received event with status', asyncEvent.status);
-              }
-
-              return setLastId(asyncEvent);
-            });
-
-            const fetchResults = await Promise.all(fetchDataEvents);
-            fetchResults.forEach(result => {
-              const data = Array.isArray(result.data)
-                ? result.data
-                : [result.data];
-              if (result.status === 'success') {
-                store.dispatch(successAction(result.componentId, data));
-              } else {
-                store.dispatch(errorAction(result.componentId, data));
-              }
-            });
+export const waitForAsyncData = async (asyncResponse: AsyncEvent) =>
+  new Promise((resolve, reject) => {
+    const jobId = asyncResponse.job_id;
+    const listener = async (asyncEvent: AsyncEvent) => {
+      switch (asyncEvent.status) {
+        case JOB_STATUS.DONE: {
+          let { data, status } = await fetchCachedData(asyncEvent); // eslint-disable-line prefer-const
+          data = Array.isArray(data) ? data : [data];

Review comment:
       `@superset-ui/core` has a convenient util for this, `ensureIsArray`: 
`data = ensureIsArray(data)`. See 
https://github.com/apache-superset/superset-ui/blob/a3ec94f7a72a3dbb6cb6563e7ea7d3582661ea43/packages/superset-ui-core/src/utils/ensureIsArray.ts#L24-L29




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to