This is an automated email from the ASF dual-hosted git repository.
chengzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6fdc2cf0bba Add process persist service (#31409)
6fdc2cf0bba is described below
commit 6fdc2cf0bbaac7d1e30807267cbae06e300cd63d
Author: Haoran Meng <[email protected]>
AuthorDate: Mon May 27 09:00:21 2024 +0800
Add process persist service (#31409)
---
...viceBuilder.java => PersistServiceBuilder.java} | 14 ++-
.../mode/service/PersistServiceFacade.java | 6 +-
...viceBuilder.java => ProcessPersistService.java} | 25 +++--
...ture.java => PersistServiceBuilderFixture.java} | 12 ++-
...rdingsphere.mode.service.PersistServiceBuilder} | 2 +-
...lder.java => ClusterPersistServiceBuilder.java} | 14 ++-
.../service/ClusterProcessPersistService.java | 101 +++++++++++++++++++++
...rdingsphere.mode.service.PersistServiceBuilder} | 2 +-
...r.java => StandalonePersistServiceBuilder.java} | 12 ++-
...r.java => StandaloneProcessPersistService.java} | 35 ++++---
...rdingsphere.mode.service.PersistServiceBuilder} | 2 +-
11 files changed, 186 insertions(+), 39 deletions(-)
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/MetaDataManagerPersistServiceBuilder.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceBuilder.java
similarity index 73%
copy from
mode/core/src/main/java/org/apache/shardingsphere/mode/service/MetaDataManagerPersistServiceBuilder.java
copy to
mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceBuilder.java
index 9fe6bc5b63b..88452b94008 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/MetaDataManagerPersistServiceBuilder.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceBuilder.java
@@ -21,9 +21,9 @@ import
org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
import org.apache.shardingsphere.mode.manager.ContextManager;
/**
- * Meta data manager persist service builder.
+ * Persist service builder.
*/
-public interface MetaDataManagerPersistServiceBuilder extends TypedSPI {
+public interface PersistServiceBuilder extends TypedSPI {
/**
* Build meta data manager persist service.
@@ -31,5 +31,13 @@ public interface MetaDataManagerPersistServiceBuilder
extends TypedSPI {
* @param contextManager context manager
* @return meta data manager persist service
*/
- MetaDataManagerPersistService build(ContextManager contextManager);
+ MetaDataManagerPersistService
buildMetaDataManagerPersistService(ContextManager contextManager);
+
+ /**
+ * Build process persist service.
+ *
+ * @param contextManager context manager
+ * @return process persist service
+ */
+ ProcessPersistService buildProcessPersistService(ContextManager
contextManager);
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java
index 86153b94280..ab52d3f0239 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/PersistServiceFacade.java
@@ -40,11 +40,15 @@ public final class PersistServiceFacade {
private final MetaDataManagerPersistService metaDataManagerPersistService;
+ private final ProcessPersistService processPersistService;
+
public PersistServiceFacade(final PersistRepository repository, final
ModeConfiguration modeConfiguration, final ContextManager contextManager) {
metaDataPersistService = new MetaDataPersistService(repository);
computeNodePersistService = new ComputeNodePersistService(repository);
statePersistService = new StatePersistService(repository);
- metaDataManagerPersistService =
TypedSPILoader.getService(MetaDataManagerPersistServiceBuilder.class,
modeConfiguration.getType()).build(contextManager);
+ PersistServiceBuilder persistServiceBuilder =
TypedSPILoader.getService(PersistServiceBuilder.class,
modeConfiguration.getType());
+ metaDataManagerPersistService =
persistServiceBuilder.buildMetaDataManagerPersistService(contextManager);
+ processPersistService =
persistServiceBuilder.buildProcessPersistService(contextManager);
}
/**
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/MetaDataManagerPersistServiceBuilder.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/ProcessPersistService.java
similarity index 64%
rename from
mode/core/src/main/java/org/apache/shardingsphere/mode/service/MetaDataManagerPersistServiceBuilder.java
rename to
mode/core/src/main/java/org/apache/shardingsphere/mode/service/ProcessPersistService.java
index 9fe6bc5b63b..bb86a970db5 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/service/MetaDataManagerPersistServiceBuilder.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/service/ProcessPersistService.java
@@ -17,19 +17,28 @@
package org.apache.shardingsphere.mode.service;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
-import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.infra.executor.sql.process.Process;
+
+import java.sql.SQLException;
+import java.util.Collection;
/**
- * Meta data manager persist service builder.
+ * Process persist service.
*/
-public interface MetaDataManagerPersistServiceBuilder extends TypedSPI {
+public interface ProcessPersistService {
+
+ /**
+ * Get process list.
+ *
+ * @return collection of process
+ */
+ Collection<Process> getProcessList();
/**
- * Build meta data manager persist service.
+ * Kill process.
*
- * @param contextManager context manager
- * @return meta data manager persist service
+ * @param processId process id
+ * @throws SQLException SQL exception
*/
- MetaDataManagerPersistService build(ContextManager contextManager);
+ void killProcess(String processId) throws SQLException;
}
diff --git
a/mode/core/src/test/java/org/apache/shardingsphere/mode/fixture/MetaDataManagerPersistServiceBuilderFixture.java
b/mode/core/src/test/java/org/apache/shardingsphere/mode/fixture/PersistServiceBuilderFixture.java
similarity index 71%
rename from
mode/core/src/test/java/org/apache/shardingsphere/mode/fixture/MetaDataManagerPersistServiceBuilderFixture.java
rename to
mode/core/src/test/java/org/apache/shardingsphere/mode/fixture/PersistServiceBuilderFixture.java
index 691a9e620d2..52e2e312b2b 100644
---
a/mode/core/src/test/java/org/apache/shardingsphere/mode/fixture/MetaDataManagerPersistServiceBuilderFixture.java
+++
b/mode/core/src/test/java/org/apache/shardingsphere/mode/fixture/PersistServiceBuilderFixture.java
@@ -19,12 +19,18 @@ package org.apache.shardingsphere.mode.fixture;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.service.MetaDataManagerPersistService;
-import
org.apache.shardingsphere.mode.service.MetaDataManagerPersistServiceBuilder;
+import org.apache.shardingsphere.mode.service.PersistServiceBuilder;
+import org.apache.shardingsphere.mode.service.ProcessPersistService;
-public final class MetaDataManagerPersistServiceBuilderFixture implements
MetaDataManagerPersistServiceBuilder {
+public final class PersistServiceBuilderFixture implements
PersistServiceBuilder {
@Override
- public MetaDataManagerPersistService build(final ContextManager
contextManager) {
+ public MetaDataManagerPersistService
buildMetaDataManagerPersistService(final ContextManager contextManager) {
+ return null;
+ }
+
+ @Override
+ public ProcessPersistService buildProcessPersistService(final
ContextManager contextManager) {
return null;
}
diff --git
a/mode/core/src/test/resources/META-INF/services/org.apache.shardingsphere.mode.service.MetaDataManagerPersistServiceBuilder
b/mode/core/src/test/resources/META-INF/services/org.apache.shardingsphere.mode.service.PersistServiceBuilder
similarity index 90%
rename from
mode/core/src/test/resources/META-INF/services/org.apache.shardingsphere.mode.service.MetaDataManagerPersistServiceBuilder
rename to
mode/core/src/test/resources/META-INF/services/org.apache.shardingsphere.mode.service.PersistServiceBuilder
index 02002f9acbe..43ab8da1766 100644
---
a/mode/core/src/test/resources/META-INF/services/org.apache.shardingsphere.mode.service.MetaDataManagerPersistServiceBuilder
+++
b/mode/core/src/test/resources/META-INF/services/org.apache.shardingsphere.mode.service.PersistServiceBuilder
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.mode.fixture.MetaDataManagerPersistServiceBuilderFixture
+org.apache.shardingsphere.mode.fixture.PersistServiceBuilderFixture
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterMetaDataManagerPersistServiceBuilder.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterPersistServiceBuilder.java
similarity index 67%
rename from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterMetaDataManagerPersistServiceBuilder.java
rename to
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterPersistServiceBuilder.java
index 1f5eb8a41d2..af5403bb86f 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterMetaDataManagerPersistServiceBuilder.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterPersistServiceBuilder.java
@@ -19,18 +19,24 @@ package
org.apache.shardingsphere.mode.manager.cluster.service;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.service.MetaDataManagerPersistService;
-import
org.apache.shardingsphere.mode.service.MetaDataManagerPersistServiceBuilder;
+import org.apache.shardingsphere.mode.service.PersistServiceBuilder;
+import org.apache.shardingsphere.mode.service.ProcessPersistService;
/**
- * Cluster meta data manager persist service builder.
+ * Cluster persist service builder.
*/
-public final class ClusterMetaDataManagerPersistServiceBuilder implements
MetaDataManagerPersistServiceBuilder {
+public final class ClusterPersistServiceBuilder implements
PersistServiceBuilder {
@Override
- public MetaDataManagerPersistService build(final ContextManager
contextManager) {
+ public MetaDataManagerPersistService
buildMetaDataManagerPersistService(final ContextManager contextManager) {
return new ClusterMetaDataManagerPersistService(contextManager);
}
+ @Override
+ public ProcessPersistService buildProcessPersistService(final
ContextManager contextManager) {
+ return new
ClusterProcessPersistService(contextManager.getRepository());
+ }
+
@Override
public Object getType() {
return "Cluster";
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterProcessPersistService.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterProcessPersistService.java
new file mode 100644
index 00000000000..634dea0fda4
--- /dev/null
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/service/ClusterProcessPersistService.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.service;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.executor.sql.process.Process;
+import
org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
+import
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
+import
org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
+import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
+import org.apache.shardingsphere.mode.service.ProcessPersistService;
+import org.apache.shardingsphere.mode.spi.PersistRepository;
+
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Cluster process persist service.
+ */
+@RequiredArgsConstructor
+public final class ClusterProcessPersistService implements
ProcessPersistService {
+
+ private final PersistRepository repository;
+
+ @Override
+ public Collection<Process> getProcessList() {
+ String taskId = new UUID(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
+ Collection<String> triggerPaths =
getShowProcessListTriggerPaths(taskId);
+ boolean isCompleted = false;
+ try {
+ triggerPaths.forEach(each -> repository.persist(each, ""));
+ isCompleted =
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(taskId, () ->
isReady(triggerPaths));
+ return getShowProcessListData(taskId);
+ } finally {
+ repository.delete(ProcessNode.getProcessIdPath(taskId));
+ if (!isCompleted) {
+ triggerPaths.forEach(repository::delete);
+ }
+ }
+ }
+
+ private Collection<Process> getShowProcessListData(final String taskId) {
+ YamlProcessList yamlProcessList = new YamlProcessList();
+ for (String each :
repository.getChildrenKeys(ProcessNode.getProcessIdPath(taskId)).stream()
+ .map(each ->
repository.query(ProcessNode.getProcessListInstancePath(taskId,
each))).collect(Collectors.toList())) {
+ yamlProcessList.getProcesses().addAll(YamlEngine.unmarshal(each,
YamlProcessList.class).getProcesses());
+ }
+ return new YamlProcessListSwapper().swapToObject(yamlProcessList);
+ }
+
+ private Collection<String> getShowProcessListTriggerPaths(final String
taskId) {
+ return Stream.of(InstanceType.values())
+ .flatMap(each ->
repository.getChildrenKeys(ComputeNode.getOnlineNodePath(each)).stream().map(onlinePath
-> ComputeNode.getProcessTriggerInstanceNodePath(onlinePath, taskId)))
+ .collect(Collectors.toList());
+ }
+
+ private boolean isReady(final Collection<String> paths) {
+ return paths.stream().noneMatch(each -> null !=
repository.query(each));
+ }
+
+ @Override
+ public void killProcess(final String processId) {
+ Collection<String> triggerPaths =
getKillProcessTriggerPaths(processId);
+ boolean isCompleted = false;
+ try {
+ triggerPaths.forEach(each -> repository.persist(each, ""));
+ isCompleted =
ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(processId, ()
-> isReady(triggerPaths));
+ } finally {
+ if (!isCompleted) {
+ triggerPaths.forEach(repository::delete);
+ }
+ }
+ }
+
+ private Collection<String> getKillProcessTriggerPaths(final String
processId) {
+ return Stream.of(InstanceType.values())
+ .flatMap(each ->
repository.getChildrenKeys(ComputeNode.getOnlineNodePath(each)).stream().map(onlinePath
-> ComputeNode.getProcessKillInstanceIdNodePath(onlinePath, processId)))
+ .collect(Collectors.toList());
+ }
+}
diff --git
a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.service.MetaDataManagerPersistServiceBuilder
b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.service.PersistServiceBuilder
similarity index 88%
rename from
mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.service.MetaDataManagerPersistServiceBuilder
rename to
mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.service.PersistServiceBuilder
index 9e31a2d93c2..dbde1834e62 100644
---
a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.service.MetaDataManagerPersistServiceBuilder
+++
b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.service.PersistServiceBuilder
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.mode.manager.cluster.service.ClusterMetaDataManagerPersistServiceBuilder
+org.apache.shardingsphere.mode.manager.cluster.service.ClusterPersistServiceBuilder
diff --git
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/service/StandaloneMetaDataManagerPersistServiceBuilder.java
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/service/StandalonePersistServiceBuilder.java
similarity index 72%
copy from
mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/service/StandaloneMetaDataManagerPersistServiceBuilder.java
copy to
mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/service/StandalonePersistServiceBuilder.java
index c60f8658c39..1297944ddef 100644
---
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/service/StandaloneMetaDataManagerPersistServiceBuilder.java
+++
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/service/StandalonePersistServiceBuilder.java
@@ -19,18 +19,24 @@ package
org.apache.shardingsphere.mode.manager.standalone.service;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.service.MetaDataManagerPersistService;
-import
org.apache.shardingsphere.mode.service.MetaDataManagerPersistServiceBuilder;
+import org.apache.shardingsphere.mode.service.PersistServiceBuilder;
+import org.apache.shardingsphere.mode.service.ProcessPersistService;
/**
* Standalone meta data manager persist service builder.
*/
-public final class StandaloneMetaDataManagerPersistServiceBuilder implements
MetaDataManagerPersistServiceBuilder {
+public final class StandalonePersistServiceBuilder implements
PersistServiceBuilder {
@Override
- public MetaDataManagerPersistService build(final ContextManager
contextManager) {
+ public MetaDataManagerPersistService
buildMetaDataManagerPersistService(final ContextManager contextManager) {
return new StandaloneMetaDataManagerPersistService(contextManager);
}
+ @Override
+ public ProcessPersistService buildProcessPersistService(final
ContextManager contextManager) {
+ return new StandaloneProcessPersistService();
+ }
+
@Override
public Object getType() {
return "Standalone";
diff --git
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/service/StandaloneMetaDataManagerPersistServiceBuilder.java
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/service/StandaloneProcessPersistService.java
similarity index 50%
rename from
mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/service/StandaloneMetaDataManagerPersistServiceBuilder.java
rename to
mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/service/StandaloneProcessPersistService.java
index c60f8658c39..4f09169f4d7 100644
---
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/service/StandaloneMetaDataManagerPersistServiceBuilder.java
+++
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/service/StandaloneProcessPersistService.java
@@ -17,27 +17,34 @@
package org.apache.shardingsphere.mode.manager.standalone.service;
-import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.service.MetaDataManagerPersistService;
-import
org.apache.shardingsphere.mode.service.MetaDataManagerPersistServiceBuilder;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.executor.sql.process.Process;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
+import org.apache.shardingsphere.mode.service.ProcessPersistService;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
/**
- * Standalone meta data manager persist service builder.
+ * Standalone process persist service.
*/
-public final class StandaloneMetaDataManagerPersistServiceBuilder implements
MetaDataManagerPersistServiceBuilder {
-
- @Override
- public MetaDataManagerPersistService build(final ContextManager
contextManager) {
- return new StandaloneMetaDataManagerPersistService(contextManager);
- }
+@RequiredArgsConstructor
+public final class StandaloneProcessPersistService implements
ProcessPersistService {
@Override
- public Object getType() {
- return "Standalone";
+ public Collection<Process> getProcessList() {
+ return ProcessRegistry.getInstance().listAll();
}
@Override
- public boolean isDefault() {
- return true;
+ public void killProcess(final String processId) throws SQLException {
+ Process process = ProcessRegistry.getInstance().get(processId);
+ if (null == process) {
+ return;
+ }
+ for (Statement each : process.getProcessStatements().values()) {
+ each.cancel();
+ }
}
}
diff --git
a/mode/type/standalone/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.service.MetaDataManagerPersistServiceBuilder
b/mode/type/standalone/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.service.PersistServiceBuilder
similarity index 95%
rename from
mode/type/standalone/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.service.MetaDataManagerPersistServiceBuilder
rename to
mode/type/standalone/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.service.PersistServiceBuilder
index 3dbb1a3829c..95cd8916593 100644
---
a/mode/type/standalone/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.service.MetaDataManagerPersistServiceBuilder
+++
b/mode/type/standalone/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.service.PersistServiceBuilder
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.mode.manager.standalone.service.StandaloneMetaDataManagerPersistServiceBuilder
+org.apache.shardingsphere.mode.manager.standalone.service.StandalonePersistServiceBuilder