This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new acf8a33  Merge scaling listener path and elastic job path (#9253)
acf8a33 is described below

commit acf8a3326bd6e19f96c5f1dc06c27ee0105bc4b6
Author: 邱鹿 Lucas <[email protected]>
AuthorDate: Mon Feb 1 17:39:11 2021 +0800

    Merge scaling listener path and elastic job path (#9253)
    
    * Merge scaling listener path and elastic job path
    
    * Optimize RegistryRepositoryAPIImpl
    
    * Optimize RegistryRepositoryAPIImpl
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../scaling/core/api/ScalingAPIFactory.java        |  4 +-
 .../core/api/impl/RegistryRepositoryAPIImpl.java   | 11 +++--
 .../scaling/core/constant/ScalingConstant.java     | 47 +---------------------
 .../execute/executor/job/ScalingJobExecutor.java   |  7 ++--
 .../scaling/core/job/FinishedCheckJob.java         |  2 +-
 .../core/job/preparer/ScalingJobPreparer.java      |  4 +-
 .../scaling/core/utils/ElasticJobUtil.java         |  2 +-
 .../scaling/core/utils/ScalingTaskUtil.java        | 15 -------
 8 files changed, 17 insertions(+), 75 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
index d71725f..a2cfdc1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
@@ -96,7 +96,7 @@ public final class ScalingAPIFactory {
             GovernanceConfiguration governanceConfig = ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
             GovernanceCenterConfiguration registryCenterConfig = governanceConfig.getRegistryCenterConfiguration();
             RegistryRepository registryRepository = TypedSPIRegistry.getRegisteredService(RegistryRepository.class, registryCenterConfig.getType(), registryCenterConfig.getProps());
-            registryRepository.init(governanceConfig.getName(), registryCenterConfig);
+            registryRepository.init(governanceConfig.getName() + ScalingConstant.SCALING_ROOT, registryCenterConfig);
             INSTANCE = new RegistryRepositoryAPIImpl(registryRepository);
         }
     }
@@ -112,7 +112,7 @@ public final class ScalingAPIFactory {
         
         private ElasticJobAPIHolder() {
             GovernanceConfiguration governanceConfig = ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
-            String namespace = governanceConfig.getName() + ScalingConstant.SCALING_ROOT_PATH;
+            String namespace = governanceConfig.getName() + ScalingConstant.SCALING_ROOT;
             jobStatisticsAPI = JobAPIFactory.createJobStatisticsAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(), namespace, null);
             jobConfigurationAPI = JobAPIFactory.createJobConfigurationAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(), namespace, null);
         }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
index cf7a17a..4cd9002 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
@@ -29,7 +29,6 @@ import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTa
 import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
 import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTask;
 import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
-import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
 
 import java.util.HashMap;
 import java.util.List;
@@ -51,7 +50,7 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
         jobPosition.setDatabaseType(jobContext.getJobConfig().getHandleConfig().getDatabaseType());
         jobPosition.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(jobContext));
         jobPosition.setInventoryTaskProgressMap(getInventoryTaskProgressMap(jobContext));
-        registryRepository.persist(ScalingTaskUtil.getScalingListenerPath(jobContext.getJobId(), jobContext.getShardingItem()), jobPosition.toJson());
+        registryRepository.persist(getPath(jobContext.getJobId(), jobContext.getShardingItem()), jobPosition.toJson());
     }
     
     private Map<String, IncrementalTaskProgress> getIncrementalTaskProgressMap(final JobContext jobContext) {
@@ -70,11 +69,15 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
         return result;
     }
     
+    private String getPath(final long jobId, final int shardingItem) {
+        return String.format("/%d/offset/%d", jobId, shardingItem);
+    }
+    
     @Override
     public JobProgress getJobProgress(final long jobId, final int shardingItem) {
         String data = null;
         try {
-            data = registryRepository.get(ScalingTaskUtil.getScalingListenerPath(jobId, shardingItem));
+            data = registryRepository.get(getPath(jobId, shardingItem));
         } catch (final NullPointerException ex) {
             log.info("job {}-{} without break point.", jobId, shardingItem);
         }
@@ -84,7 +87,7 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
     @Override
     public void deleteJob(final long jobId) {
         log.info("delete job {}", jobId);
-        registryRepository.delete(ScalingTaskUtil.getScalingListenerPath(jobId));
+        registryRepository.delete(String.valueOf(jobId));
     }
     
     @Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/constant/ScalingConstant.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/constant/ScalingConstant.java
index 3c0ef9d..c85360e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/constant/ScalingConstant.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/constant/ScalingConstant.java
@@ -44,50 +44,5 @@ public final class ScalingConstant {
     /**
      * Scaling root path.
      */
-    public static final String SCALING_ROOT_PATH = "/scaling";
-    
-    /**
-     * Scaling listener path.
-     */
-    public static final String SCALING_LISTENER_PATH = SCALING_ROOT_PATH + "/listener";
-    
-    /**
-     * Scaling elastic job path.
-     */
-    public static final String SCALING_ELASTIC_JOB_PATH = SCALING_ROOT_PATH + "/elastic_job";
-    
-    /**
-     * Config.
-     */
-    public static final String CONFIG = "config";
-    
-    /**
-     * Status.
-     */
-    public static final String STATUS = "status";
-    
-    /**
-     * Position.
-     */
-    public static final String POSITION = "position";
-    
-    /**
-     * Workflow.
-     */
-    public static final String WORKFLOW = "workflow";
-    
-    /**
-     * Incremental.
-     */
-    public static final String INCREMENTAL = "incremental";
-    
-    /**
-     * Inventory.
-     */
-    public static final String INVENTORY = "inventory";
-    
-    /**
-     * Delay.
-     */
-    public static final String DELAY = "delay";
+    public static final String SCALING_ROOT = "/scaling";
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java
index 94d24d9..73fd0f0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java
@@ -40,7 +40,6 @@ import org.apache.shardingsphere.scaling.core.execute.executor.AbstractScalingEx
 import org.apache.shardingsphere.scaling.core.execute.executor.ScalingExecutor;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.utils.ElasticJobUtil;
-import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
 
 import java.util.Map;
 import java.util.Optional;
@@ -55,7 +54,7 @@ public final class ScalingJobExecutor extends AbstractScalingExecutor implements
     
     private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().create();
     
-    private static final Pattern CONFIG_PATTERN = Pattern.compile(ScalingTaskUtil.getScalingListenerPath("(\\d+)", ScalingConstant.CONFIG));
+    private static final Pattern CONFIG_PATTERN = Pattern.compile("(\\d+)/config");
     
     private final RegistryRepositoryAPI registryRepositoryAPI = ScalingAPIFactory.getRegistryRepositoryAPI();
     
@@ -75,7 +74,7 @@ public final class ScalingJobExecutor extends AbstractScalingExecutor implements
     }
     
     private void watchConfigRepository() {
-        registryRepositoryAPI.watch(ScalingConstant.SCALING_LISTENER_PATH, event -> {
+        registryRepositoryAPI.watch(ScalingConstant.SCALING_ROOT, event -> {
             Optional<JobConfiguration> jobConfig = getJobConfig(event);
             if (!jobConfig.isPresent()) {
                 return;
@@ -144,7 +143,7 @@ public final class ScalingJobExecutor extends AbstractScalingExecutor implements
         if (new LeaderService(registryCenter, jobId).isLeader()) {
             log.info("leader worker update config.");
             JobAPIFactory.createJobConfigurationAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(),
-                    governanceConfig.getName() + ScalingConstant.SCALING_ELASTIC_JOB_PATH, null)
+                    governanceConfig.getName() + ScalingConstant.SCALING_ROOT, null)
                     .updateJobConfiguration(JobConfigurationPOJO.fromJobConfiguration(createJobConfig(jobId, jobConfig)));
         }
         jobBootstrapWrapper.setRunning(jobConfig.getHandleConfig().isRunning());
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
index a39f87c..59302e9 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
@@ -44,7 +44,7 @@ public final class FinishedCheckJob implements SimpleJob {
     
     @Override
     public void execute(final ShardingContext shardingContext) {
-        List<String> jobs = registryRepositoryAPI.getChildrenKeys(ScalingConstant.SCALING_LISTENER_PATH);
+        List<String> jobs = registryRepositoryAPI.getChildrenKeys(ScalingConstant.SCALING_ROOT);
         for (String each : jobs) {
             long jobId = Long.parseLong(each);
             try {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
index 0e06c4c..6b1482c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
@@ -56,9 +56,9 @@ public final class ScalingJobPreparer {
             checkDataSource(jobContext, dataSourceManager);
             initIncrementalTasks(jobContext, dataSourceManager);
             initInventoryTasks(jobContext, dataSourceManager);
-        } catch (final PrepareFailedException | SQLException ex) {
-            log.error("Preparing scaling job {} failed", jobContext.getJobId(), ex);
+        } catch (final SQLException ex) {
             jobContext.setStatus(JobStatus.PREPARING_FAILURE);
+            throw new PrepareFailedException("Scaling job preparing failed", ex);
         }
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ElasticJobUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ElasticJobUtil.java
index 4c1d621e..bc35fc2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ElasticJobUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ElasticJobUtil.java
@@ -49,7 +49,7 @@ public final class ElasticJobUtil {
     private static ZookeeperConfiguration getZookeeperConfig() {
         GovernanceConfiguration governanceConfig = ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
         ZookeeperConfiguration result = new ZookeeperConfiguration(governanceConfig.getRegistryCenterConfiguration().getServerLists(),
-                governanceConfig.getName() + ScalingConstant.SCALING_ELASTIC_JOB_PATH);
+                governanceConfig.getName() + ScalingConstant.SCALING_ROOT);
         Properties props = governanceConfig.getRegistryCenterConfiguration().getProps();
         result.setMaxSleepTimeMilliseconds(getProperty(props, "max.sleep.time.milliseconds", result.getMaxSleepTimeMilliseconds()));
         result.setBaseSleepTimeMilliseconds(getProperty(props, "base.sleep.time.milliseconds", result.getBaseSleepTimeMilliseconds()));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
index 826eb77..19760f5 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.scaling.core.utils;
 
 import org.apache.shardingsphere.scaling.core.config.HandleConfiguration;
-import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
 import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
 import org.apache.shardingsphere.scaling.core.job.position.JobProgress;
 import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTask;
@@ -71,18 +70,4 @@ public final class ScalingTaskUtil {
                 .flatMap(each -> each.getIncrementalTaskProgressMap().values().stream())
                 .allMatch(each -> each.getIncrementalTaskDelay().getDelayMilliseconds() <= handleConfig.getWorkflowConfig().getAllowDelayMilliseconds());
     }
-    
-    /**
-     * Get scaling listener path.
-     *
-     * @param paths sub paths.
-     * @return path.
-     */
-    public static String getScalingListenerPath(final Object... paths) {
-        StringBuilder result = new StringBuilder(ScalingConstant.SCALING_LISTENER_PATH);
-        for (Object each : paths) {
-            result.append("/").append(each);
-        }
-        return result.toString();
-    }
 }

Reply via email to