This is an automated email from the ASF dual-hosted git repository.
jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 89233548ecc Add DialectDatabaseMetaData.isSupportGlobalCSN() (#29478)
89233548ecc is described below
commit 89233548ecc052c488684e93c1917bc44fb91993
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Dec 20 23:50:00 2023 +0800
Add DialectDatabaseMetaData.isSupportGlobalCSN() (#29478)
* Rename DataSourceCheckEngine.checkSourceDataSources()
* Refactor DataSourceCheckEngine
* Rename IncrementalTaskPreparer
* Rename IncrementalTaskPositionManager
* Rename IncrementalTaskPositionManager
* Refactor CDCJobAPI
* Add DialectDatabaseMetaData.isSupportGlobalCSN()
* Add DialectDatabaseMetaData.isSupportGlobalCSN()
---
.../core/metadata/database/DialectDatabaseMetaData.java | 11 ++++++++++-
.../metadata/database/OpenGaussDatabaseMetaData.java | 5 +++++
.../shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java | 4 ++--
.../data/pipeline/cdc/core/prepare/CDCJobPreparer.java | 12 ++++--------
4 files changed, 21 insertions(+), 11 deletions(-)
diff --git a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/DialectDatabaseMetaData.java b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/DialectDatabaseMetaData.java
index f9a7457c93c..4ec732cd5a7 100644
--- a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/DialectDatabaseMetaData.java
+++ b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/DialectDatabaseMetaData.java
@@ -17,9 +17,9 @@
package org.apache.shardingsphere.infra.database.core.metadata.database;
-import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
import org.apache.shardingsphere.infra.database.core.metadata.database.enums.NullsOrderType;
import org.apache.shardingsphere.infra.database.core.metadata.database.enums.QuoteCharacter;
+import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import java.sql.Connection;
@@ -129,4 +129,13 @@ public interface DialectDatabaseMetaData extends DatabaseTypedSPI {
default boolean isSupportThreeTierStorageStructure() {
return false;
}
+
+ /**
+ * Is support global CSN.
+ *
+ * @return support or not
+ */
+ default boolean isSupportGlobalCSN() {
+ return false;
+ }
}
diff --git a/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/OpenGaussDatabaseMetaData.java b/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/OpenGaussDatabaseMetaData.java
index 78ec5a38a73..5086260f9b7 100644
--- a/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/OpenGaussDatabaseMetaData.java
+++ b/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/OpenGaussDatabaseMetaData.java
@@ -70,6 +70,11 @@ public final class OpenGaussDatabaseMetaData implements DialectDatabaseMetaData
return Optional.of("public");
}
+ @Override
+ public boolean isSupportGlobalCSN() {
+ return true;
+ }
+
@Override
public String getDatabaseType() {
return "openGauss";
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 59b51ab86c9..29d67efe7f8 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -201,8 +201,8 @@ public final class CDCJobAPI implements TransmissionJobAPI {
TransmissionJobItemProgress result = new TransmissionJobItemProgress();
result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
- IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(new IncrementalTaskPositionManager(
- incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()).getPosition(null, incrementalDumperContext, dataSourceManager));
+ IncrementalTaskPositionManager positionManager = new IncrementalTaskPositionManager(incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType());
+ IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(positionManager.getPosition(null, incrementalDumperContext, dataSourceManager));
result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress));
return result;
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 51f717ec8bf..c527f4f9312 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -46,9 +46,9 @@ import org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.Incrementa
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
+import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
import java.sql.SQLException;
import java.util.Collection;
@@ -127,7 +127,7 @@ public final class CDCJobPreparer {
Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
Importer importer = importerUsed.get() ? null
: new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 100, TimeUnit.MILLISECONDS, jobItemContext.getSink(),
- needSorting(hasGlobalCSN(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())),
+ needSorting(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType()),
importerConfig.getRateLimitAlgorithm());
jobItemContext.getInventoryTasks().add(new CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(), processContext.getInventoryImporterExecuteEngine(),
dumper, importer, position));
@@ -138,12 +138,8 @@ public final class CDCJobPreparer {
log.info("initInventoryTasks cost {} ms", System.currentTimeMillis() - startTimeMillis);
}
- private boolean needSorting(final boolean hasGlobalCSN) {
- return hasGlobalCSN;
- }
-
- private boolean hasGlobalCSN(final DatabaseType databaseType) {
- return databaseType instanceof OpenGaussDatabaseType;
+ private boolean needSorting(final DatabaseType databaseType) {
+ return DatabaseTypedSPILoader.getService(DialectDatabaseMetaData.class, databaseType).isSupportGlobalCSN();
}
private void initIncrementalTask(final CDCJobItemContext jobItemContext, final AtomicBoolean importerUsed, final List<CDCChannelProgressPair> channelProgressPairs) {