This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 32f7c8c9786 Add PostgreSQL and openGauss estimated count SQL (#24321)
32f7c8c9786 is described below
commit 32f7c8c9786068484a365a20ca69104a4e6d568c
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Feb 24 10:02:35 2023 +0800
Add PostgreSQL and openGauss estimated count SQL (#24321)
* Add PostgreSQL and openGauss estimated count SQL
* Fix building insert SQL without unique keys for PostgreSQL
* Improve show consistency check remaining_seconds
* Throw unsupported exception when unique key is null in data match check
* Add E2E test case
* Improve inventory percentage of show job status
---
.../impl/AbstractInventoryIncrementalJobAPIImpl.java | 4 +++-
.../DataMatchDataConsistencyCalculateAlgorithm.java | 3 +++
.../pipeline/core/prepare/InventoryTaskSplitter.java | 13 +++++++++----
.../sqlbuilder/OpenGaussPipelineSQLBuilder.java | 4 ++--
.../sqlbuilder/PostgreSQLPipelineSQLBuilder.java | 11 ++++++++---
.../api/impl/ConsistencyCheckJobAPI.java | 2 +-
.../migration/primarykey/IndexesMigrationE2EIT.java | 19 ++++++++++++++-----
7 files changed, 40 insertions(+), 16 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index c95f79f5357..fa694640d81 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -119,7 +119,9 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
                 continue;
             }
             int inventoryFinishedPercentage = 0;
-            if (0 != jobItemProgress.getProcessedRecordsCount() && 0 != jobItemProgress.getInventoryRecordsCount()) {
+            if (JobStatus.EXECUTE_INCREMENTAL_TASK == jobItemProgress.getStatus()) {
+                inventoryFinishedPercentage = 100;
+            } else if (0 != jobItemProgress.getProcessedRecordsCount() && 0 != jobItemProgress.getInventoryRecordsCount()) {
                 inventoryFinishedPercentage = (int) Math.min(100, jobItemProgress.getProcessedRecordsCount() * 100 / jobItemProgress.getInventoryRecordsCount());
             }
             String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
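
The new first branch reports the inventory as 100% finished as soon as the job item has reached the incremental stage, since inventory migration completes before incremental sync starts; otherwise the percentage is still derived from processed versus total inventory records, capped at 100. A minimal standalone sketch of that rule, using hypothetical local values in place of the jobItemProgress fields:

public final class InventoryPercentageSketch {
    
    public static int inventoryFinishedPercentage(final boolean incrementalStage, final long processedRecordsCount, final long inventoryRecordsCount) {
        if (incrementalStage) {
            // Reaching the incremental stage implies the inventory stage is already done.
            return 100;
        }
        if (0 != processedRecordsCount && 0 != inventoryRecordsCount) {
            // Cap at 100 in case the processed count temporarily exceeds the estimated inventory count.
            return (int) Math.min(100, processedRecordsCount * 100 / inventoryRecordsCount);
        }
        return 0;
    }
    
    public static void main(final String[] args) {
        // Prints 75, then 100.
        System.out.println(inventoryFinishedPercentage(false, 7500L, 10000L));
        System.out.println(inventoryFinishedPercentage(true, 0L, 0L));
    }
}
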
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 482beb4b306..14a6a4ad919 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -174,6 +174,9 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
     }
 
     private String getQuerySQL(final DataConsistencyCalculateParameter param) {
+        if (null == param.getUniqueKey()) {
+            throw new UnsupportedOperationException("Data consistency check of DATA_MATCH type does not support tables without a unique key or primary key for now");
+        }
         PipelineSQLBuilder sqlBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, param.getDatabaseType());
         String logicTableName = param.getLogicTableName();
         String schemaName = param.getSchemaName();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 84aec4aa43d..ecd85d92b3d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -44,6 +44,9 @@ import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChanne
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -176,7 +179,8 @@ public final class InventoryTaskSplitter {
         Optional<String> sql = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
         try {
             if (sql.isPresent()) {
-                long result = getEstimatedCount(dataSource, sql.get());
+                DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, jobConfig.getSourceDatabaseType());
+                long result = getEstimatedCount(databaseType, dataSource, sql.get());
                 return result > 0 ? result : getCount(dataSource, pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
             }
             return getCount(dataSource, pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
@@ -186,12 +190,13 @@ public final class InventoryTaskSplitter {
         }
     }
 
-    // TODO maybe need refactor after PostgreSQL support estimated count.
-    private long getEstimatedCount(final DataSource dataSource, final String estimatedCountSQL) throws SQLException {
+    private long getEstimatedCount(final DatabaseType databaseType, final DataSource dataSource, final String estimatedCountSQL) throws SQLException {
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = connection.prepareStatement(estimatedCountSQL)) {
-            preparedStatement.setString(1, connection.getCatalog());
+            if (databaseType instanceof MySQLDatabaseType) {
+                preparedStatement.setString(1, connection.getCatalog());
+            }
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
                 resultSet.next();
                 return resultSet.getLong(1);
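
getEstimatedCount is now dialect-aware: the source DatabaseType is resolved through TypedSPILoader, and the catalog parameter is bound only for MySQL, whose estimated count statement is expected to carry a single schema placeholder; the PostgreSQL and openGauss statements added below inline the table name and take no parameters. A standalone sketch of the lookup under those assumptions (class name and the boolean flag are hypothetical stand-ins, and no MySQL statement text is taken from this commit):

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Standalone sketch of the dialect-aware estimated count lookup in InventoryTaskSplitter.
public final class EstimatedCountSketch {
    
    public static long getEstimatedCount(final boolean mysql, final DataSource dataSource, final String estimatedCountSQL) throws SQLException {
        try (
                Connection connection = dataSource.getConnection();
                PreparedStatement preparedStatement = connection.prepareStatement(estimatedCountSQL)) {
            if (mysql) {
                // Only the MySQL statement is expected to contain a placeholder; it is bound to the
                // current catalog (database name). The PostgreSQL/openGauss statement has none.
                preparedStatement.setString(1, connection.getCatalog());
            }
            try (ResultSet resultSet = preparedStatement.executeQuery()) {
                resultSet.next();
                return resultSet.getLong(1);
            }
        }
    }
}
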
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index 332b89b3ea8..536d4824d01 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -89,8 +89,8 @@ public final class OpenGaussPipelineSQLBuilder extends AbstractPipelineSQLBuilde
 
     @Override
     public Optional<String> buildEstimatedCountSQL(final String schemaName, final String tableName) {
-        // TODO Support estimated count later.
-        return Optional.empty();
+        String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
+        return Optional.of(String.format("SELECT reltuples::integer FROM pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
     }
 
     @Override
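
reltuples in pg_class is the row count estimate maintained by ANALYZE, VACUUM and autovacuum, so this query is cheap but can be zero (or negative on newer PostgreSQL versions) for a table that has never been analyzed; that is why InventoryTaskSplitter above still falls back to an exact COUNT when the estimate is not positive. A minimal sketch of the statement the builder produces, assuming a hypothetical test_schema.t_order table (the PostgreSQL builder below emits the same SQL):

// Minimal sketch of the generated estimated count statement, assuming a hypothetical qualified table name.
public final class EstimatedCountSqlSketch {
    
    public static void main(final String[] args) {
        String qualifiedTableName = "test_schema.t_order";
        // Prints: SELECT reltuples::integer FROM pg_class WHERE oid='test_schema.t_order'::regclass::oid;
        System.out.println(String.format("SELECT reltuples::integer FROM pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
    }
}
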
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index 3f9db6be99d..917463fa082 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -63,7 +63,12 @@ public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuild
 
     @Override
     public String buildInsertSQL(final String schemaName, final DataRecord dataRecord) {
-        return super.buildInsertSQL(schemaName, dataRecord) + buildConflictSQL(dataRecord);
+        String result = super.buildInsertSQL(schemaName, dataRecord);
+        // TODO without a unique key, an interrupted and restarted job may lead to data duplication
+        if (dataRecord.getUniqueKeyValue().isEmpty()) {
+            return result;
+        }
+        return result + buildConflictSQL(dataRecord);
     }
 
     // Refer to https://www.postgresql.org/docs/current/sql-insert.html
@@ -87,8 +92,8 @@ public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuild
 
     @Override
     public Optional<String> buildEstimatedCountSQL(final String schemaName, final String tableName) {
-        // TODO Support estimated count later.
-        return Optional.empty();
+        String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
+        return Optional.of(String.format("SELECT reltuples::integer FROM pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
     }
 
     @Override
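
For records that carry a unique key, the PostgreSQL writer keeps appending the clause built by buildConflictSQL so replayed records become upserts; without a unique key there is no usable conflict target, so the plain INSERT is emitted and, as the TODO notes, an interrupted and restarted job may write duplicate rows. A rough sketch of the two statement shapes, with a hypothetical table, columns and conflict clause (the real clause comes from buildConflictSQL, which this diff does not show):

// Rough sketch of the two insert shapes produced by buildInsertSQL above;
// the table, columns and conflict clause text are hypothetical placeholders.
public final class PostgreSQLInsertShapeSketch {
    
    public static void main(final String[] args) {
        String baseInsert = "INSERT INTO test_schema.t_order(order_id,user_id,status) VALUES(?,?,?)";
        // Record with a unique key: append a conflict clause so replays become upserts (assumed shape).
        String withUniqueKey = baseInsert + " ON CONFLICT (order_id) DO UPDATE SET user_id=EXCLUDED.user_id,status=EXCLUDED.status";
        // Record without a unique key: keep the plain insert; a restarted job may duplicate rows (see the TODO).
        String withoutUniqueKey = baseInsert;
        System.out.println(withUniqueKey);
        System.out.println(withoutUniqueKey);
    }
}
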
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 6a506e6ca9a..b5221009179 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -297,7 +297,7 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
             if (null != stopTimeMillis) {
                 result.setCheckEndTime(DATE_TIME_FORMATTER.format(new Timestamp(stopTimeMillis).toLocalDateTime()));
             }
-            long remainingMills = (long) ((recordsCount - checkedRecordsCount) * 1.0D / checkedRecordsCount * durationMillis);
+            long remainingMills = Math.max(0, (long) ((recordsCount - checkedRecordsCount) * 1.0D / checkedRecordsCount * durationMillis));
             result.setRemainingSeconds(remainingMills / 1000);
         }
         String tableNames = jobItemProgress.getTableNames();
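
remaining_seconds is a linear extrapolation: the records still to be checked divided by the observed checking rate (checkedRecordsCount per durationMillis). Clamping at zero keeps the value sensible when checkedRecordsCount already exceeds the recorded total, for example when the stored records count is a stale estimate. A minimal sketch with hypothetical counts:

// Minimal sketch of the remaining-time estimate above, using hypothetical counts.
public final class RemainingSecondsSketch {
    
    public static void main(final String[] args) {
        long recordsCount = 10000L;
        long checkedRecordsCount = 12000L;
        long durationMillis = 60000L;
        // Linear extrapolation from the observed throughput, clamped so a stale total
        // can no longer produce a negative remaining_seconds.
        long remainingMills = Math.max(0, (long) ((recordsCount - checkedRecordsCount) * 1.0D / checkedRecordsCount * durationMillis));
        // Prints 0 for these values.
        System.out.println(remainingMills / 1000);
    }
}
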
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index af5498037be..a2e11733595 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primary
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
 import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
@@ -70,11 +71,14 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
             return result;
         }
-        List<String> versions = PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType());
-        if (versions.isEmpty()) {
-            return result;
+        List<String> mysqlVersion = PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType());
+        if (!mysqlVersion.isEmpty()) {
+            result.add(new PipelineTestParameter(new MySQLDatabaseType(), mysqlVersion.get(0), "env/common/none.xml"));
+        }
+        List<String> postgresqlVersion = PipelineBaseE2EIT.ENV.listStorageContainerImages(new PostgreSQLDatabaseType());
+        if (!postgresqlVersion.isEmpty()) {
+            result.add(new PipelineTestParameter(new PostgreSQLDatabaseType(), postgresqlVersion.get(0), "env/common/none.xml"));
         }
-        result.add(new PipelineTestParameter(new MySQLDatabaseType(), versions.get(0), "env/common/none.xml"));
         return result;
     }
@@ -91,6 +95,9 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             sql = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
             // DATA_MATCH doesn't supported, could not order by records
             consistencyCheckAlgorithmType = "CRC32_MATCH";
+        } else if (getDatabaseType() instanceof PostgreSQLDatabaseType) {
+            sql = "CREATE TABLE %s (order_id varchar(255) NOT NULL,user_id int NOT NULL,status varchar(255) NULL)";
+            consistencyCheckAlgorithmType = null;
         } else {
             return;
         }
@@ -160,7 +167,9 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         }
         assertProxyOrderRecordExist("t_order", primaryKey);
         waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-        assertCheckMigrationSuccess(jobId, consistencyCheckAlgorithmType);
+        if (null != consistencyCheckAlgorithmType) {
+            assertCheckMigrationSuccess(jobId, consistencyCheckAlgorithmType);
+        }
         commitMigrationByJobId(jobId);
         proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
         assertThat(getTargetTableRecordsCount(getSourceTableOrderName()), is(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT + 1));