This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 27e6c3a543c show processlist performance improve (#19857)
27e6c3a543c is described below
commit 27e6c3a543c0d5dcd83e6683d80a6295c44e3b2b
Author: Chuxin Chen <[email protected]>
AuthorDate: Thu Aug 4 16:00:02 2022 +0800
show processlist performance improve (#19857)
* show processlist performance
* show processlist performance
---
.../model/yaml/YamlExecuteProcessContext.java | 8 +-
.../process/GovernanceExecuteProcessReporter.java | 27 +++--
.../process/event/ExecuteProcessReportEvent.java | 31 -----
.../event/ExecuteProcessSummaryReportEvent.java | 32 ------
.../event/ExecuteProcessUnitReportEvent.java | 34 ------
...executor.sql.process.spi.ExecuteProcessReporter | 2 +-
.../process/EventBusContextHolderFixture.java | 2 +-
.../GovernanceExecuteProcessReporterTest.java | 104 +++++++++++++++++
.../cluster/coordinator/RegistryCenter.java | 2 +-
.../subscriber/ProcessRegistrySubscriber.java | 84 +-------------
.../subscriber/ProcessRegistrySubscriberTest.java | 79 +------------
.../ProcessRegistrySubscriberTestNoMock.java | 125 ---------------------
...nceExecuteProcessReporterSubscriberFixture.java | 64 -----------
.../GovernanceExecuteProcessReporterTest.java | 53 ---------
.../StandaloneContextManagerBuilder.java | 4 +-
.../subscriber/ProcessStandaloneSubscriber.java | 57 ++++++++++
.../process/ProcessStandaloneSubscriberTest.java | 67 +++++++++++
17 files changed, 261 insertions(+), 514 deletions(-)
diff --git
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/YamlExecuteProcessContext.java
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/YamlExecuteProcessContext.java
index f135ff7227b..45710bb6fc1 100644
---
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/YamlExecuteProcessContext.java
+++
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/yaml/YamlExecuteProcessContext.java
@@ -17,13 +17,14 @@
package org.apache.shardingsphere.infra.executor.sql.process.model.yaml;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
+import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessUnit;
/**
* Execute process context for YAML.
@@ -54,7 +55,10 @@ public final class YamlExecuteProcessContext {
username = executeProcessContext.getUsername();
hostname = executeProcessContext.getHostname();
sql = executeProcessContext.getSql();
- unitStatuses =
executeProcessContext.getUnitStatuses().stream().map(YamlExecuteProcessUnit::new).collect(Collectors.toList());
+ unitStatuses = new
ArrayList<>(executeProcessContext.getUnitStatuses().size());
+ for (ExecuteProcessUnit each :
executeProcessContext.getUnitStatuses()) {
+ unitStatuses.add(new YamlExecuteProcessUnit(each));
+ }
startTimeMillis = executeProcessContext.getStartTimeMillis();
}
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/process/GovernanceExecuteProcessReporter.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/process/GovernanceExecuteProcessReporter.java
similarity index 68%
rename from
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/process/GovernanceExecuteProcessReporter.java
rename to
shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/process/GovernanceExecuteProcessReporter.java
index a65d6e35fd0..58a7d294189 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/process/GovernanceExecuteProcessReporter.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/process/GovernanceExecuteProcessReporter.java
@@ -15,20 +15,18 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.process;
+package org.apache.shardingsphere.mode.process;
import org.apache.shardingsphere.infra.binder.LogicSQL;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessUnit;
+import
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessContext;
+import
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessUnit;
import
org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter;
-import org.apache.shardingsphere.mode.process.ShowProcessListManager;
-import org.apache.shardingsphere.mode.process.event.ExecuteProcessReportEvent;
-import
org.apache.shardingsphere.mode.process.event.ExecuteProcessSummaryReportEvent;
-import
org.apache.shardingsphere.mode.process.event.ExecuteProcessUnitReportEvent;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
/**
* Governance execute process reporter.
@@ -39,18 +37,29 @@ public final class GovernanceExecuteProcessReporter
implements ExecuteProcessRep
public void report(final LogicSQL logicSQL, final ExecutionGroupContext<?
extends SQLExecutionUnit> executionGroupContext,
final ExecuteProcessConstants constants, final
EventBusContext eventBusContext) {
ExecuteProcessContext executeProcessContext = new
ExecuteProcessContext(logicSQL.getSql(), executionGroupContext, constants);
- eventBusContext.post(new
ExecuteProcessSummaryReportEvent(executeProcessContext));
+
ShowProcessListManager.getInstance().putProcessContext(executeProcessContext.getExecutionID(),
new YamlExecuteProcessContext(executeProcessContext));
}
@Override
public void report(final String executionID, final SQLExecutionUnit
executionUnit, final ExecuteProcessConstants constants, final EventBusContext
eventBusContext) {
ExecuteProcessUnit executeProcessUnit = new
ExecuteProcessUnit(executionUnit.getExecutionUnit(), constants);
- eventBusContext.post(new ExecuteProcessUnitReportEvent(executionID,
executeProcessUnit));
+ YamlExecuteProcessContext yamlExecuteProcessContext =
ShowProcessListManager.getInstance().getProcessContext(executionID);
+ for (YamlExecuteProcessUnit each :
yamlExecuteProcessContext.getUnitStatuses()) {
+ if (each.getUnitID().equals(executeProcessUnit.getUnitID())) {
+ each.setStatus(executeProcessUnit.getStatus());
+ }
+ }
}
@Override
public void report(final String executionID, final ExecuteProcessConstants
constants, final EventBusContext eventBusContext) {
- eventBusContext.post(new ExecuteProcessReportEvent(executionID));
+ YamlExecuteProcessContext yamlExecuteProcessContext =
ShowProcessListManager.getInstance().getProcessContext(executionID);
+ for (YamlExecuteProcessUnit each :
yamlExecuteProcessContext.getUnitStatuses()) {
+ if (each.getStatus() !=
ExecuteProcessConstants.EXECUTE_STATUS_DONE) {
+ return;
+ }
+ }
+ ShowProcessListManager.getInstance().removeProcessContext(executionID);
}
@Override
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/process/event/ExecuteProcessReportEvent.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/process/event/ExecuteProcessReportEvent.java
deleted file mode 100644
index 2137f2162b8..00000000000
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/process/event/ExecuteProcessReportEvent.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.process.event;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-/**
- * Execute process report event.
- */
-@RequiredArgsConstructor
-@Getter
-public final class ExecuteProcessReportEvent {
-
- private final String executionID;
-}
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/process/event/ExecuteProcessSummaryReportEvent.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/process/event/ExecuteProcessSummaryReportEvent.java
deleted file mode 100644
index 0a3a74bff58..00000000000
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/process/event/ExecuteProcessSummaryReportEvent.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.process.event;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
-
-/**
- * Execute process summary report event.
- */
-@RequiredArgsConstructor
-@Getter
-public final class ExecuteProcessSummaryReportEvent {
-
- private final ExecuteProcessContext executeProcessContext;
-}
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/process/event/ExecuteProcessUnitReportEvent.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/process/event/ExecuteProcessUnitReportEvent.java
deleted file mode 100644
index 7e631e8e9f0..00000000000
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/process/event/ExecuteProcessUnitReportEvent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.process.event;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessUnit;
-
-/**
- * Execute process unit report event.
- */
-@RequiredArgsConstructor
-@Getter
-public final class ExecuteProcessUnitReportEvent {
-
- private final String executionID;
-
- private final ExecuteProcessUnit executeProcessUnit;
-}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter
b/shardingsphere-mode/shardingsphere-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter
similarity index 89%
rename from
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter
rename to
shardingsphere-mode/shardingsphere-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter
index c1bc74798a0..c9761f91b35 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.mode.manager.cluster.process.GovernanceExecuteProcessReporter
+org.apache.shardingsphere.mode.process.GovernanceExecuteProcessReporter
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/process/EventBusContextHolderFixture.java
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/process/EventBusContextHolderFixture.java
similarity index 94%
rename from
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/process/EventBusContextHolderFixture.java
rename to
shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/process/EventBusContextHolderFixture.java
index 9ac9467ecef..c745d1df6dc 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/process/EventBusContextHolderFixture.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/process/EventBusContextHolderFixture.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.process;
+package org.apache.shardingsphere.mode.process;
import lombok.Getter;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/process/GovernanceExecuteProcessReporterTest.java
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/process/GovernanceExecuteProcessReporterTest.java
new file mode 100644
index 00000000000..a764052ed8e
--- /dev/null
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/process/GovernanceExecuteProcessReporterTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.process;
+
+import org.apache.shardingsphere.infra.binder.LogicSQL;
+import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
+import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
+import
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.MockedStatic;
+
+import java.util.Collections;
+import java.util.UUID;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public final class GovernanceExecuteProcessReporterTest {
+
+ private MockedStatic<ShowProcessListManager> mockedStatic;
+
+ private ShowProcessListManager showProcessListManager;
+
+ private final GovernanceExecuteProcessReporter reporter = new
GovernanceExecuteProcessReporter();
+
+ @Before
+ public void setUp() {
+ mockedStatic = mockStatic(ShowProcessListManager.class);
+ showProcessListManager = mock(ShowProcessListManager.class);
+
mockedStatic.when(ShowProcessListManager::getInstance).thenReturn(showProcessListManager);
+ }
+
+ @Test
+ public void assertReport() {
+ LogicSQL logicSQL = new LogicSQL(null, null, null);
+ ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext = mockExecutionGroupContext();
+ reporter.report(logicSQL, executionGroupContext,
ExecuteProcessConstants.EXECUTE_ID,
EventBusContextHolderFixture.EVENT_BUS_CONTEXT);
+ verify(showProcessListManager,
times(1)).putProcessContext(eq(executionGroupContext.getExecutionID()), any());
+ }
+
+ @SuppressWarnings("unchecked")
+ private ExecutionGroupContext<? extends SQLExecutionUnit>
mockExecutionGroupContext() {
+ ExecutionGroupContext<? extends SQLExecutionUnit> result =
mock(ExecutionGroupContext.class);
+ when(result.getExecutionID()).thenReturn(UUID.randomUUID().toString());
+ return result;
+ }
+
+ @Test
+ public void assertReportUnit() {
+ SQLExecutionUnit sqlExecutionUnit = mock(SQLExecutionUnit.class);
+ ExecutionUnit executionUnit = mock(ExecutionUnit.class);
+ when(sqlExecutionUnit.getExecutionUnit()).thenReturn(executionUnit);
+ YamlExecuteProcessContext yamlExecuteProcessContext =
mock(YamlExecuteProcessContext.class);
+
when(yamlExecuteProcessContext.getUnitStatuses()).thenReturn(Collections.emptyList());
+
when(showProcessListManager.getProcessContext("foo_id")).thenReturn(yamlExecuteProcessContext);
+ reporter.report("foo_id", sqlExecutionUnit,
ExecuteProcessConstants.EXECUTE_ID,
EventBusContextHolderFixture.EVENT_BUS_CONTEXT);
+ verify(showProcessListManager,
times(1)).getProcessContext(eq("foo_id"));
+ }
+
+ @Test
+ public void assertReportComplete() {
+ YamlExecuteProcessContext yamlExecuteProcessContext =
mock(YamlExecuteProcessContext.class);
+
when(yamlExecuteProcessContext.getUnitStatuses()).thenReturn(Collections.emptyList());
+
when(showProcessListManager.getProcessContext("foo_id")).thenReturn(yamlExecuteProcessContext);
+ reporter.report("foo_id", ExecuteProcessConstants.EXECUTE_STATUS_DONE,
EventBusContextHolderFixture.EVENT_BUS_CONTEXT);
+ verify(showProcessListManager,
times(1)).getProcessContext(eq("foo_id"));
+ verify(showProcessListManager,
times(1)).removeProcessContext(eq("foo_id"));
+ }
+
+ @Test
+ public void assertReportClean() {
+ reporter.reportClean("foo_id");
+ verify(showProcessListManager,
times(1)).removeProcessContext(eq("foo_id"));
+ }
+
+ @After
+ public void tearDown() {
+ mockedStatic.close();
+ }
+}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index 781d3dd6216..ae2c07144e0 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -28,7 +28,7 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.M
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.subscriber.ScalingRegistrySubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber.SchemaMetaDataRegistrySubscriber;
-import
org.apache.shardingsphere.mode.process.subscriber.ProcessRegistrySubscriber;
+import
org.apache.shardingsphere.mode.manager.cluster.process.subscriber.ProcessRegistrySubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber.ComputeNodeStatusSubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.service.StorageNodeStatusService;
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/process/subscriber/ProcessRegistrySubscriber.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/process/subscriber/ProcessRegistrySubscriber.java
similarity index 55%
rename from
shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/process/subscriber/ProcessRegistrySubscriber.java
rename to
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/process/subscriber/ProcessRegistrySubscriber.java
index b2df1a96631..53577189b3c 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/process/subscriber/ProcessRegistrySubscriber.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/process/subscriber/ProcessRegistrySubscriber.java
@@ -15,33 +15,20 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.process.subscriber;
+package org.apache.shardingsphere.mode.manager.cluster.process.subscriber;
-import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessUnit;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.BatchYamlExecuteProcessContext;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessContext;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessUnit;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.persist.PersistRepository;
import org.apache.shardingsphere.mode.process.ShowProcessListManager;
-import org.apache.shardingsphere.mode.process.event.ExecuteProcessReportEvent;
-import
org.apache.shardingsphere.mode.process.event.ExecuteProcessSummaryReportEvent;
-import
org.apache.shardingsphere.mode.process.event.ExecuteProcessUnitReportEvent;
import
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
import
org.apache.shardingsphere.mode.process.event.ShowProcessListResponseEvent;
import org.apache.shardingsphere.mode.process.lock.ShowProcessListSimpleLock;
import org.apache.shardingsphere.mode.process.node.ProcessNode;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
@@ -65,12 +52,6 @@ public final class ProcessRegistrySubscriber {
eventBusContext.register(this);
}
- public ProcessRegistrySubscriber(final EventBusContext eventBusContext) {
- this.eventBusContext = eventBusContext;
- repository = null;
- eventBusContext.register(this);
- }
-
/**
* Load show process list data.
*
@@ -78,22 +59,6 @@ public final class ProcessRegistrySubscriber {
*/
@Subscribe
public void loadShowProcessListData(final ShowProcessListRequestEvent
event) {
- if (null != repository) {
- loadClusterShowProcessListData();
- } else {
- loadStandaloneShowProcessListData();
- }
- }
-
- private void loadStandaloneShowProcessListData() {
- BatchYamlExecuteProcessContext batchYamlExecuteProcessContext = new
BatchYamlExecuteProcessContext(new ArrayList<>(
-
ShowProcessListManager.getInstance().getProcessContextMap().values()));
- eventBusContext.post(new
ShowProcessListResponseEvent(batchYamlExecuteProcessContext.getContexts().isEmpty()
- ? Collections.emptyList()
- :
Collections.singletonList(YamlEngine.marshal(batchYamlExecuteProcessContext))));
- }
-
- private void loadClusterShowProcessListData() {
String showProcessListId = new
UUID(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong()).toString();
boolean triggerIsComplete = false;
Collection<String> triggerPaths = getTriggerPaths(showProcessListId);
@@ -145,51 +110,4 @@ public final class ProcessRegistrySubscriber {
}
eventBusContext.post(new
ShowProcessListResponseEvent(batchProcessContexts));
}
-
- /**
- * Report execute process summary.
- *
- * @param event execute process summary report event.
- */
- @Subscribe
- @AllowConcurrentEvents
- public void reportExecuteProcessSummary(final
ExecuteProcessSummaryReportEvent event) {
- ExecuteProcessContext executeProcessContext =
event.getExecuteProcessContext();
-
ShowProcessListManager.getInstance().putProcessContext(executeProcessContext.getExecutionID(),
new YamlExecuteProcessContext(executeProcessContext));
- }
-
- /**
- * Report execute process unit.
- *
- * @param event execute process unit report event.
- */
- @Subscribe
- @AllowConcurrentEvents
- public void reportExecuteProcessUnit(final ExecuteProcessUnitReportEvent
event) {
- String executionID = event.getExecutionID();
- YamlExecuteProcessContext yamlExecuteProcessContext =
ShowProcessListManager.getInstance().getProcessContext(executionID);
- ExecuteProcessUnit executeProcessUnit = event.getExecuteProcessUnit();
- for (YamlExecuteProcessUnit each :
yamlExecuteProcessContext.getUnitStatuses()) {
- if (each.getUnitID().equals(executeProcessUnit.getUnitID())) {
- each.setStatus(executeProcessUnit.getStatus());
- }
- }
- }
-
- /**
- * Report execute process.
- *
- * @param event execute process report event.
- */
- @Subscribe
- @AllowConcurrentEvents
- public void reportExecuteProcess(final ExecuteProcessReportEvent event) {
- YamlExecuteProcessContext yamlExecuteProcessContext =
ShowProcessListManager.getInstance().getProcessContext(event.getExecutionID());
- for (YamlExecuteProcessUnit each :
yamlExecuteProcessContext.getUnitStatuses()) {
- if (each.getStatus() !=
ExecuteProcessConstants.EXECUTE_STATUS_DONE) {
- return;
- }
- }
-
ShowProcessListManager.getInstance().removeProcessContext(event.getExecutionID());
- }
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessRegistrySubscriberTest.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessRegistrySubscriberTest.java
index db0be2332f1..8e27619dc99 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessRegistrySubscriberTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessRegistrySubscriberTest.java
@@ -17,36 +17,22 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessUnit;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessContext;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessUnit;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
-import org.apache.shardingsphere.mode.process.ShowProcessListManager;
-import org.apache.shardingsphere.mode.process.event.ExecuteProcessReportEvent;
-import
org.apache.shardingsphere.mode.process.event.ExecuteProcessSummaryReportEvent;
-import
org.apache.shardingsphere.mode.process.event.ExecuteProcessUnitReportEvent;
-import
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import
org.apache.shardingsphere.mode.manager.cluster.process.subscriber.ProcessRegistrySubscriber;
import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
-import
org.apache.shardingsphere.mode.process.subscriber.ProcessRegistrySubscriber;
+import
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
-import org.mockito.MockedStatic;
import org.mockito.junit.MockitoJUnitRunner;
-import java.util.Collection;
import java.util.Collections;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -61,9 +47,6 @@ public final class ProcessRegistrySubscriberTest {
private ProcessRegistrySubscriber processRegistrySubscriber;
- @Mock
- private ShowProcessListManager showProcessListManager;
-
@Before
public void setUp() {
processRegistrySubscriber = new ProcessRegistrySubscriber(repository,
eventBusContext);
@@ -78,60 +61,4 @@ public final class ProcessRegistrySubscriberTest {
processRegistrySubscriber.loadShowProcessListData(showProcessListRequestEvent);
verify(repository, times(1)).persist(any(), any());
}
-
- @Test
- public void assertReportExecuteProcessSummary() {
- try (MockedStatic<ShowProcessListManager> mockedStatic =
mockStatic(ShowProcessListManager.class)) {
-
mockedStatic.when(ShowProcessListManager::getInstance).thenReturn(showProcessListManager);
- ExecuteProcessContext executeProcessContext =
mock(ExecuteProcessContext.class);
- ExecuteProcessSummaryReportEvent event =
mock(ExecuteProcessSummaryReportEvent.class);
-
when(event.getExecuteProcessContext()).thenReturn(executeProcessContext);
- when(executeProcessContext.getExecutionID()).thenReturn("id");
- processRegistrySubscriber.reportExecuteProcessSummary(event);
- verify(showProcessListManager, times(1)).putProcessContext(any(),
any());
- }
- }
-
- @Test
- public void assertReportExecuteProcessUnit() {
- try (MockedStatic<ShowProcessListManager> mockedStatic =
mockStatic(ShowProcessListManager.class)) {
-
mockedStatic.when(ShowProcessListManager::getInstance).thenReturn(showProcessListManager);
- ExecuteProcessUnitReportEvent event =
mock(ExecuteProcessUnitReportEvent.class);
- when(event.getExecutionID()).thenReturn("id");
- YamlExecuteProcessContext context =
mockYamlExecuteProcessContext();
-
when(showProcessListManager.getProcessContext(event.getExecutionID())).thenReturn(context);
- ExecuteProcessUnit unit = mockExecuteProcessUnit();
- when(event.getExecuteProcessUnit()).thenReturn(unit);
- processRegistrySubscriber.reportExecuteProcessUnit(event);
-
assertThat(context.getUnitStatuses().iterator().next().getStatus(),
is(ExecuteProcessConstants.EXECUTE_STATUS_DONE));
- }
- }
-
- @Test
- public void assertReportExecuteProcess() {
- try (MockedStatic<ShowProcessListManager> mockedStatic =
mockStatic(ShowProcessListManager.class)) {
-
mockedStatic.when(ShowProcessListManager::getInstance).thenReturn(showProcessListManager);
- ExecuteProcessReportEvent event =
mock(ExecuteProcessReportEvent.class);
-
when(showProcessListManager.getProcessContext(any())).thenReturn(mock(YamlExecuteProcessContext.class));
- processRegistrySubscriber.reportExecuteProcess(event);
- verify(showProcessListManager,
times(1)).removeProcessContext(any());
- }
- }
-
- private YamlExecuteProcessContext mockYamlExecuteProcessContext() {
- YamlExecuteProcessUnit yamlExecuteProcessUnit = new
YamlExecuteProcessUnit();
- yamlExecuteProcessUnit.setUnitID("159917166");
-
yamlExecuteProcessUnit.setStatus(ExecuteProcessConstants.EXECUTE_STATUS_START);
- Collection<YamlExecuteProcessUnit> unitStatuses =
Collections.singletonList(yamlExecuteProcessUnit);
- YamlExecuteProcessContext result = new YamlExecuteProcessContext();
- result.setUnitStatuses(unitStatuses);
- return result;
- }
-
- private ExecuteProcessUnit mockExecuteProcessUnit() {
- ExecuteProcessUnit result = mock(ExecuteProcessUnit.class);
- when(result.getUnitID()).thenReturn("159917166");
-
when(result.getStatus()).thenReturn(ExecuteProcessConstants.EXECUTE_STATUS_DONE);
- return result;
- }
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessRegistrySubscriberTestNoMock.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessRegistrySubscriberTestNoMock.java
deleted file mode 100644
index 236d554fd9e..00000000000
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessRegistrySubscriberTestNoMock.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber;
-
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
-import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
-import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessUnit;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessContext;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessUnit;
-import org.apache.shardingsphere.infra.metadata.user.Grantee;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.ProcessListClusterPersistRepositoryFixture;
-import org.apache.shardingsphere.mode.process.ShowProcessListManager;
-import org.apache.shardingsphere.mode.process.event.ExecuteProcessReportEvent;
-import
org.apache.shardingsphere.mode.process.event.ExecuteProcessSummaryReportEvent;
-import
org.apache.shardingsphere.mode.process.event.ExecuteProcessUnitReportEvent;
-import
org.apache.shardingsphere.mode.process.subscriber.ProcessRegistrySubscriber;
-import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-
-public final class ProcessRegistrySubscriberTestNoMock {
-
- private final ClusterPersistRepository repository = new
ProcessListClusterPersistRepositoryFixture();
-
- private final ProcessRegistrySubscriber subscriber = new
ProcessRegistrySubscriber(repository, new EventBusContext());
-
- private ExecuteProcessContext createExecuteProcessContext() {
- ExecutionUnit executionUnit = createExecuteUnit();
- Collection<ExecutionGroup<JDBCExecutionUnit>> inputGroups = new
LinkedList<>();
- inputGroups.add(new ExecutionGroup<>(Collections.singletonList(new
JDBCExecutionUnit(executionUnit, ConnectionMode.MEMORY_STRICTLY, null))));
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = new
ExecutionGroupContext<>(inputGroups);
- executionGroupContext.setDatabaseName("sharding_db");
- executionGroupContext.setGrantee(new Grantee("sharding", "127.0.0.1"));
- return new ExecuteProcessContext("sql1", executionGroupContext,
ExecuteProcessConstants.EXECUTE_STATUS_START);
- }
-
- private ExecutionUnit createExecuteUnit() {
- return new ExecutionUnit("ds_0", new SQLUnit("sql1_0",
Collections.emptyList()));
- }
-
- @Test
- public void assertWholeProcessCompleted() {
- ExecuteProcessContext executeProcessContext =
createExecuteProcessContext();
- assertReportExecuteProcessSummary(executeProcessContext);
- ExecuteProcessConstants processConstants =
ExecuteProcessConstants.EXECUTE_STATUS_DONE;
- assertReportExecuteProcessUnit(processConstants,
executeProcessContext.getExecutionID());
- assertReportExecuteProcess(processConstants,
executeProcessContext.getExecutionID());
- }
-
- @Test
- public void assertWholeProcessUncompleted() {
- ExecuteProcessContext executeProcessContext =
createExecuteProcessContext();
- assertReportExecuteProcessSummary(executeProcessContext);
- ExecuteProcessConstants processConstants =
ExecuteProcessConstants.EXECUTE_STATUS_START;
- assertReportExecuteProcessUnit(processConstants,
executeProcessContext.getExecutionID());
- assertReportExecuteProcess(processConstants,
executeProcessContext.getExecutionID());
- }
-
- private void assertReportExecuteProcessSummary(final ExecuteProcessContext
executeProcessContext) {
- subscriber.reportExecuteProcessSummary(new
ExecuteProcessSummaryReportEvent(executeProcessContext));
- String executionID = executeProcessContext.getExecutionID();
- YamlExecuteProcessContext yamlExecuteProcessContext =
ShowProcessListManager.getInstance().getProcessContext(executionID);
- assertThat(yamlExecuteProcessContext.getExecutionID(),
is(executionID));
- assertThat(yamlExecuteProcessContext.getStartTimeMillis(),
is(executeProcessContext.getStartTimeMillis()));
- assertThat(yamlExecuteProcessContext.getDatabaseName(),
is("sharding_db"));
- assertThat(yamlExecuteProcessContext.getUsername(), is("sharding"));
- assertThat(yamlExecuteProcessContext.getHostname(), is("127.0.0.1"));
- assertThat(yamlExecuteProcessContext.getSql(), is("sql1"));
- Collection<YamlExecuteProcessUnit> unitStatuses =
yamlExecuteProcessContext.getUnitStatuses();
- assertThat(unitStatuses.size(), is(1));
- YamlExecuteProcessUnit yamlExecuteProcessUnit =
unitStatuses.iterator().next();
- assertThat(yamlExecuteProcessUnit.getStatus(),
is(ExecuteProcessConstants.EXECUTE_STATUS_START));
- }
-
- private void assertReportExecuteProcessUnit(final ExecuteProcessConstants
processConstants, final String executionID) {
- ExecuteProcessUnitReportEvent event = new
ExecuteProcessUnitReportEvent(executionID, new
ExecuteProcessUnit(createExecuteUnit(), processConstants));
- subscriber.reportExecuteProcessUnit(event);
- YamlExecuteProcessContext yamlExecuteProcessContext =
ShowProcessListManager.getInstance().getProcessContext(executionID);
- assertThat(yamlExecuteProcessContext.getExecutionID(),
is(executionID));
- YamlExecuteProcessUnit yamlExecuteProcessUnit =
yamlExecuteProcessContext.getUnitStatuses().iterator().next();
- assertThat(yamlExecuteProcessUnit.getStatus(), is(processConstants));
- }
-
- private void assertReportExecuteProcess(final ExecuteProcessConstants
processConstants, final String executionID) {
- ExecuteProcessReportEvent event = new
ExecuteProcessReportEvent(executionID);
- subscriber.reportExecuteProcess(event);
- YamlExecuteProcessContext yamlExecuteProcessContext =
ShowProcessListManager.getInstance().getProcessContext(executionID);
- if (ExecuteProcessConstants.EXECUTE_STATUS_DONE == processConstants) {
- assertNull(yamlExecuteProcessContext);
- } else {
- assertThat(yamlExecuteProcessContext.getExecutionID(),
is(executionID));
- YamlExecuteProcessUnit yamlExecuteProcessUnit =
yamlExecuteProcessContext.getUnitStatuses().iterator().next();
- assertThat(yamlExecuteProcessUnit.getStatus(),
is(processConstants));
- }
- }
-}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/process/GovernanceExecuteProcessReporterSubscriberFixture.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/process/GovernanceExecuteProcessReporterSubscriberFixture.java
deleted file mode 100644
index cdf65353241..00000000000
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/process/GovernanceExecuteProcessReporterSubscriberFixture.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.process;
-
-import com.google.common.eventbus.Subscribe;
-import lombok.Getter;
-import org.apache.shardingsphere.mode.process.event.ExecuteProcessReportEvent;
-import
org.apache.shardingsphere.mode.process.event.ExecuteProcessSummaryReportEvent;
-import
org.apache.shardingsphere.mode.process.event.ExecuteProcessUnitReportEvent;
-
-@Getter
-public final class GovernanceExecuteProcessReporterSubscriberFixture {
-
- private String value = "";
-
- public GovernanceExecuteProcessReporterSubscriberFixture() {
- EventBusContextHolderFixture.EVENT_BUS_CONTEXT.register(this);
- }
-
- /**
- * Fired on execute process summary report event received.
- *
- * @param executeProcessSummaryReportEvent execute process summary report
event
- */
- @Subscribe
- public void onExecuteProcessSummaryReportEvent(final
ExecuteProcessSummaryReportEvent executeProcessSummaryReportEvent) {
- value =
executeProcessSummaryReportEvent.getExecuteProcessContext().getExecutionID();
- }
-
- /**
- * Fired on execute process unit report event.
- *
- * @param executeProcessUnitReportEvent execute process unit report event
- */
- @Subscribe
- public void onExecuteProcessUnitReportEvent(final
ExecuteProcessUnitReportEvent executeProcessUnitReportEvent) {
- value = executeProcessUnitReportEvent.getExecutionID();
- }
-
- /**
- * Fired on execute process report event.
- *
- * @param executeProcessReportEvent execute process report event
- */
- @Subscribe
- public void onExecuteProcessReportEvent(final ExecuteProcessReportEvent
executeProcessReportEvent) {
- value = executeProcessReportEvent.getExecutionID();
- }
-}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/process/GovernanceExecuteProcessReporterTest.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/process/GovernanceExecuteProcessReporterTest.java
deleted file mode 100644
index b4f393da757..00000000000
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/process/GovernanceExecuteProcessReporterTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.process;
-
-import org.apache.shardingsphere.infra.binder.LogicSQL;
-import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
-import org.junit.Test;
-
-import java.util.UUID;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public final class GovernanceExecuteProcessReporterTest {
-
- @Test
- public void assertReport() {
- GovernanceExecuteProcessReporterSubscriberFixture subscriber = new
GovernanceExecuteProcessReporterSubscriberFixture();
- LogicSQL logicSQL = new LogicSQL(null, null, null);
- ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext = mockExecutionGroupContext();
- ExecuteProcessContext executeProcessContext = new
ExecuteProcessContext(logicSQL.getSql(), executionGroupContext,
ExecuteProcessConstants.EXECUTE_ID);
- GovernanceExecuteProcessReporter reporter = new
GovernanceExecuteProcessReporter();
- reporter.report(logicSQL, executionGroupContext,
ExecuteProcessConstants.EXECUTE_ID,
EventBusContextHolderFixture.EVENT_BUS_CONTEXT);
- assertThat(subscriber.getValue(),
is(executeProcessContext.getExecutionID()));
- }
-
- @SuppressWarnings("unchecked")
- private ExecutionGroupContext<? extends SQLExecutionUnit>
mockExecutionGroupContext() {
- ExecutionGroupContext<? extends SQLExecutionUnit> result =
mock(ExecutionGroupContext.class);
- when(result.getExecutionID()).thenReturn(UUID.randomUUID().toString());
- return result;
- }
-}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
index 74cfd4aee61..4a6a88344d1 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
@@ -23,8 +23,8 @@ import
org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
-import
org.apache.shardingsphere.mode.process.subscriber.ProcessRegistrySubscriber;
import
org.apache.shardingsphere.mode.manager.standalone.lock.StandaloneLockContext;
+import
org.apache.shardingsphere.mode.manager.standalone.subscriber.ProcessStandaloneSubscriber;
import
org.apache.shardingsphere.mode.manager.standalone.workerid.generator.StandaloneWorkerIdGenerator;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
@@ -47,7 +47,7 @@ public final class StandaloneContextManagerBuilder implements
ContextManagerBuil
MetaDataPersistService persistService = new
MetaDataPersistService(repository);
persistConfigurations(persistService, parameter);
InstanceContext instanceContext = buildInstanceContext(parameter);
- new ProcessRegistrySubscriber(instanceContext.getEventBusContext());
+ new ProcessStandaloneSubscriber(instanceContext.getEventBusContext());
MetaDataContexts metaDataContexts =
MetaDataContextsFactory.create(persistService, parameter.getDatabaseConfigs(),
instanceContext);
return new ContextManager(metaDataContexts, instanceContext);
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java
new file mode 100644
index 00000000000..9e7ef546a5a
--- /dev/null
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.standalone.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.BatchYamlExecuteProcessContext;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.mode.process.ShowProcessListManager;
+import
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
+import
org.apache.shardingsphere.mode.process.event.ShowProcessListResponseEvent;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Process standalone subscriber.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class ProcessStandaloneSubscriber {
+
+ private final EventBusContext eventBusContext;
+
+ public ProcessStandaloneSubscriber(final EventBusContext eventBusContext) {
+ this.eventBusContext = eventBusContext;
+ eventBusContext.register(this);
+ }
+
+ /**
+ * Load show process list data.
+ *
+ * @param event show process list request event
+ */
+ @Subscribe
+ public void loadShowProcessListData(final ShowProcessListRequestEvent
event) {
+ BatchYamlExecuteProcessContext batchYamlExecuteProcessContext = new
BatchYamlExecuteProcessContext(new ArrayList<>(
+
ShowProcessListManager.getInstance().getProcessContextMap().values()));
+ eventBusContext.post(new
ShowProcessListResponseEvent(batchYamlExecuteProcessContext.getContexts().isEmpty()
+ ? Collections.emptyList()
+ :
Collections.singletonList(YamlEngine.marshal(batchYamlExecuteProcessContext))));
+ }
+}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/process/ProcessStandaloneSubscriberTest.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/process/ProcessStandaloneSubscriberTest.java
new file mode 100644
index 00000000000..00d7c8ec0de
--- /dev/null
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/process/ProcessStandaloneSubscriberTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.standalone.process;
+
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import
org.apache.shardingsphere.mode.manager.standalone.subscriber.ProcessStandaloneSubscriber;
+import org.apache.shardingsphere.mode.process.ShowProcessListManager;
+import
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.MockedStatic;
+
+import java.util.Collections;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public final class ProcessStandaloneSubscriberTest {
+
+ private final EventBusContext eventBusContext = new EventBusContext();
+
+ private ProcessStandaloneSubscriber processRegistrySubscriber;
+
+ private ShowProcessListManager showProcessListManager;
+
+ private MockedStatic<ShowProcessListManager> mockedStatic;
+
+ @Before
+ public void setUp() {
+ processRegistrySubscriber = new
ProcessStandaloneSubscriber(eventBusContext);
+ mockedStatic = mockStatic(ShowProcessListManager.class);
+ showProcessListManager = mock(ShowProcessListManager.class);
+
mockedStatic.when(ShowProcessListManager::getInstance).thenReturn(showProcessListManager);
+ }
+
+ @Test
+ public void assertLoadShowProcessListData() {
+ ShowProcessListRequestEvent showProcessListRequestEvent =
mock(ShowProcessListRequestEvent.class);
+
when(showProcessListManager.getProcessContextMap()).thenReturn(Collections.emptyMap());
+
processRegistrySubscriber.loadShowProcessListData(showProcessListRequestEvent);
+ verify(showProcessListManager, times(1)).getProcessContextMap();
+ }
+
+ @After
+ public void tearDown() {
+ mockedStatic.close();
+ }
+}