This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new cfd03ffd662 Improve scaling prepare execute once at high concurrency.
(#20062)
cfd03ffd662 is described below
commit cfd03ffd662d2b6edeccdb28c8df8c2d34363c85
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Aug 10 22:16:53 2022 +0800
Improve scaling prepare so that it executes only once under high concurrency. (#20062)
* Improve scaling prepare so that it executes only once under high concurrency.
* Use tryLock instead of polling
* Fix CI error
---
.../data/pipeline/api/job/JobStatus.java | 5 ++++
.../rulealtered/RuleAlteredJobPreparer.java | 35 +++++++++++-----------
.../container/compose/DockerComposedContainer.java | 7 +++--
3 files changed, 27 insertions(+), 20 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
index 7741b8385f7..a15396d52fe 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
@@ -37,6 +37,11 @@ public enum JobStatus {
*/
PREPARING(true),
+ /**
+ * Job is in prepare success status.
+ */
+ PREPARE_SUCCESS(true),
+
/**
* Job is in execute inventory task status.
*/
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index b9acabdbcca..4cd69fbcc29 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
@@ -36,7 +37,6 @@ import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTa
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import
org.apache.shardingsphere.data.pipeline.core.util.PipelineJobPreparerUtils;
-import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.prepare.InventoryTaskSplitter;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -47,7 +47,6 @@ import
org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
import java.sql.SQLException;
import java.util.Collections;
-import java.util.concurrent.TimeUnit;
/**
* Rule altered job preparer.
@@ -90,27 +89,27 @@ public final class RuleAlteredJobPreparer {
String lockName = "prepare-" + jobConfig.getJobId();
LockContext lockContext =
PipelineContext.getContextManager().getInstanceContext().getLockContext();
LockDefinition lockDefinition = new ExclusiveLockDefinition(lockName);
- if (lockContext.tryLock(lockDefinition, 3000)) {
+ RuleAlteredJobAPIFactory.getInstance().persistJobProgress(jobContext);
+ if (lockContext.tryLock(lockDefinition, 180000)) {
log.info("try lock success, jobId={}, shardingItem={}",
jobConfig.getJobId(), jobContext.getShardingItem());
try {
- prepareAndCheckTarget(jobContext);
+ JobProgress jobProgress =
RuleAlteredJobAPIFactory.getInstance().getJobProgress(jobContext.getJobId(),
jobContext.getShardingItem());
+ boolean prepareFlag =
JobStatus.PREPARING.equals(jobProgress.getStatus()) ||
JobStatus.RUNNING.equals(jobProgress.getStatus())
+ ||
JobStatus.PREPARING_FAILURE.equals(jobProgress.getStatus());
+ if (prepareFlag) {
+ log.info("execute prepare, jobId={}, shardingItem={}",
jobConfig.getJobId(), jobContext.getShardingItem());
+ jobContext.setStatus(JobStatus.PREPARING);
+
RuleAlteredJobAPIFactory.getInstance().updateShardingJobStatus(jobConfig.getJobId(),
jobContext.getShardingItem(), JobStatus.PREPARING);
+ prepareAndCheckTarget(jobContext);
+ // TODO Loop insert zookeeper performance is not good
+ for (int i = 0; i <=
jobContext.getJobConfig().getJobShardingCount(); i++) {
+
RuleAlteredJobAPIFactory.getInstance().updateShardingJobStatus(jobConfig.getJobId(),
i, JobStatus.PREPARE_SUCCESS);
+ }
+ }
} finally {
+ log.info("unlock, jobId={}, shardingItem={}",
jobConfig.getJobId(), jobContext.getShardingItem());
lockContext.unlock(lockDefinition);
}
- } else {
- log.info("wait lock released, jobId={}, shardingItem={}",
jobConfig.getJobId(), jobContext.getShardingItem());
- waitUntilLockReleased(lockContext, lockDefinition);
- }
- }
-
- private void waitUntilLockReleased(final LockContext lockContext, final
LockDefinition lockDefinition) {
- for (int loopCount = 0; loopCount < 30; loopCount++) {
- log.info("waiting for lock released, lockKey={}, loopCount={}",
lockDefinition.getLockKey(), loopCount);
- ThreadUtil.sleep(TimeUnit.SECONDS.toMillis(5));
- if (!lockContext.isLocked(lockDefinition)) {
- log.info("unlocked, lockName={}", lockDefinition.getLockKey());
- return;
- }
}
}
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/DockerComposedContainer.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/DockerComposedContainer.java
index 8ea86738115..5cf80fcb3dc 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/DockerComposedContainer.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/DockerComposedContainer.java
@@ -49,8 +49,11 @@ public final class DockerComposedContainer extends
BaseComposedContainer {
ShardingSphereProxyClusterContainer proxyClusterContainer =
(ShardingSphereProxyClusterContainer)
AdapterContainerFactory.newInstance("Cluster", "proxy", databaseType,
storageContainer, "", "scaling");
proxyClusterContainer.dependsOn(governanceContainer, storageContainer);
- // TODO use proxy cluster will cause error sometimes, need to fix it.
- this.proxyContainer =
getContainers().registerContainer(proxyClusterContainer);
+ proxyContainer =
getContainers().registerContainer(proxyClusterContainer);
+ ShardingSphereProxyClusterContainer anotherProxyContainer =
+ (ShardingSphereProxyClusterContainer)
AdapterContainerFactory.newInstance("Cluster", "proxy", databaseType,
storageContainer, "", "scaling");
+ anotherProxyContainer.dependsOn(governanceContainer, storageContainer);
+ getContainers().registerContainer(anotherProxyContainer);
}
@Override