This is an automated email from the ASF dual-hosted git repository.

jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 89233548ecc Add DialectDatabaseMetaData.isSupportGlobalCSN() (#29478)
89233548ecc is described below

commit 89233548ecc052c488684e93c1917bc44fb91993
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Dec 20 23:50:00 2023 +0800

    Add DialectDatabaseMetaData.isSupportGlobalCSN() (#29478)
    
    * Rename DataSourceCheckEngine.checkSourceDataSources()
    
    * Refactor DataSourceCheckEngine
    
    * Rename IncrementalTaskPreparer
    
    * Rename IncrementalTaskPositionManager
    
    * Rename IncrementalTaskPositionManager
    
    * Refactor CDCJobAPI
    
    * Add DialectDatabaseMetaData.isSupportGlobalCSN()
    
    * Add DialectDatabaseMetaData.isSupportGlobalCSN()
---
 .../core/metadata/database/DialectDatabaseMetaData.java      | 11 ++++++++++-
 .../metadata/database/OpenGaussDatabaseMetaData.java         |  5 +++++
 .../shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java      |  4 ++--
 .../data/pipeline/cdc/core/prepare/CDCJobPreparer.java       | 12 ++++--------
 4 files changed, 21 insertions(+), 11 deletions(-)

diff --git a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/DialectDatabaseMetaData.java b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/DialectDatabaseMetaData.java
index f9a7457c93c..4ec732cd5a7 100644
--- a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/DialectDatabaseMetaData.java
+++ b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/DialectDatabaseMetaData.java
@@ -17,9 +17,9 @@
 
 package org.apache.shardingsphere.infra.database.core.metadata.database;
 
-import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
 import org.apache.shardingsphere.infra.database.core.metadata.database.enums.NullsOrderType;
 import org.apache.shardingsphere.infra.database.core.metadata.database.enums.QuoteCharacter;
+import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 
 import java.sql.Connection;
@@ -129,4 +129,13 @@ public interface DialectDatabaseMetaData extends DatabaseTypedSPI {
     default boolean isSupportThreeTierStorageStructure() {
         return false;
     }
+    
+    /**
+     * Is support global CSN.
+     * 
+     * @return support or not
+     */
+    default boolean isSupportGlobalCSN() {
+        return false;
+    }
 }
diff --git a/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/OpenGaussDatabaseMetaData.java b/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/OpenGaussDatabaseMetaData.java
index 78ec5a38a73..5086260f9b7 100644
--- a/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/OpenGaussDatabaseMetaData.java
+++ b/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/OpenGaussDatabaseMetaData.java
@@ -70,6 +70,11 @@ public final class OpenGaussDatabaseMetaData implements DialectDatabaseMetaData
         return Optional.of("public");
     }
     
+    @Override
+    public boolean isSupportGlobalCSN() {
+        return true;
+    }
+    
     @Override
     public String getDatabaseType() {
         return "openGauss";
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 59b51ab86c9..29d67efe7f8 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -201,8 +201,8 @@ public final class CDCJobAPI implements TransmissionJobAPI {
         TransmissionJobItemProgress result = new TransmissionJobItemProgress();
         result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
         result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
-        IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(new IncrementalTaskPositionManager(
-                incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()).getPosition(null, incrementalDumperContext, dataSourceManager));
+        IncrementalTaskPositionManager positionManager = new IncrementalTaskPositionManager(incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType());
+        IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(positionManager.getPosition(null, incrementalDumperContext, dataSourceManager));
         result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress));
         return result;
     }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 51f717ec8bf..c527f4f9312 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -46,9 +46,9 @@ import org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.Incrementa
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
 import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
+import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
 
 import java.sql.SQLException;
 import java.util.Collection;
@@ -127,7 +127,7 @@ public final class CDCJobPreparer {
             Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
             Importer importer = importerUsed.get() ? null
                     : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 100, TimeUnit.MILLISECONDS, jobItemContext.getSink(),
-                            needSorting(hasGlobalCSN(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())),
+                            needSorting(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType()),
                             importerConfig.getRateLimitAlgorithm());
             jobItemContext.getInventoryTasks().add(new CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
                     processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
@@ -138,12 +138,8 @@ public final class CDCJobPreparer {
         log.info("initInventoryTasks cost {} ms", System.currentTimeMillis() - startTimeMillis);
     }
     
-    private boolean needSorting(final boolean hasGlobalCSN) {
-        return hasGlobalCSN;
-    }
-    
-    private boolean hasGlobalCSN(final DatabaseType databaseType) {
-        return databaseType instanceof OpenGaussDatabaseType;
+    private boolean needSorting(final DatabaseType databaseType) {
+        return DatabaseTypedSPILoader.getService(DialectDatabaseMetaData.class, databaseType).isSupportGlobalCSN();
     }
     
     private void initIncrementalTask(final CDCJobItemContext jobItemContext, final AtomicBoolean importerUsed, final List<CDCChannelProgressPair> channelProgressPairs) {

Reply via email to