This is an automated email from the ASF dual-hosted git repository.

zhangyonglun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c647373  Optimize scaling job list (#9514)
c647373 is described below

commit c647373caa4cd682973bb2cb513e55e9053f1f74
Author: 邱鹿 Lucas <[email protected]>
AuthorDate: Fri Feb 26 15:50:14 2021 +0800

    Optimize scaling job list (#9514)
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../ral/impl/ShowScalingJobListBackendHandler.java |  3 +-
 .../shardingsphere/scaling/core/api/JobInfo.java   |  8 +---
 .../scaling/core/api/impl/ScalingAPIImpl.java      | 45 +---------------------
 .../scaling/core/config/HandleConfiguration.java   |  2 +
 .../scaling/core/util/JobConfigurationUtil.java    | 29 +++++++++-----
 .../scaling/core/api/impl/ScalingAPIImplTest.java  |  6 +--
 6 files changed, 25 insertions(+), 68 deletions(-)

diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ShowScalingJobListBackendHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ShowScalingJobListBackendHandler.java
index 5d097c8..5c48b65 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ShowScalingJobListBackendHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ShowScalingJobListBackendHandler.java
@@ -27,7 +27,6 @@ import org.apache.shardingsphere.scaling.core.api.ScalingAPI;
 import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
 
 import java.sql.Types;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -69,7 +68,7 @@ public final class ShowScalingJobListBackendHandler implements TextProtocolBacke
                 .map(each -> {
                     Map<String, Object> map = Maps.newHashMap();
                     map.put("id", each.getJobId());
-                    map.put("tables", Arrays.stream(each.getTables()).reduce((a, b) -> String.format("%s, %s", a, b)).orElse(""));
+                    map.put("tables", each.getTables());
                     map.put("sharding_total_count", each.getShardingTotalCount());
                     map.put("active", each.isActive() ? 1 : 0);
                     return map;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobInfo.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobInfo.java
index 40bfe84..00cba9b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobInfo.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobInfo.java
@@ -33,13 +33,7 @@ public final class JobInfo {
     
     private boolean active;
     
-    private String status;
-    
-    private String[] tables;
+    private String tables;
     
     private int shardingTotalCount;
-    
-    private int inventoryFinishedPercentage;
-    
-    private long incrementalAverageDelayMilliseconds = -1;
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
index 42f6594..09d0666 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
@@ -27,13 +27,11 @@ import org.apache.shardingsphere.scaling.core.common.constant.ScalingConstant;
 import org.apache.shardingsphere.scaling.core.common.exception.ScalingJobNotFoundException;
 import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.core.job.JobContext;
-import org.apache.shardingsphere.scaling.core.job.JobStatus;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckerFactory;
 import org.apache.shardingsphere.scaling.core.job.environment.ScalingEnvironmentManager;
-import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
 import org.apache.shardingsphere.scaling.core.job.progress.JobProgress;
 import org.apache.shardingsphere.scaling.core.util.JobConfigurationUtil;
 
@@ -41,9 +39,7 @@ import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -62,51 +58,12 @@ public final class ScalingAPIImpl implements ScalingAPI {
         JobInfo result = new JobInfo(Long.parseLong(jobName));
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(result.getJobId());
         JobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
-        Map<Integer, JobProgress> jobProgressMap = getProgress(result.getJobId());
         result.setActive(!jobConfigPOJO.isDisabled());
         result.setShardingTotalCount(jobConfig.getHandleConfig().getShardingTotalCount());
-        result.setTables(jobConfig.getHandleConfig().getShardingTables());
-        result.setStatus(getStatus(jobProgressMap));
-        result.setInventoryFinishedPercentage(getInventoryFinishedPercentage(jobProgressMap));
-        result.setIncrementalAverageDelayMilliseconds(getIncrementalAverageDelayMilliseconds(jobProgressMap));
+        result.setTables(jobConfig.getHandleConfig().getLogicTables());
         return result;
     }
     
-    private String getStatus(final Map<Integer, JobProgress> jobProgressMap) {
-        String result = null;
-        Set<JobProgress> jobProgressSet = jobProgressMap.values().stream()
-                .filter(Objects::nonNull)
-                .collect(Collectors.toSet());
-        for (JobProgress each : jobProgressSet) {
-            if (null == result || !each.getStatus().isRunning()) {
-                result = each.getStatus().name();
-            }
-        }
-        return null == result ? JobStatus.RUNNING.name() : result;
-    }
-    
-    private int getInventoryFinishedPercentage(final Map<Integer, JobProgress> jobProgressMap) {
-        long isNull = jobProgressMap.values().stream()
-                .filter(Objects::isNull).count();
-        long total = jobProgressMap.values().stream()
-                .filter(Objects::nonNull).count();
-        long finished = jobProgressMap.values().stream()
-                .filter(Objects::nonNull)
-                .flatMap(each -> each.getIncrementalTaskProgressMap().values().stream())
-                .filter(each -> each.getPosition() instanceof FinishedPosition)
-                .count();
-        return total == 0 ? 0 : (int) ((finished * 100 / total) * (jobProgressMap.size() - isNull) / jobProgressMap.size());
-    }
-    
-    private long getIncrementalAverageDelayMilliseconds(final Map<Integer, JobProgress> jobProgressMap) {
-        List<Long> delays = jobProgressMap.values().stream()
-                .filter(Objects::nonNull)
-                .flatMap(each -> each.getIncrementalTaskProgressMap().values().stream())
-                .map(each -> each.getIncrementalTaskDelay().getDelayMilliseconds())
-                .collect(Collectors.toList());
-        return delays.isEmpty() || delays.contains(-1L) ? -1 : delays.stream().reduce(Long::sum).orElse(0L) / delays.size();
-    }
-    
     @Override
     public void start(final long jobId) {
         log.info("Start scaling job {}", jobId);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java
index 319b66e..a8776ad 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java
@@ -37,6 +37,8 @@ public final class HandleConfiguration {
     
     private String[] shardingTables;
     
+    private String logicTables;
+    
     private int shardingItem;
     
     private int shardingSize = 1000 * 10000;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java
index e034c16..587cbb9 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java
@@ -95,27 +95,30 @@ public final class JobConfigurationUtil {
             handleConfig.setDatabaseType(jobConfig.getRuleConfig().getSource().unwrap().getDatabaseType().getName());
         }
         if (null == jobConfig.getHandleConfig().getShardingTables()) {
-            handleConfig.setShardingTables(groupByDataSource(getShouldScalingActualDataNodes(jobConfig)));
+            Map<String, String> shouldScalingActualDataNodes = getShouldScalingActualDataNodes(jobConfig);
+            handleConfig.setShardingTables(groupByDataSource(shouldScalingActualDataNodes.values()));
+            handleConfig.setLogicTables(getLogicTables(shouldScalingActualDataNodes.keySet()));
         }
     }
     
-    private static List<String> getShouldScalingActualDataNodes(final JobConfiguration jobConfig) {
+    private static Map<String, String> getShouldScalingActualDataNodes(final JobConfiguration jobConfig) {
         ScalingDataSourceConfiguration sourceConfig = jobConfig.getRuleConfig().getSource().unwrap();
         Preconditions.checkState(sourceConfig instanceof ShardingSphereJDBCDataSourceConfiguration,
                 "Only ShardingSphereJdbc type of source ScalingDataSourceConfiguration is supported.");
         ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) sourceConfig;
         if (!(jobConfig.getRuleConfig().getTarget().unwrap() instanceof ShardingSphereJDBCDataSourceConfiguration)) {
-            return getShardingRuleConfigMap(source.getDataSourceRuleConfig()).values().stream().map(ShardingTableRuleConfiguration::getActualDataNodes).collect(Collectors.toList());
+            return getShardingRuleConfigMap(source.getDataSourceRuleConfig()).entrySet().stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, each -> each.getValue().getActualDataNodes()));
         }
         ShardingSphereJDBCDataSourceConfiguration target = (ShardingSphereJDBCDataSourceConfiguration) jobConfig.getRuleConfig().getTarget().unwrap();
         return getShouldScalingActualDataNodes(getModifiedDataSources(source.getDataSourceRuleConfig(), target.getDataSourceRuleConfig()),
                 getShardingRuleConfigMap(source.getDataSourceRuleConfig()), getShardingRuleConfigMap(target.getDataSourceRuleConfig()));
     }
     
-    private static List<String> getShouldScalingActualDataNodes(final Set<String> modifiedDataSources,
-                                                                final Map<String, ShardingTableRuleConfiguration> oldShardingRuleConfigMap,
-                                                                final Map<String, ShardingTableRuleConfiguration> newShardingRuleConfigMap) {
-        List<String> result = new ArrayList<>();
+    private static Map<String, String> getShouldScalingActualDataNodes(final Set<String> modifiedDataSources,
+                                                                       final Map<String, ShardingTableRuleConfiguration> oldShardingRuleConfigMap,
+                                                                       final Map<String, ShardingTableRuleConfiguration> newShardingRuleConfigMap) {
+        Map<String, String> result = Maps.newHashMap();
         newShardingRuleConfigMap.keySet().forEach(each -> {
             if (!oldShardingRuleConfigMap.containsKey(each)) {
                 return;
@@ -123,7 +126,7 @@ public final class JobConfigurationUtil {
             List<String> oldActualDataNodes = new InlineExpressionParser(oldShardingRuleConfigMap.get(each).getActualDataNodes()).splitAndEvaluate();
             List<String> newActualDataNodes = new InlineExpressionParser(newShardingRuleConfigMap.get(each).getActualDataNodes()).splitAndEvaluate();
             if (!CollectionUtils.isEqualCollection(oldActualDataNodes, newActualDataNodes) || includeModifiedDataSources(newActualDataNodes, modifiedDataSources)) {
-                result.add(oldShardingRuleConfigMap.get(each).getActualDataNodes());
+                result.put(each, oldShardingRuleConfigMap.get(each).getActualDataNodes());
             }
         });
         return result;
@@ -158,7 +161,7 @@ public final class JobConfigurationUtil {
         return ruleConfig.getTables().stream().collect(Collectors.toMap(ShardingTableRuleConfiguration::getLogicTable, Function.identity()));
     }
     
-    private static String[] groupByDataSource(final List<String> actualDataNodeList) {
+    private static String[] groupByDataSource(final Collection<String> actualDataNodeList) {
         List<String> result = new ArrayList<>();
         Multimap<String, String> multiMap = getNodeMultiMap(actualDataNodeList);
         for (String key : multiMap.keySet()) {
@@ -171,7 +174,7 @@ public final class JobConfigurationUtil {
         return result.toArray(new String[0]);
     }
     
-    private static Multimap<String, String> getNodeMultiMap(final List<String> actualDataNodeList) {
+    private static Multimap<String, String> getNodeMultiMap(final Collection<String> actualDataNodeList) {
         Multimap<String, String> result = HashMultimap.create();
         for (String actualDataNodes : actualDataNodeList) {
             for (String actualDataNode : actualDataNodes.split(",")) {
@@ -200,6 +203,12 @@ public final class JobConfigurationUtil {
         return new String[]{actualDataNode.substring(0, i), actualDataNode.substring(i + 1)};
     }
     
+    private static String getLogicTables(final Set<String> logicTables) {
+        return logicTables.stream()
+                .reduce((a, b) -> String.format("%s, %s", a, b))
+                .orElse("");
+    }
+    
     /**
      * Split job configuration to task configurations.
      *
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
index 142c8fc..bf742e1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
@@ -28,7 +28,6 @@ import org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
 import org.apache.shardingsphere.scaling.core.fixture.EmbedTestingServer;
-import org.apache.shardingsphere.scaling.core.job.JobStatus;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
 import org.apache.shardingsphere.scaling.core.job.progress.JobProgress;
 import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
@@ -68,11 +67,8 @@ public final class ScalingAPIImplTest {
         assertTrue(jobId.isPresent());
         JobInfo jobInfo = getNonNullJobInfo(jobId.get());
         assertTrue(jobInfo.isActive());
-        assertThat(jobInfo.getStatus(), is(JobStatus.RUNNING.name()));
-        assertThat(jobInfo.getTables(), is(new String[]{"ds_0.t_order"}));
+        assertThat(jobInfo.getTables(), is("t_order"));
         assertThat(jobInfo.getShardingTotalCount(), is(1));
-        assertThat(jobInfo.getInventoryFinishedPercentage(), is(0));
-        assertThat(jobInfo.getIncrementalAverageDelayMilliseconds(), is(-1L));
     }
     
     private Optional<JobInfo> getJobInfo(final long jobId) {

Reply via email to