This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 95de254e0ca Fix incremental task write failure for tables without a unique key (#25345)
95de254e0ca is described below

commit 95de254e0cafe1c2f6b1416e1ba78ff18d35c6c9
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Apr 28 11:14:16 2023 +0800

    Fix incremental task write failure for tables without a unique key (#25345)
    
    * Fix incremental task write failure for tables without a unique key
    
    * Remove update and delete operations from the no-unique-key E2E case
    
    * Improve SHOW data consistency check progress reporting
    
    * Fix test
    
    * Add more logging
---
 .../data/pipeline/api/ingest/record/Column.java    |  2 +-
 .../pipeline/core/importer/DataSourceImporter.java | 12 +++--
 .../data/pipeline/core/record/RecordUtils.java     | 17 ++++---
 .../sqlbuilder/AbstractPipelineSQLBuilder.java     |  2 +-
 .../core/sqlbuilder/PipelineSQLBuilderTest.java    | 26 +++++++++-
 .../MigrationDataConsistencyChecker.java           |  2 +
 .../primarykey/IndexesMigrationE2EIT.java          | 58 +++++++++++++++++++---
 .../pipeline/cases/task/E2EIncrementalTask.java    |  1 +
 .../core/importer/DataSourceImporterTest.java      |  4 +-
 9 files changed, 103 insertions(+), 21 deletions(-)
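
In short, this patch makes incremental UPDATE/DELETE work for tables that have no unique key: RecordUtils.extractConditionColumns now falls back to using every column as a condition column, so rows are matched by full-row comparison. PostgreSQL remains a known gap (see the TODOs below), since its logical decoding only ships before-values for replica identity columns, which default to the primary key. The following is a minimal sketch of that fallback rule, assuming a simplified stand-in Column record rather than the pipeline's real Column class:

    import java.util.List;
    import java.util.stream.Collectors;

    final class ConditionColumnsSketch {

        // Simplified stand-in for the pipeline's Column class.
        record Column(String name, Object value, boolean uniqueKey) {
        }

        // Mirrors the patched RecordUtils.extractConditionColumns: prefer
        // unique-key columns; if the record carries none, use all columns.
        static List<Column> extractConditionColumns(final List<Column> columns) {
            List<Column> uniqueKeys = columns.stream().filter(Column::uniqueKey).collect(Collectors.toList());
            return uniqueKeys.isEmpty() ? columns : uniqueKeys;
        }

        public static void main(final String[] args) {
            List<Column> noUniqueKey = List.of(new Column("id", 1, false), new Column("name", "a", false));
            // Prints [id, name]: every column becomes a condition column.
            System.out.println(extractConditionColumns(noUniqueKey).stream().map(Column::name).collect(Collectors.toList()));
        }
    }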

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java
index 24a25b8a765..972ac5bfc9b 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java
@@ -46,6 +46,6 @@ public final class Column {
     
     @Override
     public String toString() {
-        return String.format("%s=%s", name, value);
+        return String.format("%s: oldValue=%s, value=%s", name, oldValue, value);
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
index 7e94bfc009f..163b4fd39e8 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
@@ -95,7 +95,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
     
     @Override
     protected void runBlocking() {
-        int batchSize = importerConfig.getBatchSize() * 2;
+        int batchSize = importerConfig.getBatchSize();
         while (isRunning()) {
            List<Record> records = channel.fetchRecords(batchSize, 3, TimeUnit.SECONDS);
             if (null != records && !records.isEmpty()) {
@@ -246,8 +246,14 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
             }
             for (int i = 0; i < conditionColumns.size(); i++) {
                 Column keyColumn = conditionColumns.get(i);
-                preparedStatement.setObject(updatedColumns.size() + i + 1, keyColumn.isUniqueKey() && keyColumn.isUpdated() ? keyColumn.getOldValue() : keyColumn.getValue());
+                // TODO For PostgreSQL compatibility: before-values are null except for the primary key, and updating sharding values is not supported yet.
+                if (shardingColumns.contains(keyColumn.getName()) && keyColumn.getOldValue() == null) {
+                    preparedStatement.setObject(updatedColumns.size() + i + 1, keyColumn.getValue());
+                    continue;
+                }
+                preparedStatement.setObject(updatedColumns.size() + i + 1, keyColumn.getOldValue());
             }
+            // TODO If the table has no unique key, the condition columns' before-values are null, so the update fails on PostgreSQL.
             int updateCount = preparedStatement.executeUpdate();
             if (1 != updateCount) {
                 log.warn("executeUpdate failed, updateCount={}, updateSql={}, 
updatedColumns={}, conditionColumns={}", updateCount, updateSql, 
updatedColumns, conditionColumns);
@@ -276,7 +282,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
             }
             int[] counts = preparedStatement.executeBatch();
             if (IntStream.of(counts).anyMatch(value -> 1 != value)) {
-                log.warn("batchDelete failed, counts={}, sql={}", 
Arrays.toString(counts), deleteSQL);
+                log.warn("batchDelete failed, counts={}, sql={}, 
conditionColumns={}", Arrays.toString(counts), deleteSQL, conditionColumns);
             }
         } finally {
             batchDeleteStatement = null;
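
The binding change above reduces to a small rule: a sharding column whose before-value is null binds its current value, while every other condition column binds its before-value. A minimal sketch of that rule, again with a hypothetical simplified Column record in place of the pipeline's class:

    import java.util.Set;

    final class ConditionValueSketch {

        // Simplified stand-in for the pipeline's Column: current value plus before-value.
        record Column(String name, Object value, Object oldValue) {
        }

        // Mirrors the patched condition-value loop in DataSourceImporter:
        // sharding columns lacking a before-value fall back to the current value.
        static Object conditionValue(final Column column, final Set<String> shardingColumns) {
            if (shardingColumns.contains(column.name()) && null == column.oldValue()) {
                return column.value();
            }
            return column.oldValue();
        }

        public static void main(final String[] args) {
            // Unique key updated from 10 to 20: the WHERE clause binds the old value, 10.
            System.out.println(conditionValue(new Column("order_id", 20, 10), Set.of("user_id")));
        }
    }
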
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtils.java
index 816747a839f..68336abd7c9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtils.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -64,7 +65,11 @@ public final class RecordUtils {
                 result.add(each);
             }
         }
-        return result;
+        Optional<Column> uniqueKeyColumn = dataRecord.getColumns().stream().filter(Column::isUniqueKey).findFirst();
+        if (uniqueKeyColumn.isPresent()) {
+            return result;
+        }
+        return dataRecord.getColumns();
     }
     
     /**
@@ -84,11 +89,11 @@ public final class RecordUtils {
     }
     
     /**
-    * Get last normal record.
-    *
-    * @param records records
-    * @return last normal record.
-    */
+     * Get last normal record.
+     *
+     * @param records records
+     * @return last normal record.
+     */
     public static Record getLastNormalRecord(final List<Record> records) {
         for (int index = records.size() - 1; index >= 0; index--) {
             Record record = records.get(index);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index a3318855aa4..56bb8588b6a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -179,7 +179,7 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
     private String buildWhereSQL(final Collection<Column> conditionColumns) {
         StringBuilder where = new StringBuilder();
         for (Column each : conditionColumns) {
-            where.append(String.format("%s = ? and ", quote(each.getName())));
+            where.append(String.format("%s = ? AND ", quote(each.getName())));
         }
         where.setLength(where.length() - 5);
         return where.toString();
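
Worth noting: both separators, " and " and " AND ", are five characters long, so the where.setLength(where.length() - 5) trim is unaffected by the casing change. A standalone sketch of this WHERE-clause builder, with identifier quoting elided (a simplification, not the actual PipelineSQLBuilder API):

    import java.util.Collection;
    import java.util.List;

    final class WhereSqlSketch {

        // One "col = ?" per condition column, joined by " AND ";
        // the trailing five-character separator is trimmed at the end.
        static String buildWhereSQL(final Collection<String> conditionColumnNames) {
            StringBuilder where = new StringBuilder();
            for (String each : conditionColumnNames) {
                where.append(String.format("%s = ? AND ", each));
            }
            where.setLength(where.length() - 5);
            return where.toString();
        }

        public static void main(final String[] args) {
            // Prints: id = ? AND name = ?
            System.out.println(buildWhereSQL(List.of("id", "name")));
        }
    }
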
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderTest.java
index 823712643b1..7481a806933 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderTest.java
@@ -91,7 +91,7 @@ class PipelineSQLBuilderTest {
     void assertBuildUpdateSQLWithShardingColumns() {
         DataRecord dataRecord = mockDataRecord("t2");
        String actual = pipelineSQLBuilder.buildUpdateSQL(null, dataRecord, mockConditionColumns(dataRecord));
-        assertThat(actual, is("UPDATE t2 SET c1 = ?,c2 = ?,c3 = ? WHERE id = ? and sc = ?"));
+        assertThat(actual, is("UPDATE t2 SET c1 = ?,c2 = ?,c3 = ? WHERE id = ? AND sc = ?"));
     }
     
     @Test
@@ -104,7 +104,7 @@ class PipelineSQLBuilderTest {
     void assertBuildDeleteSQLWithConditionColumns() {
         DataRecord dataRecord = mockDataRecord("t3");
        String actual = pipelineSQLBuilder.buildDeleteSQL(null, dataRecord, mockConditionColumns(dataRecord));
-        assertThat(actual, is("DELETE FROM t3 WHERE id = ? and sc = ?"));
+        assertThat(actual, is("DELETE FROM t3 WHERE id = ? AND sc = ?"));
     }
     
    private Collection<Column> mockConditionColumns(final DataRecord dataRecord) {
@@ -121,4 +121,26 @@ class PipelineSQLBuilderTest {
         result.addColumn(new Column("c3", "", true, false));
         return result;
     }
+    
+    @Test
+    void assertBuildDeleteSQLWithoutUniqueKey() {
+        String actual = pipelineSQLBuilder.buildDeleteSQL(null, mockDataRecordWithoutUniqueKey("t_order"),
+                RecordUtils.extractConditionColumns(mockDataRecordWithoutUniqueKey("t_order"), Collections.emptySet()));
+        assertThat(actual, is("DELETE FROM t_order WHERE id = ? AND name = ?"));
+    }
+    
+    @Test
+    void assertBuildUpdateSQLWithoutShardingColumns() {
+        DataRecord dataRecord = mockDataRecordWithoutUniqueKey("t_order");
+        String actual = pipelineSQLBuilder.buildUpdateSQL(null, dataRecord, mockConditionColumns(dataRecord));
+        assertThat(actual, is("UPDATE t_order SET name = ? WHERE id = ? AND name = ?"));
+    }
+    
+    private DataRecord mockDataRecordWithoutUniqueKey(final String tableName) {
+        DataRecord result = new DataRecord(new PlaceholderPosition(), 4);
+        result.setTableName(tableName);
+        result.addColumn(new Column("id", "", false, false));
+        result.addColumn(new Column("name", "", true, false));
+        return result;
+    }
 }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index b4fee8c9457..f139d9558a4 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
@@ -83,6 +84,7 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
                .forEach(dataNode -> sourceTableNames.add(DataNodeUtils.formatWithSchema(dataNode)))));
         progressContext.setRecordsCount(getRecordsCount());
         progressContext.getTableNames().addAll(sourceTableNames);
+        progressContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
         Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>();
        PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
         try {
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index daa8812b8d9..72d838da673 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primarykey;
 
+import lombok.SneakyThrows;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
@@ -86,17 +87,33 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
                 return;
             }
            KeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
-            Object uniqueKey = keyGenerateAlgorithm.generateKey();
-            assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
-                insertOneOrder(containerComposer, uniqueKey);
-                containerComposer.assertProxyOrderRecordExist("t_order", uniqueKey);
+            // TODO PostgreSQL does not support update/delete events in the incremental task when the table has no unique key.
+            final Callable<Void> incrementalTaskFn = () -> {
+                Object orderId = keyGenerateAlgorithm.generateKey();
+                insertOneOrder(containerComposer, orderId);
+                if (containerComposer.getDatabaseType() instanceof MySQLDatabaseType) {
+                    updateOneOrder(containerComposer, orderId, "updated");
+                    deleteOneOrder(containerComposer, orderId, "updated");
+                    insertOneOrder(containerComposer, keyGenerateAlgorithm.generateKey());
+                }
                 return null;
-            });
+            };
+            assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, incrementalTaskFn);
         }
     }
     
+    @SneakyThrows
+    private void doCreateUpdateDelete(final PipelineContainerComposer containerComposer, final Object orderId) {
+        String updatedStatus = "updated" + System.currentTimeMillis();
+        insertOneOrder(containerComposer, orderId);
+        updateOneOrder(containerComposer, orderId, updatedStatus);
+        deleteOneOrder(containerComposer, orderId, updatedStatus);
+    }
+    
    private void insertOneOrder(final PipelineContainerComposer containerComposer, final Object uniqueKey) throws SQLException {
-        try (PreparedStatement preparedStatement = containerComposer.getSourceDataSource().getConnection().prepareStatement("INSERT INTO t_order (order_id,user_id,status) VALUES (?,?,?)")) {
+        try (
+                Connection connection = containerComposer.getSourceDataSource().getConnection();
+                PreparedStatement preparedStatement = connection.prepareStatement("INSERT INTO t_order (order_id,user_id,status) VALUES (?,?,?)")) {
             preparedStatement.setObject(1, uniqueKey);
             preparedStatement.setObject(2, 1);
             preparedStatement.setObject(3, "OK");
@@ -105,6 +122,33 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         }
     }
     
+    private void updateOneOrder(final PipelineContainerComposer containerComposer, final Object uniqueKey, final String updatedStatus) throws SQLException {
+        try (
+                Connection connection = containerComposer.getSourceDataSource().getConnection();
+                PreparedStatement preparedStatement = connection
+                        .prepareStatement("UPDATE t_order SET status=? WHERE order_id = ? AND user_id = ? AND status = ?")) {
+            preparedStatement.setObject(1, updatedStatus);
+            preparedStatement.setObject(2, uniqueKey);
+            preparedStatement.setObject(3, 1);
+            preparedStatement.setObject(4, "OK");
+            int actualCount = preparedStatement.executeUpdate();
+            assertThat(actualCount, is(1));
+        }
+    }
+    
+    private void deleteOneOrder(final PipelineContainerComposer containerComposer, final Object uniqueKey, final String updatedStatus) throws SQLException {
+        try (
+                Connection connection = containerComposer.getSourceDataSource().getConnection();
+                PreparedStatement preparedStatement = connection
+                        .prepareStatement("DELETE FROM t_order WHERE order_id = ? AND user_id = ? AND status = ?")) {
+            preparedStatement.setObject(1, uniqueKey);
+            preparedStatement.setObject(2, 1);
+            preparedStatement.setObject(3, updatedStatus);
+            int actualCount = preparedStatement.executeUpdate();
+            assertThat(actualCount, is(1));
+        }
+    }
+    
     @ParameterizedTest(name = "{0}")
     @EnabledIf("isEnabled")
     @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
@@ -122,6 +166,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             Object uniqueKey = keyGenerateAlgorithm.generateKey();
            assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
                insertOneOrder(containerComposer, uniqueKey);
+                doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey());
                containerComposer.assertProxyOrderRecordExist("t_order", uniqueKey);
                 return null;
             });
@@ -145,6 +190,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             Object uniqueKey = keyGenerateAlgorithm.generateKey();
            assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
                insertOneOrder(containerComposer, uniqueKey);
+                doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey());
                containerComposer.assertProxyOrderRecordExist("t_order", uniqueKey);
                 return null;
             });
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
index 19b9e2b09e9..0e211b4e7e6 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
@@ -133,6 +133,7 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
     private void setNullToAllFields(final Object orderId) {
         if (databaseType instanceof MySQLDatabaseType) {
            String sql = SQLBuilderUtils.buildUpdateSQL(ignoreShardingColumns(MYSQL_COLUMN_NAMES), orderTableName, "null");
+            log.info("update sql: {}", sql);
            DataSourceExecuteUtils.execute(dataSource, sql, new Object[]{orderId});
         }
     }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
index dc0140be0d5..1f190e36cbc 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
@@ -122,7 +122,7 @@ class DataSourceImporterTest {
         verify(preparedStatement).setObject(1, 20);
         verify(preparedStatement).setObject(2, "UPDATE");
         verify(preparedStatement).setObject(3, 1);
-        verify(preparedStatement).setObject(4, 20);
+        verify(preparedStatement).setObject(4, 10);
         verify(preparedStatement).executeUpdate();
     }
     
@@ -137,7 +137,7 @@ class DataSourceImporterTest {
         inOrder.verify(preparedStatement).setObject(2, 10);
         inOrder.verify(preparedStatement).setObject(3, "UPDATE");
         inOrder.verify(preparedStatement).setObject(4, 1);
-        inOrder.verify(preparedStatement).setObject(5, 10);
+        inOrder.verify(preparedStatement).setObject(5, 0);
         inOrder.verify(preparedStatement).executeUpdate();
     }
     
