This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new cfd03ffd662 Improve scaling prepare execute once at high concurrency. (#20062)
cfd03ffd662 is described below

commit cfd03ffd662d2b6edeccdb28c8df8c2d34363c85
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Aug 10 22:16:53 2022 +0800

    Improve scaling prepare execute once at high concurrency. (#20062)
    
    * Improve scaling prepare execute once at high concurrency.
    
    * Use tryLock instead of polling
    
    * Fix ci error
---
 .../data/pipeline/api/job/JobStatus.java           |  5 ++++
 .../rulealtered/RuleAlteredJobPreparer.java        | 35 +++++++++++-----------
 .../container/compose/DockerComposedContainer.java |  7 +++--
 3 files changed, 27 insertions(+), 20 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
index 7741b8385f7..a15396d52fe 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
@@ -37,6 +37,11 @@ public enum JobStatus {
      */
     PREPARING(true),
     
+    /**
+     * Job is in prepare success status.
+     */
+    PREPARE_SUCCESS(true),
+    
     /**
      * Job is in execute inventory task status.
      */
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index b9acabdbcca..4cd69fbcc29 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
@@ -36,7 +37,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTa
 import 
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineJobPreparerUtils;
-import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.prepare.InventoryTaskSplitter;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -47,7 +47,6 @@ import 
org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
 
 import java.sql.SQLException;
 import java.util.Collections;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Rule altered job preparer.
@@ -90,27 +89,27 @@ public final class RuleAlteredJobPreparer {
         String lockName = "prepare-" + jobConfig.getJobId();
         LockContext lockContext = 
PipelineContext.getContextManager().getInstanceContext().getLockContext();
         LockDefinition lockDefinition = new ExclusiveLockDefinition(lockName);
-        if (lockContext.tryLock(lockDefinition, 3000)) {
+        RuleAlteredJobAPIFactory.getInstance().persistJobProgress(jobContext);
+        if (lockContext.tryLock(lockDefinition, 180000)) {
             log.info("try lock success, jobId={}, shardingItem={}", 
jobConfig.getJobId(), jobContext.getShardingItem());
             try {
-                prepareAndCheckTarget(jobContext);
+                JobProgress jobProgress = 
RuleAlteredJobAPIFactory.getInstance().getJobProgress(jobContext.getJobId(), 
jobContext.getShardingItem());
+                boolean prepareFlag = 
JobStatus.PREPARING.equals(jobProgress.getStatus()) || 
JobStatus.RUNNING.equals(jobProgress.getStatus())
+                        || 
JobStatus.PREPARING_FAILURE.equals(jobProgress.getStatus());
+                if (prepareFlag) {
+                    log.info("execute prepare, jobId={}, shardingItem={}", 
jobConfig.getJobId(), jobContext.getShardingItem());
+                    jobContext.setStatus(JobStatus.PREPARING);
+                    
RuleAlteredJobAPIFactory.getInstance().updateShardingJobStatus(jobConfig.getJobId(),
 jobContext.getShardingItem(), JobStatus.PREPARING);
+                    prepareAndCheckTarget(jobContext);
+                    // TODO Loop insert zookeeper performance is not good
+                    for (int i = 0; i <= 
jobContext.getJobConfig().getJobShardingCount(); i++) {
+                        
RuleAlteredJobAPIFactory.getInstance().updateShardingJobStatus(jobConfig.getJobId(),
 i, JobStatus.PREPARE_SUCCESS);
+                    }
+                }
             } finally {
+                log.info("unlock, jobId={}, shardingItem={}", 
jobConfig.getJobId(), jobContext.getShardingItem());
                 lockContext.unlock(lockDefinition);
             }
-        } else {
-            log.info("wait lock released, jobId={}, shardingItem={}", 
jobConfig.getJobId(), jobContext.getShardingItem());
-            waitUntilLockReleased(lockContext, lockDefinition);
-        }
-    }
-    
-    private void waitUntilLockReleased(final LockContext lockContext, final 
LockDefinition lockDefinition) {
-        for (int loopCount = 0; loopCount < 30; loopCount++) {
-            log.info("waiting for lock released, lockKey={}, loopCount={}", 
lockDefinition.getLockKey(), loopCount);
-            ThreadUtil.sleep(TimeUnit.SECONDS.toMillis(5));
-            if (!lockContext.isLocked(lockDefinition)) {
-                log.info("unlocked, lockName={}", lockDefinition.getLockKey());
-                return;
-            }
         }
     }
     
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/DockerComposedContainer.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/DockerComposedContainer.java
index 8ea86738115..5cf80fcb3dc 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/DockerComposedContainer.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/DockerComposedContainer.java
@@ -49,8 +49,11 @@ public final class DockerComposedContainer extends 
BaseComposedContainer {
         ShardingSphereProxyClusterContainer proxyClusterContainer =
                 (ShardingSphereProxyClusterContainer) 
AdapterContainerFactory.newInstance("Cluster", "proxy", databaseType, 
storageContainer, "", "scaling");
         proxyClusterContainer.dependsOn(governanceContainer, storageContainer);
-        // TODO use proxy cluster will cause error sometimes, need to fix it.
-        this.proxyContainer = 
getContainers().registerContainer(proxyClusterContainer);
+        proxyContainer = 
getContainers().registerContainer(proxyClusterContainer);
+        ShardingSphereProxyClusterContainer anotherProxyContainer =
+                (ShardingSphereProxyClusterContainer) 
AdapterContainerFactory.newInstance("Cluster", "proxy", databaseType, 
storageContainer, "", "scaling");
+        anotherProxyContainer.dependsOn(governanceContainer, storageContainer);
+        getContainers().registerContainer(anotherProxyContainer);
     }
     
     @Override

Reply via email to