This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 427ec70 Add ProcessRegistrySubscriber (#10463)
427ec70 is described below
commit 427ec70e8ee47d3cdf857be0eddfcc62251d97f6
Author: Liang Zhang <[email protected]>
AuthorDate: Tue May 25 19:11:03 2021 +0800
Add ProcessRegistrySubscriber (#10463)
* Inline RuleChangedListener
* Rename ScalingRegistryService
* Rename ScalingRegistryService
* Add ProcessRegistrySubscriber
---
.../governance/core/registry/RegistryCenter.java | 91 +++--------------
.../service/process/ProcessRegistrySubscriber.java | 113 +++++++++++++++++++++
...Service.java => ScalingRegistrySubscriber.java} | 6 +-
...est.java => ScalingRegistrySubscriberTest.java} | 13 +--
4 files changed, 135 insertions(+), 88 deletions(-)
diff --git
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
index cad9f5c..7522154 100644
---
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
+++
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
@@ -24,15 +24,12 @@ import com.google.common.eventbus.Subscribe;
import lombok.Getter;
import
org.apache.shardingsphere.authority.api.config.AuthorityRuleConfiguration;
import
org.apache.shardingsphere.governance.core.registry.instance.GovernanceInstance;
-import
org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ExecuteProcessReportEvent;
-import
org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ExecuteProcessSummaryReportEvent;
-import
org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ExecuteProcessUnitReportEvent;
-import
org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ShowProcessListRequestEvent;
-import
org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ShowProcessListResponseEvent;
import
org.apache.shardingsphere.governance.core.registry.service.config.impl.DataSourceRegistryService;
import
org.apache.shardingsphere.governance.core.registry.service.config.impl.GlobalRuleRegistryService;
import
org.apache.shardingsphere.governance.core.registry.service.config.impl.PropertiesRegistryService;
import
org.apache.shardingsphere.governance.core.registry.service.config.impl.SchemaRuleRegistryService;
+import
org.apache.shardingsphere.governance.core.registry.service.process.ProcessRegistrySubscriber;
+import
org.apache.shardingsphere.governance.core.registry.service.scaling.ScalingRegistrySubscriber;
import
org.apache.shardingsphere.governance.core.registry.service.schema.SchemaRegistryService;
import
org.apache.shardingsphere.governance.core.registry.service.state.DataSourceStatusRegistryService;
import
org.apache.shardingsphere.governance.core.registry.service.state.LockRegistryService;
@@ -40,11 +37,6 @@ import
org.apache.shardingsphere.governance.repository.spi.RegistryCenterReposit
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessUnit;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessContext;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessUnit;
import
org.apache.shardingsphere.infra.metadata.mapper.event.dcl.impl.CreateUserStatementEvent;
import
org.apache.shardingsphere.infra.metadata.mapper.event.dcl.impl.GrantStatementEvent;
import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
@@ -55,12 +47,10 @@ import
org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
-import java.util.stream.Collectors;
/**
* Registry center.
@@ -105,10 +95,20 @@ public final class RegistryCenter {
schemaService = new SchemaRegistryService(repository);
dataSourceStatusService = new
DataSourceStatusRegistryService(repository);
lockService = new LockRegistryService(repository);
+ new ScalingRegistrySubscriber(repository, schemaRuleService);
+ new ProcessRegistrySubscriber(repository);
ShardingSphereEventBus.getInstance().register(this);
}
/**
+ * Initialize nodes.
+ */
+ public void initNodes() {
+ repository.persist(node.getDataNodesPath(), "");
+ repository.persist(node.getPrimaryNodesPath(), "");
+ }
+
+ /**
* Persist configurations.
*
* @param dataSourceConfigs schema and data source configuration map
@@ -190,76 +190,9 @@ public final class RegistryCenter {
}
/**
- * Load show process list data.
- *
- * @param event get children request event.
- */
- @Subscribe
- public void loadShowProcessListData(final ShowProcessListRequestEvent
event) {
- List<String> childrenKeys =
repository.getChildrenKeys(node.getExecutionNodesPath());
- Collection<String> processListData = childrenKeys.stream().map(key ->
repository.get(node.getExecutionPath(key))).collect(Collectors.toList());
- ShardingSphereEventBus.getInstance().post(new
ShowProcessListResponseEvent(processListData));
- }
-
- /**
- * Report execute process summary.
- *
- * @param event execute process summary report event.
- */
- @Subscribe
- public void reportExecuteProcessSummary(final
ExecuteProcessSummaryReportEvent event) {
- ExecuteProcessContext executeProcessContext =
event.getExecuteProcessContext();
-
repository.persist(node.getExecutionPath(executeProcessContext.getExecutionID()),
YamlEngine.marshal(new YamlExecuteProcessContext(executeProcessContext)));
- }
-
- /**
- * Report execute process unit.
- *
- * @param event execute process unit report event.
- */
- @Subscribe
- public void reportExecuteProcessUnit(final ExecuteProcessUnitReportEvent
event) {
- // TODO lock on the same jvm
- String executionPath = node.getExecutionPath(event.getExecutionID());
- YamlExecuteProcessContext yamlExecuteProcessContext =
YamlEngine.unmarshal(repository.get(executionPath),
YamlExecuteProcessContext.class);
- ExecuteProcessUnit executeProcessUnit = event.getExecuteProcessUnit();
- for (YamlExecuteProcessUnit unit :
yamlExecuteProcessContext.getUnitStatuses()) {
- if (unit.getUnitID().equals(executeProcessUnit.getUnitID())) {
- unit.setStatus(executeProcessUnit.getStatus());
- }
- }
- repository.persist(executionPath,
YamlEngine.marshal(yamlExecuteProcessContext));
- }
-
- /**
- * Report execute process.
- *
- * @param event execute process report event.
- */
- @Subscribe
- public void reportExecuteProcess(final ExecuteProcessReportEvent event) {
- String executionPath = node.getExecutionPath(event.getExecutionID());
- YamlExecuteProcessContext yamlExecuteProcessContext =
YamlEngine.unmarshal(repository.get(executionPath),
YamlExecuteProcessContext.class);
- for (YamlExecuteProcessUnit unit :
yamlExecuteProcessContext.getUnitStatuses()) {
- if (unit.getStatus() !=
ExecuteProcessConstants.EXECUTE_STATUS_DONE) {
- return;
- }
- }
- repository.delete(executionPath);
- }
-
- /**
* Register instance online.
*/
public void registerInstanceOnline() {
repository.persistEphemeral(node.getProxyNodePath(instanceId), "");
}
-
- /**
- * Initialize nodes.
- */
- public void initNodes() {
- repository.persist(node.getDataNodesPath(), "");
- repository.persist(node.getPrimaryNodesPath(), "");
- }
}
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/service/process/ProcessRegistrySubscriber.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/service/process/ProcessRegistrySubscriber.java
new file mode 100644
index 0000000..d43630b
--- /dev/null
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/service/process/ProcessRegistrySubscriber.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.governance.core.registry.service.process;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.governance.core.registry.RegistryCenterNode;
+import org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ExecuteProcessReportEvent;
+import org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ExecuteProcessSummaryReportEvent;
+import org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ExecuteProcessUnitReportEvent;
+import org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ShowProcessListRequestEvent;
+import org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ShowProcessListResponseEvent;
+import org.apache.shardingsphere.governance.repository.spi.RegistryCenterRepository;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
+import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
+import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessUnit;
+import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessContext;
+import org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessUnit;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Process registry subscriber.
+ */
+public final class ProcessRegistrySubscriber {
+
+ private final RegistryCenterRepository repository;
+
+ private final RegistryCenterNode node;
+
+ public ProcessRegistrySubscriber(final RegistryCenterRepository repository) {
+ this.repository = repository;
+ node = new RegistryCenterNode();
+ ShardingSphereEventBus.getInstance().register(this);
+ }
+
+ /**
+ * Load show process list data.
+ *
+ * @param event get children request event.
+ */
+ @Subscribe
+ public void loadShowProcessListData(final ShowProcessListRequestEvent event) {
+ List<String> childrenKeys = repository.getChildrenKeys(node.getExecutionNodesPath());
+ Collection<String> processListData = childrenKeys.stream().map(key -> repository.get(node.getExecutionPath(key))).collect(Collectors.toList());
+ ShardingSphereEventBus.getInstance().post(new ShowProcessListResponseEvent(processListData));
+ }
+
+ /**
+ * Report execute process summary.
+ *
+ * @param event execute process summary report event.
+ */
+ @Subscribe
+ public void reportExecuteProcessSummary(final ExecuteProcessSummaryReportEvent event) {
+ ExecuteProcessContext executeProcessContext = event.getExecuteProcessContext();
+ repository.persist(node.getExecutionPath(executeProcessContext.getExecutionID()), YamlEngine.marshal(new YamlExecuteProcessContext(executeProcessContext)));
+ }
+
+ /**
+ * Report execute process unit.
+ *
+ * @param event execute process unit report event.
+ */
+ @Subscribe
+ public void reportExecuteProcessUnit(final ExecuteProcessUnitReportEvent event) {
+ // TODO lock on the same jvm
+ String executionPath = node.getExecutionPath(event.getExecutionID());
+ YamlExecuteProcessContext yamlExecuteProcessContext = YamlEngine.unmarshal(repository.get(executionPath), YamlExecuteProcessContext.class);
+ ExecuteProcessUnit executeProcessUnit = event.getExecuteProcessUnit();
+ for (YamlExecuteProcessUnit unit : yamlExecuteProcessContext.getUnitStatuses()) {
+ if (unit.getUnitID().equals(executeProcessUnit.getUnitID())) {
+ unit.setStatus(executeProcessUnit.getStatus());
+ }
+ }
+ repository.persist(executionPath, YamlEngine.marshal(yamlExecuteProcessContext));
+ }
+
+ /**
+ * Report execute process.
+ *
+ * @param event execute process report event.
+ */
+ @Subscribe
+ public void reportExecuteProcess(final ExecuteProcessReportEvent event) {
+ String executionPath = node.getExecutionPath(event.getExecutionID());
+ YamlExecuteProcessContext yamlExecuteProcessContext = YamlEngine.unmarshal(repository.get(executionPath), YamlExecuteProcessContext.class);
+ for (YamlExecuteProcessUnit unit : yamlExecuteProcessContext.getUnitStatuses()) {
+ if (unit.getStatus() != ExecuteProcessConstants.EXECUTE_STATUS_DONE) {
+ return;
+ }
+ }
+ repository.delete(executionPath);
+ }
+}
diff --git
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistryService.java
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistrySubscriber.java
similarity index 94%
rename from
shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistryService.java
rename to
shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistrySubscriber.java
index e2f31bb..157cb0a 100644
---
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistryService.java
+++
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistrySubscriber.java
@@ -33,10 +33,10 @@ import
org.apache.shardingsphere.infra.yaml.swapper.YamlRuleConfigurationSwapper
import java.util.Collection;
/**
- * Scaling registry service.
+ * Scaling registry subscriber.
*/
// TODO move to scaling module
-public final class ScalingRegistryService {
+public final class ScalingRegistrySubscriber {
private final RegistryCenterRepository repository;
@@ -46,7 +46,7 @@ public final class ScalingRegistryService {
private final RegistryCacheManager registryCacheManager;
- public ScalingRegistryService(final RegistryCenterRepository repository,
final SchemaRuleRegistryService schemaRuleService) {
+ public ScalingRegistrySubscriber(final RegistryCenterRepository
repository, final SchemaRuleRegistryService schemaRuleService) {
this.repository = repository;
node = new RegistryCenterNode();
this.schemaRuleService = schemaRuleService;
diff --git
a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistryServiceTest.java
b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistrySubscriberTest.java
similarity index 86%
rename from
shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistryServiceTest.java
rename to
shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistrySubscriberTest.java
index 21c00fc..d5132a6 100644
---
a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistryServiceTest.java
+++
b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistrySubscriberTest.java
@@ -40,7 +40,7 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public final class ScalingRegistryServiceTest {
+public final class ScalingRegistrySubscriberTest {
@Mock
private RegistryCenterRepository registryCenterRepository;
@@ -51,24 +51,25 @@ public final class ScalingRegistryServiceTest {
@Mock
private RegistryCacheManager registryCacheManager;
- private ScalingRegistryService scalingRegistryService;
+ private ScalingRegistrySubscriber scalingRegistrySubscriber;
@Before
public void setUp() throws ReflectiveOperationException {
- scalingRegistryService = new
ScalingRegistryService(registryCenterRepository, schemaRuleService);
+ scalingRegistrySubscriber = new
ScalingRegistrySubscriber(registryCenterRepository, schemaRuleService);
}
@Test
public void assertSwitchRuleConfiguration() throws
ReflectiveOperationException {
- Field field =
ScalingRegistryService.class.getDeclaredField("registryCacheManager");
+ Field field =
ScalingRegistrySubscriber.class.getDeclaredField("registryCacheManager");
field.setAccessible(true);
- field.set(scalingRegistryService, registryCacheManager);
+ field.set(scalingRegistrySubscriber, registryCacheManager);
when(registryCacheManager.loadCache(anyString(),
eq("testCacheId"))).thenReturn(readYAML());
SwitchRuleConfigurationEvent event = new
SwitchRuleConfigurationEvent("sharding_db", "testCacheId");
- scalingRegistryService.switchRuleConfiguration(event);
+ scalingRegistrySubscriber.switchRuleConfiguration(event);
// TODO finish verify
}
+ @Test
public void assertCacheRuleConfiguration() {
// TODO finish test case
}