Copilot commented on code in PR #5565:
URL: https://github.com/apache/texera/pull/5565#discussion_r3379090983
##########
frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts:
##########
@@ -161,6 +165,8 @@ export class ComputingUnitStatusService implements
OnDestroy {
if (this.workflowWebsocketService.isConnected) {
this.workflowWebsocketService.closeWebsocket();
this.workflowStatusService.clearStatus();
+ // switching units: signal consumers to clear their stale state
+ this.connectionResetSubject.next();
}
Review Comment:
The connection-reset signal (and status clear) is only triggered when
workflowWebsocketService.isConnected is true. If the app has stale
websocket-derived state but the socket is currently disconnected, switching
computing units would still reconnect but won’t emit the reset signal or clear
status, leaving stale UI state behind. Consider treating any previous
connection attempt as requiring teardown/reset when switching units, regardless
of current isConnected.
##########
frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts:
##########
@@ -95,6 +95,17 @@ export class WorkflowResultService {
return this.operatorResultServices.get(operatorID);
}
+ /**
+ * Drop cached operator results and reset table stats so a re-entered
workflow
+ * doesn't show stale results. resultTableStats is a ReplaySubject, so
+ * reset it to an empty snapshot to avoid replaying stale stats to
subscribers.
+ */
+ public clearResults(): void {
+ this.operatorResultServices.clear();
+ this.paginatedResultServices.clear();
+ this.resultTableStats.next({});
+ }
Review Comment:
clearResults() clears the internal caches but doesn’t notify existing UI
subscribers that they should re-render/clear (e.g., ResultPanel rerenders on
resultInitiateStream; without an emission, stale table/visualization content
can remain visible after a unit switch). Consider emitting a rerender signal
after clearing, and optionally emitting a cleared update record for operators
that previously had cached results.
##########
frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts:
##########
@@ -354,6 +354,15 @@ export class ExecuteWorkflowService {
};
}
+ /**
+ * Reset execution status and worker assignments. Unlike
resetExecutionState(),
+ * which resets only the status, this also clears the worker assignments.
+ */
+ public resetExecutionAndWorkers(): void {
+ this.resetExecutionState();
+ this.assignedWorkerIds.clear();
+ }
Review Comment:
resetExecutionAndWorkers() currently updates currentState without emitting
on executionStateStream. Components like MenuComponent (execution status) and
ResultPanelComponent rely on the stream to refresh, so unit-switch resets may
leave stale execution state visible. Consider using updateExecutionState(...)
here so subscribers are notified.
##########
frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts:
##########
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { TestBed } from "@angular/core/testing";
+import { HttpClientTestingModule } from "@angular/common/http/testing";
+import { of } from "rxjs";
+import { ComputingUnitStatusService } from "./computing-unit-status.service";
+import { WorkflowComputingUnitManagingService } from
"../workflow-computing-unit/workflow-computing-unit-managing.service";
+import { WorkflowWebsocketService } from
"../../../../workspace/service/workflow-websocket/workflow-websocket.service";
+import { WorkflowStatusService } from
"../../../../workspace/service/workflow-status/workflow-status.service";
+import { UserService } from "../../user/user.service";
+import { StubUserService } from "../../user/stub-user.service";
+import { AuthService } from "../../user/auth.service";
+import { StubAuthService } from "../../user/stub-auth.service";
+import { DashboardWorkflowComputingUnit } from
"../../../type/workflow-computing-unit";
+import { commonTestProviders } from "../../../testing/test-utils";
+
+describe("ComputingUnitStatusService", () => {
+ let service: ComputingUnitStatusService;
+ let websocketService: WorkflowWebsocketService;
+
+ const mockUnit = (cuid: number) => ({ computingUnit: { cuid } }) as unknown
as DashboardWorkflowComputingUnit;
+
+ beforeEach(() => {
+ const managingStub = {
+ listComputingUnits: () => of([]),
+ getComputingUnit: (cuid: number) => of(mockUnit(cuid)),
+ terminateComputingUnit: () => of(undefined),
+ };
+
+ TestBed.configureTestingModule({
+ imports: [HttpClientTestingModule],
+ providers: [
+ ComputingUnitStatusService,
+ WorkflowWebsocketService,
+ WorkflowStatusService,
+ { provide: WorkflowComputingUnitManagingService, useValue:
managingStub },
+ { provide: UserService, useClass: StubUserService },
+ { provide: AuthService, useClass: StubAuthService },
+ ...commonTestProviders,
+ ],
+ });
+
+ service = TestBed.inject(ComputingUnitStatusService);
+ websocketService = TestBed.inject(WorkflowWebsocketService);
+ });
+
+ it("should be created", () => {
+ expect(service).toBeTruthy();
+ });
+
+ it("reconnects when re-selecting the same workflow after disconnect
(regression #3120)", () => {
+ const openSpy = vi.spyOn(websocketService,
"openWebsocket").mockImplementation(() => {});
+ const closeSpy = vi.spyOn(websocketService, "closeWebsocket");
+ (service as any).allComputingUnitsSubject.next([mockUnit(7)]);
+
+ // Enter workflow 5 on computing unit 7 → opens the websocket once.
+ service.selectComputingUnit(5, 7);
+ expect(openSpy).toHaveBeenCalledTimes(1);
+
+ // User returns to the dashboard.
+ service.disconnect();
+ expect(closeSpy).toHaveBeenCalled();
+
+ // Re-enter the SAME workflow (the `wid -> null -> wid` pattern): without
the
+ // cleanup, the retained currentConnectedWid/Cuid would suppress the
reconnect.
+ service.selectComputingUnit(5, 7);
+ expect(openSpy).toHaveBeenCalledTimes(2);
+ });
+
+ it("disconnect() clears the selected computing unit", () => {
+ vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => {});
+ (service as any).allComputingUnitsSubject.next([mockUnit(7)]);
+ service.selectComputingUnit(5, 7);
+
+ let latest: DashboardWorkflowComputingUnit | null = mockUnit(7);
+ service.getSelectedComputingUnit().subscribe(unit => (latest = unit));
+ expect(latest).not.toBeNull();
+
+ service.disconnect();
+ expect(latest).toBeNull();
+ });
+
+ it("emits a connection-reset signal when switching to a different computing
unit (issue #3120)", () => {
+ let connected = false;
+ vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => {
+ connected = true;
+ });
+ vi.spyOn(websocketService, "closeWebsocket").mockImplementation(() => {
+ connected = false;
+ });
+ vi.spyOn(websocketService, "isConnected", "get").mockImplementation(() =>
connected);
+ (service as any).allComputingUnitsSubject.next([mockUnit(7), mockUnit(8)]);
+
+ let resetCount = 0;
+ service.getConnectionResetStream().subscribe(() => resetCount++);
+
+ // First connection on unit 7: nothing to tear down yet → no signal.
+ service.selectComputingUnit(5, 7);
+ expect(resetCount).toBe(0);
+
+ // Switch to a different unit while connected → tear-down signal fires
once.
+ service.selectComputingUnit(5, 8);
+ expect(resetCount).toBe(1);
+ });
Review Comment:
This test starts polling via selectComputingUnit() (RxJS interval) and never
tears it down, which can leave a live timer running after the test finishes and
cause the suite to hang/flap. Add cleanup (disconnect() or ngOnDestroy()) at
the end of the test.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]