This is an automated email from the ASF dual-hosted git repository.
zhangyonglun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c647373 Optimize scaling job list (#9514)
c647373 is described below
commit c647373caa4cd682973bb2cb513e55e9053f1f74
Author: 邱鹿 Lucas <[email protected]>
AuthorDate: Fri Feb 26 15:50:14 2021 +0800
Optimize scaling job list (#9514)
Co-authored-by: qiulu3 <Lucas209910>
---
.../ral/impl/ShowScalingJobListBackendHandler.java | 3 +-
.../shardingsphere/scaling/core/api/JobInfo.java | 8 +---
.../scaling/core/api/impl/ScalingAPIImpl.java | 45 +---------------------
.../scaling/core/config/HandleConfiguration.java | 2 +
.../scaling/core/util/JobConfigurationUtil.java | 29 +++++++++-----
.../scaling/core/api/impl/ScalingAPIImplTest.java | 6 +--
6 files changed, 25 insertions(+), 68 deletions(-)
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ShowScalingJobListBackendHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ShowScalingJobListBackendHandler.java
index 5d097c8..5c48b65 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ShowScalingJobListBackendHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ShowScalingJobListBackendHandler.java
@@ -27,7 +27,6 @@ import org.apache.shardingsphere.scaling.core.api.ScalingAPI;
import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
import java.sql.Types;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -69,7 +68,7 @@ public final class ShowScalingJobListBackendHandler
implements TextProtocolBacke
.map(each -> {
Map<String, Object> map = Maps.newHashMap();
map.put("id", each.getJobId());
- map.put("tables",
Arrays.stream(each.getTables()).reduce((a, b) -> String.format("%s, %s", a,
b)).orElse(""));
+ map.put("tables", each.getTables());
map.put("sharding_total_count",
each.getShardingTotalCount());
map.put("active", each.isActive() ? 1 : 0);
return map;
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobInfo.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobInfo.java
index 40bfe84..00cba9b 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobInfo.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobInfo.java
@@ -33,13 +33,7 @@ public final class JobInfo {
private boolean active;
- private String status;
-
- private String[] tables;
+ private String tables;
private int shardingTotalCount;
-
- private int inventoryFinishedPercentage;
-
- private long incrementalAverageDelayMilliseconds = -1;
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
index 42f6594..09d0666 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
@@ -27,13 +27,11 @@ import
org.apache.shardingsphere.scaling.core.common.constant.ScalingConstant;
import
org.apache.shardingsphere.scaling.core.common.exception.ScalingJobNotFoundException;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.job.JobContext;
-import org.apache.shardingsphere.scaling.core.job.JobStatus;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import
org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
import
org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckerFactory;
import
org.apache.shardingsphere.scaling.core.job.environment.ScalingEnvironmentManager;
-import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
import org.apache.shardingsphere.scaling.core.job.progress.JobProgress;
import org.apache.shardingsphere.scaling.core.util.JobConfigurationUtil;
@@ -41,9 +39,7 @@ import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
-import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -62,51 +58,12 @@ public final class ScalingAPIImpl implements ScalingAPI {
JobInfo result = new JobInfo(Long.parseLong(jobName));
JobConfigurationPOJO jobConfigPOJO =
getElasticJobConfigPOJO(result.getJobId());
JobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
- Map<Integer, JobProgress> jobProgressMap =
getProgress(result.getJobId());
result.setActive(!jobConfigPOJO.isDisabled());
result.setShardingTotalCount(jobConfig.getHandleConfig().getShardingTotalCount());
- result.setTables(jobConfig.getHandleConfig().getShardingTables());
- result.setStatus(getStatus(jobProgressMap));
-
result.setInventoryFinishedPercentage(getInventoryFinishedPercentage(jobProgressMap));
-
result.setIncrementalAverageDelayMilliseconds(getIncrementalAverageDelayMilliseconds(jobProgressMap));
+ result.setTables(jobConfig.getHandleConfig().getLogicTables());
return result;
}
- private String getStatus(final Map<Integer, JobProgress> jobProgressMap) {
- String result = null;
- Set<JobProgress> jobProgressSet = jobProgressMap.values().stream()
- .filter(Objects::nonNull)
- .collect(Collectors.toSet());
- for (JobProgress each : jobProgressSet) {
- if (null == result || !each.getStatus().isRunning()) {
- result = each.getStatus().name();
- }
- }
- return null == result ? JobStatus.RUNNING.name() : result;
- }
-
- private int getInventoryFinishedPercentage(final Map<Integer, JobProgress>
jobProgressMap) {
- long isNull = jobProgressMap.values().stream()
- .filter(Objects::isNull).count();
- long total = jobProgressMap.values().stream()
- .filter(Objects::nonNull).count();
- long finished = jobProgressMap.values().stream()
- .filter(Objects::nonNull)
- .flatMap(each ->
each.getIncrementalTaskProgressMap().values().stream())
- .filter(each -> each.getPosition() instanceof FinishedPosition)
- .count();
- return total == 0 ? 0 : (int) ((finished * 100 / total) *
(jobProgressMap.size() - isNull) / jobProgressMap.size());
- }
-
- private long getIncrementalAverageDelayMilliseconds(final Map<Integer,
JobProgress> jobProgressMap) {
- List<Long> delays = jobProgressMap.values().stream()
- .filter(Objects::nonNull)
- .flatMap(each ->
each.getIncrementalTaskProgressMap().values().stream())
- .map(each ->
each.getIncrementalTaskDelay().getDelayMilliseconds())
- .collect(Collectors.toList());
- return delays.isEmpty() || delays.contains(-1L) ? -1 :
delays.stream().reduce(Long::sum).orElse(0L) / delays.size();
- }
-
@Override
public void start(final long jobId) {
log.info("Start scaling job {}", jobId);
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java
index 319b66e..a8776ad 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java
@@ -37,6 +37,8 @@ public final class HandleConfiguration {
private String[] shardingTables;
+ private String logicTables;
+
private int shardingItem;
private int shardingSize = 1000 * 10000;
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java
index e034c16..587cbb9 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java
@@ -95,27 +95,30 @@ public final class JobConfigurationUtil {
handleConfig.setDatabaseType(jobConfig.getRuleConfig().getSource().unwrap().getDatabaseType().getName());
}
if (null == jobConfig.getHandleConfig().getShardingTables()) {
-
handleConfig.setShardingTables(groupByDataSource(getShouldScalingActualDataNodes(jobConfig)));
+ Map<String, String> shouldScalingActualDataNodes =
getShouldScalingActualDataNodes(jobConfig);
+
handleConfig.setShardingTables(groupByDataSource(shouldScalingActualDataNodes.values()));
+
handleConfig.setLogicTables(getLogicTables(shouldScalingActualDataNodes.keySet()));
}
}
- private static List<String> getShouldScalingActualDataNodes(final
JobConfiguration jobConfig) {
+ private static Map<String, String> getShouldScalingActualDataNodes(final
JobConfiguration jobConfig) {
ScalingDataSourceConfiguration sourceConfig =
jobConfig.getRuleConfig().getSource().unwrap();
Preconditions.checkState(sourceConfig instanceof
ShardingSphereJDBCDataSourceConfiguration,
"Only ShardingSphereJdbc type of source
ScalingDataSourceConfiguration is supported.");
ShardingSphereJDBCDataSourceConfiguration source =
(ShardingSphereJDBCDataSourceConfiguration) sourceConfig;
if (!(jobConfig.getRuleConfig().getTarget().unwrap() instanceof
ShardingSphereJDBCDataSourceConfiguration)) {
- return
getShardingRuleConfigMap(source.getDataSourceRuleConfig()).values().stream().map(ShardingTableRuleConfiguration::getActualDataNodes).collect(Collectors.toList());
+ return
getShardingRuleConfigMap(source.getDataSourceRuleConfig()).entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, each ->
each.getValue().getActualDataNodes()));
}
ShardingSphereJDBCDataSourceConfiguration target =
(ShardingSphereJDBCDataSourceConfiguration)
jobConfig.getRuleConfig().getTarget().unwrap();
return
getShouldScalingActualDataNodes(getModifiedDataSources(source.getDataSourceRuleConfig(),
target.getDataSourceRuleConfig()),
getShardingRuleConfigMap(source.getDataSourceRuleConfig()),
getShardingRuleConfigMap(target.getDataSourceRuleConfig()));
}
- private static List<String> getShouldScalingActualDataNodes(final
Set<String> modifiedDataSources,
- final
Map<String, ShardingTableRuleConfiguration> oldShardingRuleConfigMap,
- final
Map<String, ShardingTableRuleConfiguration> newShardingRuleConfigMap) {
- List<String> result = new ArrayList<>();
+ private static Map<String, String> getShouldScalingActualDataNodes(final
Set<String> modifiedDataSources,
+ final
Map<String, ShardingTableRuleConfiguration> oldShardingRuleConfigMap,
+ final
Map<String, ShardingTableRuleConfiguration> newShardingRuleConfigMap) {
+ Map<String, String> result = Maps.newHashMap();
newShardingRuleConfigMap.keySet().forEach(each -> {
if (!oldShardingRuleConfigMap.containsKey(each)) {
return;
@@ -123,7 +126,7 @@ public final class JobConfigurationUtil {
List<String> oldActualDataNodes = new
InlineExpressionParser(oldShardingRuleConfigMap.get(each).getActualDataNodes()).splitAndEvaluate();
List<String> newActualDataNodes = new
InlineExpressionParser(newShardingRuleConfigMap.get(each).getActualDataNodes()).splitAndEvaluate();
if (!CollectionUtils.isEqualCollection(oldActualDataNodes,
newActualDataNodes) || includeModifiedDataSources(newActualDataNodes,
modifiedDataSources)) {
-
result.add(oldShardingRuleConfigMap.get(each).getActualDataNodes());
+ result.put(each,
oldShardingRuleConfigMap.get(each).getActualDataNodes());
}
});
return result;
@@ -158,7 +161,7 @@ public final class JobConfigurationUtil {
return
ruleConfig.getTables().stream().collect(Collectors.toMap(ShardingTableRuleConfiguration::getLogicTable,
Function.identity()));
}
- private static String[] groupByDataSource(final List<String>
actualDataNodeList) {
+ private static String[] groupByDataSource(final Collection<String>
actualDataNodeList) {
List<String> result = new ArrayList<>();
Multimap<String, String> multiMap =
getNodeMultiMap(actualDataNodeList);
for (String key : multiMap.keySet()) {
@@ -171,7 +174,7 @@ public final class JobConfigurationUtil {
return result.toArray(new String[0]);
}
- private static Multimap<String, String> getNodeMultiMap(final List<String>
actualDataNodeList) {
+ private static Multimap<String, String> getNodeMultiMap(final
Collection<String> actualDataNodeList) {
Multimap<String, String> result = HashMultimap.create();
for (String actualDataNodes : actualDataNodeList) {
for (String actualDataNode : actualDataNodes.split(",")) {
@@ -200,6 +203,12 @@ public final class JobConfigurationUtil {
return new String[]{actualDataNode.substring(0, i),
actualDataNode.substring(i + 1)};
}
+ private static String getLogicTables(final Set<String> logicTables) {
+ return logicTables.stream()
+ .reduce((a, b) -> String.format("%s, %s", a, b))
+ .orElse("");
+ }
+
/**
* Split job configuration to task configurations.
*
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
index 142c8fc..bf742e1 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
@@ -28,7 +28,6 @@ import
org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.scaling.core.fixture.EmbedTestingServer;
-import org.apache.shardingsphere.scaling.core.job.JobStatus;
import
org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.job.progress.JobProgress;
import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
@@ -68,11 +67,8 @@ public final class ScalingAPIImplTest {
assertTrue(jobId.isPresent());
JobInfo jobInfo = getNonNullJobInfo(jobId.get());
assertTrue(jobInfo.isActive());
- assertThat(jobInfo.getStatus(), is(JobStatus.RUNNING.name()));
- assertThat(jobInfo.getTables(), is(new String[]{"ds_0.t_order"}));
+ assertThat(jobInfo.getTables(), is("t_order"));
assertThat(jobInfo.getShardingTotalCount(), is(1));
- assertThat(jobInfo.getInventoryFinishedPercentage(), is(0));
- assertThat(jobInfo.getIncrementalAverageDelayMilliseconds(), is(-1L));
}
private Optional<JobInfo> getJobInfo(final long jobId) {