This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 427ec70  Add ProcessRegistrySubscriber (#10463)
427ec70 is described below

commit 427ec70e8ee47d3cdf857be0eddfcc62251d97f6
Author: Liang Zhang <[email protected]>
AuthorDate: Tue May 25 19:11:03 2021 +0800

    Add ProcessRegistrySubscriber (#10463)
    
    * Inline RuleChangedListener
    
    * Rename ScalingRegistryService
    
    * Rename ScalingRegistryService
    
    * Add ProcessRegistrySubscriber
---
 .../governance/core/registry/RegistryCenter.java   |  91 +++--------------
 .../service/process/ProcessRegistrySubscriber.java | 113 +++++++++++++++++++++
 ...Service.java => ScalingRegistrySubscriber.java} |   6 +-
 ...est.java => ScalingRegistrySubscriberTest.java} |  13 +--
 4 files changed, 135 insertions(+), 88 deletions(-)

diff --git 
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
 
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
index cad9f5c..7522154 100644
--- 
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
+++ 
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
@@ -24,15 +24,12 @@ import com.google.common.eventbus.Subscribe;
 import lombok.Getter;
 import 
org.apache.shardingsphere.authority.api.config.AuthorityRuleConfiguration;
 import 
org.apache.shardingsphere.governance.core.registry.instance.GovernanceInstance;
-import 
org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ExecuteProcessReportEvent;
-import 
org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ExecuteProcessSummaryReportEvent;
-import 
org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ExecuteProcessUnitReportEvent;
-import 
org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ShowProcessListRequestEvent;
-import 
org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ShowProcessListResponseEvent;
 import 
org.apache.shardingsphere.governance.core.registry.service.config.impl.DataSourceRegistryService;
 import 
org.apache.shardingsphere.governance.core.registry.service.config.impl.GlobalRuleRegistryService;
 import 
org.apache.shardingsphere.governance.core.registry.service.config.impl.PropertiesRegistryService;
 import 
org.apache.shardingsphere.governance.core.registry.service.config.impl.SchemaRuleRegistryService;
+import 
org.apache.shardingsphere.governance.core.registry.service.process.ProcessRegistrySubscriber;
+import 
org.apache.shardingsphere.governance.core.registry.service.scaling.ScalingRegistrySubscriber;
 import 
org.apache.shardingsphere.governance.core.registry.service.schema.SchemaRegistryService;
 import 
org.apache.shardingsphere.governance.core.registry.service.state.DataSourceStatusRegistryService;
 import 
org.apache.shardingsphere.governance.core.registry.service.state.LockRegistryService;
@@ -40,11 +37,6 @@ import 
org.apache.shardingsphere.governance.repository.spi.RegistryCenterReposit
 import org.apache.shardingsphere.infra.config.RuleConfiguration;
 import 
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import 
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
-import 
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
-import 
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessUnit;
-import 
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessContext;
-import 
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessUnit;
 import 
org.apache.shardingsphere.infra.metadata.mapper.event.dcl.impl.CreateUserStatementEvent;
 import 
org.apache.shardingsphere.infra.metadata.mapper.event.dcl.impl.GrantStatementEvent;
 import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
@@ -55,12 +47,10 @@ import 
org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.stream.Collectors;
 
 /**
  * Registry center.
@@ -105,10 +95,20 @@ public final class RegistryCenter {
         schemaService = new SchemaRegistryService(repository);
         dataSourceStatusService = new 
DataSourceStatusRegistryService(repository);
         lockService = new LockRegistryService(repository);
+        new ScalingRegistrySubscriber(repository, schemaRuleService);
+        new ProcessRegistrySubscriber(repository);
         ShardingSphereEventBus.getInstance().register(this);
     }
     
     /**
+     * Initialize nodes.
+     */
+    public void initNodes() {
+        repository.persist(node.getDataNodesPath(), "");
+        repository.persist(node.getPrimaryNodesPath(), "");
+    }
+    
+    /**
      * Persist configurations.
      *
      * @param dataSourceConfigs schema and data source configuration map
@@ -190,76 +190,9 @@ public final class RegistryCenter {
     }
     
     /**
-     * Load show process list data.
-     *
-     * @param event get children request event.
-     */
-    @Subscribe
-    public void loadShowProcessListData(final ShowProcessListRequestEvent 
event) {
-        List<String> childrenKeys = 
repository.getChildrenKeys(node.getExecutionNodesPath());
-        Collection<String> processListData = childrenKeys.stream().map(key -> 
repository.get(node.getExecutionPath(key))).collect(Collectors.toList());
-        ShardingSphereEventBus.getInstance().post(new 
ShowProcessListResponseEvent(processListData));
-    }
-    
-    /**
-     * Report execute process summary.
-     *
-     * @param event execute process summary report event.
-     */
-    @Subscribe
-    public void reportExecuteProcessSummary(final 
ExecuteProcessSummaryReportEvent event) {
-        ExecuteProcessContext executeProcessContext = 
event.getExecuteProcessContext();
-        
repository.persist(node.getExecutionPath(executeProcessContext.getExecutionID()),
 YamlEngine.marshal(new YamlExecuteProcessContext(executeProcessContext)));
-    }
-    
-    /**
-     * Report execute process unit.
-     *
-     * @param event execute process unit report event.
-     */
-    @Subscribe
-    public void reportExecuteProcessUnit(final ExecuteProcessUnitReportEvent 
event) {
-        // TODO lock on the same jvm
-        String executionPath = node.getExecutionPath(event.getExecutionID());
-        YamlExecuteProcessContext yamlExecuteProcessContext = 
YamlEngine.unmarshal(repository.get(executionPath), 
YamlExecuteProcessContext.class);
-        ExecuteProcessUnit executeProcessUnit = event.getExecuteProcessUnit();
-        for (YamlExecuteProcessUnit unit : 
yamlExecuteProcessContext.getUnitStatuses()) {
-            if (unit.getUnitID().equals(executeProcessUnit.getUnitID())) {
-                unit.setStatus(executeProcessUnit.getStatus());
-            }
-        }
-        repository.persist(executionPath, 
YamlEngine.marshal(yamlExecuteProcessContext));
-    }
-    
-    /**
-     * Report execute process.
-     *
-     * @param event execute process report event.
-     */
-    @Subscribe
-    public void reportExecuteProcess(final ExecuteProcessReportEvent event) {
-        String executionPath = node.getExecutionPath(event.getExecutionID());
-        YamlExecuteProcessContext yamlExecuteProcessContext = 
YamlEngine.unmarshal(repository.get(executionPath), 
YamlExecuteProcessContext.class);
-        for (YamlExecuteProcessUnit unit : 
yamlExecuteProcessContext.getUnitStatuses()) {
-            if (unit.getStatus() != 
ExecuteProcessConstants.EXECUTE_STATUS_DONE) {
-                return;
-            }
-        }
-        repository.delete(executionPath);
-    }
-    
-    /**
      * Register instance online.
      */
     public void registerInstanceOnline() {
         repository.persistEphemeral(node.getProxyNodePath(instanceId), "");
     }
-    
-    /**
-     * Initialize nodes.
-     */
-    public void initNodes() {
-        repository.persist(node.getDataNodesPath(), "");
-        repository.persist(node.getPrimaryNodesPath(), "");
-    }
 }
diff --git 
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/service/process/ProcessRegistrySubscriber.java
 
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/service/process/ProcessRegistrySubscriber.java
new file mode 100644
index 0000000..d43630b
--- /dev/null
+++ 
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/service/process/ProcessRegistrySubscriber.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.governance.core.registry.service.process;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.governance.core.registry.RegistryCenterNode;
+import 
org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ExecuteProcessReportEvent;
+import 
org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ExecuteProcessSummaryReportEvent;
+import 
org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ExecuteProcessUnitReportEvent;
+import 
org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ShowProcessListRequestEvent;
+import 
org.apache.shardingsphere.governance.core.registry.listener.event.invocation.ShowProcessListResponseEvent;
+import 
org.apache.shardingsphere.governance.repository.spi.RegistryCenterRepository;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import 
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
+import 
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
+import 
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessUnit;
+import 
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessContext;
+import 
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.YamlExecuteProcessUnit;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Process registry subscriber.
+ */
+public final class ProcessRegistrySubscriber {
+    
+    private final RegistryCenterRepository repository;
+    
+    private final RegistryCenterNode node;
+    
+    public ProcessRegistrySubscriber(final RegistryCenterRepository 
repository) {
+        this.repository = repository;
+        node = new RegistryCenterNode();
+        ShardingSphereEventBus.getInstance().register(this);
+    }
+    
+    /**
+     * Load show process list data.
+     *
+     * @param event get children request event.
+     */
+    @Subscribe
+    public void loadShowProcessListData(final ShowProcessListRequestEvent 
event) {
+        List<String> childrenKeys = 
repository.getChildrenKeys(node.getExecutionNodesPath());
+        Collection<String> processListData = childrenKeys.stream().map(key -> 
repository.get(node.getExecutionPath(key))).collect(Collectors.toList());
+        ShardingSphereEventBus.getInstance().post(new 
ShowProcessListResponseEvent(processListData));
+    }
+    
+    /**
+     * Report execute process summary.
+     *
+     * @param event execute process summary report event.
+     */
+    @Subscribe
+    public void reportExecuteProcessSummary(final 
ExecuteProcessSummaryReportEvent event) {
+        ExecuteProcessContext executeProcessContext = 
event.getExecuteProcessContext();
+        
repository.persist(node.getExecutionPath(executeProcessContext.getExecutionID()),
 YamlEngine.marshal(new YamlExecuteProcessContext(executeProcessContext)));
+    }
+    
+    /**
+     * Report execute process unit.
+     *
+     * @param event execute process unit report event.
+     */
+    @Subscribe
+    public void reportExecuteProcessUnit(final ExecuteProcessUnitReportEvent 
event) {
+        // TODO lock on the same jvm
+        String executionPath = node.getExecutionPath(event.getExecutionID());
+        YamlExecuteProcessContext yamlExecuteProcessContext = 
YamlEngine.unmarshal(repository.get(executionPath), 
YamlExecuteProcessContext.class);
+        ExecuteProcessUnit executeProcessUnit = event.getExecuteProcessUnit();
+        for (YamlExecuteProcessUnit unit : 
yamlExecuteProcessContext.getUnitStatuses()) {
+            if (unit.getUnitID().equals(executeProcessUnit.getUnitID())) {
+                unit.setStatus(executeProcessUnit.getStatus());
+            }
+        }
+        repository.persist(executionPath, 
YamlEngine.marshal(yamlExecuteProcessContext));
+    }
+    
+    /**
+     * Report execute process.
+     *
+     * @param event execute process report event.
+     */
+    @Subscribe
+    public void reportExecuteProcess(final ExecuteProcessReportEvent event) {
+        String executionPath = node.getExecutionPath(event.getExecutionID());
+        YamlExecuteProcessContext yamlExecuteProcessContext = 
YamlEngine.unmarshal(repository.get(executionPath), 
YamlExecuteProcessContext.class);
+        for (YamlExecuteProcessUnit unit : 
yamlExecuteProcessContext.getUnitStatuses()) {
+            if (unit.getStatus() != 
ExecuteProcessConstants.EXECUTE_STATUS_DONE) {
+                return;
+            }
+        }
+        repository.delete(executionPath);
+    }
+}
diff --git 
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistryService.java
 
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistrySubscriber.java
similarity index 94%
rename from 
shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistryService.java
rename to 
shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistrySubscriber.java
index e2f31bb..157cb0a 100644
--- 
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistryService.java
+++ 
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistrySubscriber.java
@@ -33,10 +33,10 @@ import 
org.apache.shardingsphere.infra.yaml.swapper.YamlRuleConfigurationSwapper
 import java.util.Collection;
 
 /**
- * Scaling registry service.
+ * Scaling registry subscriber.
  */
 // TODO move to scaling module
-public final class ScalingRegistryService {
+public final class ScalingRegistrySubscriber {
     
     private final RegistryCenterRepository repository;
     
@@ -46,7 +46,7 @@ public final class ScalingRegistryService {
     
     private final RegistryCacheManager registryCacheManager;
     
-    public ScalingRegistryService(final RegistryCenterRepository repository, 
final SchemaRuleRegistryService schemaRuleService) {
+    public ScalingRegistrySubscriber(final RegistryCenterRepository 
repository, final SchemaRuleRegistryService schemaRuleService) {
         this.repository = repository;
         node = new RegistryCenterNode();
         this.schemaRuleService = schemaRuleService;
diff --git 
a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistryServiceTest.java
 
b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistrySubscriberTest.java
similarity index 86%
rename from 
shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistryServiceTest.java
rename to 
shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistrySubscriberTest.java
index 21c00fc..d5132a6 100644
--- 
a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistryServiceTest.java
+++ 
b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/service/scaling/ScalingRegistrySubscriberTest.java
@@ -40,7 +40,7 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
-public final class ScalingRegistryServiceTest {
+public final class ScalingRegistrySubscriberTest {
     
     @Mock
     private RegistryCenterRepository registryCenterRepository;
@@ -51,24 +51,25 @@ public final class ScalingRegistryServiceTest {
     @Mock
     private RegistryCacheManager registryCacheManager;
     
-    private ScalingRegistryService scalingRegistryService;
+    private ScalingRegistrySubscriber scalingRegistrySubscriber;
     
     @Before
     public void setUp() throws ReflectiveOperationException {
-        scalingRegistryService = new 
ScalingRegistryService(registryCenterRepository, schemaRuleService);
+        scalingRegistrySubscriber = new 
ScalingRegistrySubscriber(registryCenterRepository, schemaRuleService);
     }
     
     @Test
     public void assertSwitchRuleConfiguration() throws 
ReflectiveOperationException {
-        Field field = 
ScalingRegistryService.class.getDeclaredField("registryCacheManager");
+        Field field = 
ScalingRegistrySubscriber.class.getDeclaredField("registryCacheManager");
         field.setAccessible(true);
-        field.set(scalingRegistryService, registryCacheManager);
+        field.set(scalingRegistrySubscriber, registryCacheManager);
         when(registryCacheManager.loadCache(anyString(), 
eq("testCacheId"))).thenReturn(readYAML());
         SwitchRuleConfigurationEvent event = new 
SwitchRuleConfigurationEvent("sharding_db", "testCacheId");
-        scalingRegistryService.switchRuleConfiguration(event);
+        scalingRegistrySubscriber.switchRuleConfiguration(event);
         // TODO finish verify
     }
     
+    @Test
     public void assertCacheRuleConfiguration() {
         // TODO finish test case
     }

Reply via email to