This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 95de254e0ca Fix table without unique key increment task write failed
(#25345)
95de254e0ca is described below
commit 95de254e0cafe1c2f6b1416e1ba78ff18d35c6c9
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Apr 28 11:14:16 2023 +0800
Fix table without unique key increment task write failed (#25345)
* Fix table without unique key increment task write failed
* Remove delete update at no unique key e2e
* Improve show data consistency check
* Fix test
* Add more log
---
.../data/pipeline/api/ingest/record/Column.java | 2 +-
.../pipeline/core/importer/DataSourceImporter.java | 12 +++--
.../data/pipeline/core/record/RecordUtils.java | 17 ++++---
.../sqlbuilder/AbstractPipelineSQLBuilder.java | 2 +-
.../core/sqlbuilder/PipelineSQLBuilderTest.java | 26 +++++++++-
.../MigrationDataConsistencyChecker.java | 2 +
.../primarykey/IndexesMigrationE2EIT.java | 58 +++++++++++++++++++---
.../pipeline/cases/task/E2EIncrementalTask.java | 1 +
.../core/importer/DataSourceImporterTest.java | 4 +-
9 files changed, 103 insertions(+), 21 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java
index 24a25b8a765..972ac5bfc9b 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java
@@ -46,6 +46,6 @@ public final class Column {
@Override
public String toString() {
- return String.format("%s=%s", name, value);
+ return String.format("%s: oldValue=%s, value=%s", name, oldValue,
value);
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
index 7e94bfc009f..163b4fd39e8 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
@@ -95,7 +95,7 @@ public final class DataSourceImporter extends
AbstractLifecycleExecutor implemen
@Override
protected void runBlocking() {
- int batchSize = importerConfig.getBatchSize() * 2;
+ int batchSize = importerConfig.getBatchSize();
while (isRunning()) {
List<Record> records = channel.fetchRecords(batchSize, 3,
TimeUnit.SECONDS);
if (null != records && !records.isEmpty()) {
@@ -246,8 +246,14 @@ public final class DataSourceImporter extends
AbstractLifecycleExecutor implemen
}
for (int i = 0; i < conditionColumns.size(); i++) {
Column keyColumn = conditionColumns.get(i);
- preparedStatement.setObject(updatedColumns.size() + i + 1,
keyColumn.isUniqueKey() && keyColumn.isUpdated() ? keyColumn.getOldValue() :
keyColumn.getValue());
+ // TODO There to be compatible with PostgreSQL before value is
null except primary key and unsupported updating sharding value now.
+ if (shardingColumns.contains(keyColumn.getName()) &&
keyColumn.getOldValue() == null) {
+ preparedStatement.setObject(updatedColumns.size() + i + 1,
keyColumn.getValue());
+ continue;
+ }
+ preparedStatement.setObject(updatedColumns.size() + i + 1,
keyColumn.getOldValue());
}
+ // TODO if table without unique key the conditionColumns before
values is null, so update will fail at PostgreSQL
int updateCount = preparedStatement.executeUpdate();
if (1 != updateCount) {
log.warn("executeUpdate failed, updateCount={}, updateSql={},
updatedColumns={}, conditionColumns={}", updateCount, updateSql,
updatedColumns, conditionColumns);
@@ -276,7 +282,7 @@ public final class DataSourceImporter extends
AbstractLifecycleExecutor implemen
}
int[] counts = preparedStatement.executeBatch();
if (IntStream.of(counts).anyMatch(value -> 1 != value)) {
- log.warn("batchDelete failed, counts={}, sql={}",
Arrays.toString(counts), deleteSQL);
+ log.warn("batchDelete failed, counts={}, sql={},
conditionColumns={}", Arrays.toString(counts), deleteSQL, conditionColumns);
}
} finally {
batchDeleteStatement = null;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtils.java
index 816747a839f..68336abd7c9 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtils.java
@@ -26,6 +26,7 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
/**
@@ -64,7 +65,11 @@ public final class RecordUtils {
result.add(each);
}
}
- return result;
+ Optional<Column> uniqueKeyColumn =
dataRecord.getColumns().stream().filter(Column::isUniqueKey).findFirst();
+ if (uniqueKeyColumn.isPresent()) {
+ return result;
+ }
+ return dataRecord.getColumns();
}
/**
@@ -84,11 +89,11 @@ public final class RecordUtils {
}
/**
- * Get last normal record.
- *
- * @param records records
- * @return last normal record.
- */
+ * Get last normal record.
+ *
+ * @param records records
+ * @return last normal record.
+ */
public static Record getLastNormalRecord(final List<Record> records) {
for (int index = records.size() - 1; index >= 0; index--) {
Record record = records.get(index);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index a3318855aa4..56bb8588b6a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -179,7 +179,7 @@ public abstract class AbstractPipelineSQLBuilder implements
PipelineSQLBuilder {
private String buildWhereSQL(final Collection<Column> conditionColumns) {
StringBuilder where = new StringBuilder();
for (Column each : conditionColumns) {
- where.append(String.format("%s = ? and ", quote(each.getName())));
+ where.append(String.format("%s = ? AND ", quote(each.getName())));
}
where.setLength(where.length() - 5);
return where.toString();
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderTest.java
index 823712643b1..7481a806933 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderTest.java
@@ -91,7 +91,7 @@ class PipelineSQLBuilderTest {
void assertBuildUpdateSQLWithShardingColumns() {
DataRecord dataRecord = mockDataRecord("t2");
String actual = pipelineSQLBuilder.buildUpdateSQL(null, dataRecord,
mockConditionColumns(dataRecord));
- assertThat(actual, is("UPDATE t2 SET c1 = ?,c2 = ?,c3 = ? WHERE id = ?
and sc = ?"));
+ assertThat(actual, is("UPDATE t2 SET c1 = ?,c2 = ?,c3 = ? WHERE id = ?
AND sc = ?"));
}
@Test
@@ -104,7 +104,7 @@ class PipelineSQLBuilderTest {
void assertBuildDeleteSQLWithConditionColumns() {
DataRecord dataRecord = mockDataRecord("t3");
String actual = pipelineSQLBuilder.buildDeleteSQL(null, dataRecord,
mockConditionColumns(dataRecord));
- assertThat(actual, is("DELETE FROM t3 WHERE id = ? and sc = ?"));
+ assertThat(actual, is("DELETE FROM t3 WHERE id = ? AND sc = ?"));
}
private Collection<Column> mockConditionColumns(final DataRecord
dataRecord) {
@@ -121,4 +121,26 @@ class PipelineSQLBuilderTest {
result.addColumn(new Column("c3", "", true, false));
return result;
}
+
+ @Test
+ void assertBuildDeleteSQLWithoutUniqueKey() {
+ String actual = pipelineSQLBuilder.buildDeleteSQL(null,
mockDataRecordWithoutUniqueKey("t_order"),
+
RecordUtils.extractConditionColumns(mockDataRecordWithoutUniqueKey("t_order"),
Collections.emptySet()));
+ assertThat(actual, is("DELETE FROM t_order WHERE id = ? AND name =
?"));
+ }
+
+ @Test
+ void assertBuildUpdateSQLWithoutShardingColumns() {
+ DataRecord dataRecord = mockDataRecordWithoutUniqueKey("t_order");
+ String actual = pipelineSQLBuilder.buildUpdateSQL(null, dataRecord,
mockConditionColumns(dataRecord));
+ assertThat(actual, is("UPDATE t_order SET name = ? WHERE id = ? AND
name = ?"));
+ }
+
+ private DataRecord mockDataRecordWithoutUniqueKey(final String tableName) {
+ DataRecord result = new DataRecord(new PlaceholderPosition(), 4);
+ result.setTableName(tableName);
+ result.addColumn(new Column("id", "", false, false));
+ result.addColumn(new Column("name", "", true, false));
+ return result;
+ }
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index b4fee8c9457..f139d9558a4 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -27,6 +27,7 @@ import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
@@ -83,6 +84,7 @@ public final class MigrationDataConsistencyChecker implements
PipelineDataConsis
.forEach(dataNode ->
sourceTableNames.add(DataNodeUtils.formatWithSchema(dataNode)))));
progressContext.setRecordsCount(getRecordsCount());
progressContext.getTableNames().addAll(sourceTableNames);
+ progressContext.onProgressUpdated(new
PipelineJobProgressUpdatedParameter(0));
Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>();
PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
try {
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index daa8812b8d9..72d838da673 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -17,6 +17,7 @@
package
org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primarykey;
+import lombok.SneakyThrows;
import org.apache.commons.codec.binary.Hex;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
@@ -86,17 +87,33 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
return;
}
KeyGenerateAlgorithm keyGenerateAlgorithm = new
UUIDKeyGenerateAlgorithm();
- Object uniqueKey = keyGenerateAlgorithm.generateKey();
- assertMigrationSuccess(containerComposer, sql, "user_id",
keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
- insertOneOrder(containerComposer, uniqueKey);
- containerComposer.assertProxyOrderRecordExist("t_order",
uniqueKey);
+ // TODO PostgreSQL update delete events not support if table
without unique keys at increment task.
+ final Callable<Void> incrementalTaskFn = () -> {
+ Object orderId = keyGenerateAlgorithm.generateKey();
+ insertOneOrder(containerComposer, orderId);
+ if (containerComposer.getDatabaseType() instanceof
MySQLDatabaseType) {
+ updateOneOrder(containerComposer, orderId, "updated");
+ deleteOneOrder(containerComposer, orderId, "updated");
+ insertOneOrder(containerComposer,
keyGenerateAlgorithm.generateKey());
+ }
return null;
- });
+ };
+ assertMigrationSuccess(containerComposer, sql, "user_id",
keyGenerateAlgorithm, consistencyCheckAlgorithmType, incrementalTaskFn);
}
}
+ @SneakyThrows
+ private void doCreateUpdateDelete(final PipelineContainerComposer
containerComposer, final Object orderId) {
+ String updatedStatus = "updated" + System.currentTimeMillis();
+ insertOneOrder(containerComposer, orderId);
+ updateOneOrder(containerComposer, orderId, updatedStatus);
+ deleteOneOrder(containerComposer, orderId, updatedStatus);
+ }
+
private void insertOneOrder(final PipelineContainerComposer
containerComposer, final Object uniqueKey) throws SQLException {
- try (PreparedStatement preparedStatement =
containerComposer.getSourceDataSource().getConnection().prepareStatement("INSERT
INTO t_order (order_id,user_id,status) VALUES (?,?,?)")) {
+ try (
+ Connection connection =
containerComposer.getSourceDataSource().getConnection();
+ PreparedStatement preparedStatement =
connection.prepareStatement("INSERT INTO t_order (order_id,user_id,status)
VALUES (?,?,?)")) {
preparedStatement.setObject(1, uniqueKey);
preparedStatement.setObject(2, 1);
preparedStatement.setObject(3, "OK");
@@ -105,6 +122,33 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT
{
}
}
+ private void updateOneOrder(final PipelineContainerComposer
containerComposer, final Object uniqueKey, final String updatedStatus) throws
SQLException {
+ try (
+ Connection connection =
containerComposer.getSourceDataSource().getConnection();
+ PreparedStatement preparedStatement = connection
+ .prepareStatement("UPDATE t_order SET status=? WHERE
order_id = ? AND user_id = ? AND status = ?")) {
+ preparedStatement.setObject(1, updatedStatus);
+ preparedStatement.setObject(2, uniqueKey);
+ preparedStatement.setObject(3, 1);
+ preparedStatement.setObject(4, "OK");
+ int actualCount = preparedStatement.executeUpdate();
+ assertThat(actualCount, is(1));
+ }
+ }
+
+ private void deleteOneOrder(final PipelineContainerComposer
containerComposer, final Object uniqueKey, final String updatedStatus) throws
SQLException {
+ try (
+ Connection connection =
containerComposer.getSourceDataSource().getConnection();
+ PreparedStatement preparedStatement = connection
+ .prepareStatement("DELETE FROM t_order WHERE order_id
= ? AND user_id = ? AND status = ?")) {
+ preparedStatement.setObject(1, uniqueKey);
+ preparedStatement.setObject(2, 1);
+ preparedStatement.setObject(3, updatedStatus);
+ int actualCount = preparedStatement.executeUpdate();
+ assertThat(actualCount, is(1));
+ }
+ }
+
@ParameterizedTest(name = "{0}")
@EnabledIf("isEnabled")
@ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
@@ -122,6 +166,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
Object uniqueKey = keyGenerateAlgorithm.generateKey();
assertMigrationSuccess(containerComposer, sql, "user_id",
keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
insertOneOrder(containerComposer, uniqueKey);
+ doCreateUpdateDelete(containerComposer,
keyGenerateAlgorithm.generateKey());
containerComposer.assertProxyOrderRecordExist("t_order",
uniqueKey);
return null;
});
@@ -145,6 +190,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
Object uniqueKey = keyGenerateAlgorithm.generateKey();
assertMigrationSuccess(containerComposer, sql, "user_id",
keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
insertOneOrder(containerComposer, uniqueKey);
+ doCreateUpdateDelete(containerComposer,
keyGenerateAlgorithm.generateKey());
containerComposer.assertProxyOrderRecordExist("t_order",
uniqueKey);
return null;
});
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
index 19b9e2b09e9..0e211b4e7e6 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
@@ -133,6 +133,7 @@ public final class E2EIncrementalTask extends
BaseIncrementTask {
private void setNullToAllFields(final Object orderId) {
if (databaseType instanceof MySQLDatabaseType) {
String sql =
SQLBuilderUtils.buildUpdateSQL(ignoreShardingColumns(MYSQL_COLUMN_NAMES),
orderTableName, "null");
+ log.info("update sql: {}", sql);
DataSourceExecuteUtils.execute(dataSource, sql, new
Object[]{orderId});
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
index dc0140be0d5..1f190e36cbc 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
@@ -122,7 +122,7 @@ class DataSourceImporterTest {
verify(preparedStatement).setObject(1, 20);
verify(preparedStatement).setObject(2, "UPDATE");
verify(preparedStatement).setObject(3, 1);
- verify(preparedStatement).setObject(4, 20);
+ verify(preparedStatement).setObject(4, 10);
verify(preparedStatement).executeUpdate();
}
@@ -137,7 +137,7 @@ class DataSourceImporterTest {
inOrder.verify(preparedStatement).setObject(2, 10);
inOrder.verify(preparedStatement).setObject(3, "UPDATE");
inOrder.verify(preparedStatement).setObject(4, 1);
- inOrder.verify(preparedStatement).setObject(5, 10);
+ inOrder.verify(preparedStatement).setObject(5, 0);
inOrder.verify(preparedStatement).executeUpdate();
}