This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new bc11038fbce Compatible with BINARY/VARBINARY column type in MySQL 
binlog parsing (#24426)
bc11038fbce is described below

commit bc11038fbce15aa56f5026c5e0b5c406599c9103
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Mar 3 12:41:56 2023 +0800

    Compatible with BINARY/VARBINARY column type in MySQL binlog parsing 
(#24426)
    
    * Compatible with BINARY/VARBINARY column type in MySQL binlog parsing
    
    * Update unit test
    
    * Comment E2E
    
    * Update IndexesMigrationE2EIT
---
 ...ogProtocolValue.java => MySQLBinaryString.java} | 24 +++++---
 .../string/MySQLVarcharBinlogProtocolValue.java    |  3 +-
 .../MySQLVarcharBinlogProtocolValueTest.java       | 31 +++++++---
 .../data/pipeline/core/util/PipelineJdbcUtils.java | 18 ++++++
 .../mysql/ingest/MySQLIncrementalDumper.java       |  9 +++
 .../netty/MySQLBinlogEventPacketDecoderTest.java   |  9 +--
 .../pipeline/cases/base/PipelineBaseE2EIT.java     |  6 +-
 .../primarykey/IndexesMigrationE2EIT.java          | 71 ++++++++++++++++------
 8 files changed, 125 insertions(+), 46 deletions(-)

diff --git 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValue.java
 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLBinaryString.java
similarity index 54%
copy from 
db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValue.java
copy to 
db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLBinaryString.java
index c3224cda8dd..9cebd192797 100644
--- 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValue.java
+++ 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLBinaryString.java
@@ -17,21 +17,25 @@
 
 package 
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.value.string;
 
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.MySQLBinlogColumnDef;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.value.MySQLBinlogProtocolValue;
-import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 
 import java.io.Serializable;
 
 /**
- * VARCHAR / VAR_STRING type value of MySQL binlog protocol.
+ * MySQL binary string.
+ *
+ * <p>The MySQL replication protocol handles BINARY/VARBINARY columns as MYSQL_TYPE_VARCHAR(15),
+ * and the parameters of MySQLBinlogProtocolValue.read have no real column metadata,
+ * so an object of this customized class is returned on parsing.</p>
  */
-public final class MySQLVarcharBinlogProtocolValue implements 
MySQLBinlogProtocolValue {
+@RequiredArgsConstructor
+@EqualsAndHashCode
+@Getter
+public final class MySQLBinaryString implements Serializable {
     
-    private static final int VARCHAR_LENGTH_META_POINT = 256;
+    private static final long serialVersionUID = 2448062591593788665L;
     
-    @Override
-    public Serializable read(final MySQLBinlogColumnDef columnDef, final 
MySQLPacketPayload payload) {
-        return payload.readStringFix(VARCHAR_LENGTH_META_POINT > 
columnDef.getColumnMeta() ? payload.getByteBuf().readUnsignedByte() : 
payload.getByteBuf().readUnsignedShortLE());
-    }
+    private final byte[] bytes;
 }
diff --git 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValue.java
 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValue.java
index c3224cda8dd..2faeb3eac90 100644
--- 
a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValue.java
+++ 
b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValue.java
@@ -32,6 +32,7 @@ public final class MySQLVarcharBinlogProtocolValue implements 
MySQLBinlogProtoco
     
     @Override
     public Serializable read(final MySQLBinlogColumnDef columnDef, final 
MySQLPacketPayload payload) {
-        return payload.readStringFix(VARCHAR_LENGTH_META_POINT > 
columnDef.getColumnMeta() ? payload.getByteBuf().readUnsignedByte() : 
payload.getByteBuf().readUnsignedShortLE());
+        byte[] bytes = payload.readStringFixByBytes(VARCHAR_LENGTH_META_POINT 
> columnDef.getColumnMeta() ? payload.getByteBuf().readUnsignedByte() : 
payload.getByteBuf().readUnsignedShortLE());
+        return new MySQLBinaryString(bytes);
     }
 }
diff --git 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValueTest.java
 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValueTest.java
index 7f429c01b04..e73dc681799 100644
--- 
a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValueTest.java
+++ 
b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValueTest.java
@@ -27,6 +27,9 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import java.io.Serializable;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.when;
@@ -49,12 +52,18 @@ public final class MySQLVarcharBinlogProtocolValueTest {
     
     @Test
     public void assertReadVarcharValueWithMeta1() {
-        String expected = "test_value";
-        columnDef.setColumnMeta(10);
+        assertReadVarcharValueWithMeta("test_value".getBytes());
+        assertReadVarcharValueWithMeta(new byte[]{-1, 0, 1});
+    }
+    
+    private void assertReadVarcharValueWithMeta(final byte[] expected) {
+        columnDef.setColumnMeta(expected.length);
         when(payload.getByteBuf()).thenReturn(byteBuf);
-        when(byteBuf.readUnsignedByte()).thenReturn((short) expected.length());
-        when(payload.readStringFix(expected.length())).thenReturn(expected);
-        assertThat(new MySQLVarcharBinlogProtocolValue().read(columnDef, 
payload), is(expected));
+        when(byteBuf.readUnsignedByte()).thenReturn((short) expected.length);
+        
when(payload.readStringFixByBytes(expected.length)).thenReturn(expected);
+        Serializable actual = new 
MySQLVarcharBinlogProtocolValue().read(columnDef, payload);
+        assertThat(actual, instanceOf(MySQLBinaryString.class));
+        assertThat(((MySQLBinaryString) actual).getBytes(), is(expected));
     }
     
     @Test
@@ -63,11 +72,13 @@ public final class MySQLVarcharBinlogProtocolValueTest {
         for (int i = 0; i < 256; i++) {
             expectedStringBuilder.append(i);
         }
-        String expected = expectedStringBuilder.toString();
-        columnDef.setColumnMeta(expected.length());
+        byte[] expected = expectedStringBuilder.toString().getBytes();
+        columnDef.setColumnMeta(expected.length);
         when(payload.getByteBuf()).thenReturn(byteBuf);
-        when(byteBuf.readUnsignedShortLE()).thenReturn(expected.length());
-        when(payload.readStringFix(expected.length())).thenReturn(expected);
-        assertThat(new MySQLVarcharBinlogProtocolValue().read(columnDef, 
payload), is(expected));
+        when(byteBuf.readUnsignedShortLE()).thenReturn(expected.length);
+        
when(payload.readStringFixByBytes(expected.length)).thenReturn(expected);
+        Serializable actual = new 
MySQLVarcharBinlogProtocolValue().read(columnDef, payload);
+        assertThat(actual, instanceOf(MySQLBinaryString.class));
+        assertThat(((MySQLBinaryString) actual).getBytes(), is(expected));
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
index d1544c64d3a..d8a3fcef0af 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
@@ -57,4 +57,22 @@ public final class PipelineJdbcUtils {
                 return false;
         }
     }
+    
+    /**
+     * Whether column is binary column.
+     * <p>It doesn't include BLOB etc.</p>
+     *
+     * @param columnType column type, value of java.sql.Types
+     * @return true or false
+     */
+    public static boolean isBinaryColumn(final int columnType) {
+        switch (columnType) {
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY:
+                return true;
+            default:
+                return false;
+        }
+    }
 }
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index b2a3a15a4f3..14e3dd709a4 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -36,6 +36,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTable
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent;
@@ -46,11 +47,13 @@ import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRo
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.ConnectInfo;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.MySQLClient;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.MySQLDataTypeHandler;
+import 
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.value.string.MySQLBinaryString;
 import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 
 import java.io.Serializable;
+import java.nio.charset.Charset;
 import java.security.SecureRandom;
 import java.util.Objects;
 import java.util.Optional;
@@ -176,6 +179,12 @@ public final class MySQLIncrementalDumper extends 
AbstractLifecycleExecutor impl
     }
     
     private Serializable handleValue(final PipelineColumnMetaData 
columnMetaData, final Serializable value) {
+        if (value instanceof MySQLBinaryString) {
+            if 
(PipelineJdbcUtils.isBinaryColumn(columnMetaData.getDataType())) {
+                return ((MySQLBinaryString) value).getBytes();
+            }
+            return new String(((MySQLBinaryString) value).getBytes(), 
Charset.defaultCharset());
+        }
         Optional<MySQLDataTypeHandler> dataTypeHandler = 
TypedSPILoader.findService(MySQLDataTypeHandler.class, 
columnMetaData.getDataTypeName());
         return dataTypeHandler.isPresent() ? 
dataTypeHandler.get().handle(value) : value;
     }
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
index 9664b695c4b..54e1666e7c2 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
@@ -31,6 +31,7 @@ import 
org.apache.shardingsphere.db.protocol.constant.CommonConstants;
 import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnType;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlogTableMapEventPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.MySQLBinlogColumnDef;
+import 
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.value.string.MySQLBinaryString;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -129,7 +130,7 @@ public final class MySQLBinlogEventPacketDecoderTest {
         assertThat(decodedEvents.size(), is(1));
         assertThat(decodedEvents.get(0), instanceOf(WriteRowsEvent.class));
         WriteRowsEvent actual = (WriteRowsEvent) decodedEvents.get(0);
-        assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1, 
"SUCCESS", null}));
+        assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1, 
new MySQLBinaryString("SUCCESS".getBytes()), null}));
     }
     
     @Test
@@ -145,8 +146,8 @@ public final class MySQLBinlogEventPacketDecoderTest {
         assertThat(decodedEvents.size(), is(1));
         assertThat(decodedEvents.get(0), instanceOf(UpdateRowsEvent.class));
         UpdateRowsEvent actual = (UpdateRowsEvent) decodedEvents.get(0);
-        assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1, 
"SUCCESS", null}));
-        assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1, 
"updated", null}));
+        assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1, 
new MySQLBinaryString("SUCCESS".getBytes()), null}));
+        assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1, 
new MySQLBinaryString("updated".getBytes()), null}));
     }
     
     @Test
@@ -161,7 +162,7 @@ public final class MySQLBinlogEventPacketDecoderTest {
         assertThat(decodedEvents.size(), is(1));
         assertThat(decodedEvents.get(0), instanceOf(DeleteRowsEvent.class));
         DeleteRowsEvent actual = (DeleteRowsEvent) decodedEvents.get(0);
-        assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1, 
"SUCCESS", null}));
+        assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1, 
new MySQLBinaryString("SUCCESS".getBytes()), null}));
     }
     
     @Test
diff --git 
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
 
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
index 9e7df56b4f3..081e11f24b4 100644
--- 
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
+++ 
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
@@ -342,13 +342,17 @@ public abstract class PipelineBaseE2EIT {
     }
     
     protected void assertProxyOrderRecordExist(final String tableName, final 
Object orderId) {
-        boolean recordExist = false;
         String sql;
         if (orderId instanceof String) {
             sql = String.format("SELECT 1 FROM %s WHERE order_id = '%s'", 
tableName, orderId);
         } else {
             sql = String.format("SELECT 1 FROM %s WHERE order_id = %s", 
tableName, orderId);
         }
+        assertProxyOrderRecordExist(sql);
+    }
+    
+    protected void assertProxyOrderRecordExist(final String sql) {
+        boolean recordExist = false;
         for (int i = 0; i < 5; i++) {
             List<Map<String, Object>> result = queryForListWithLog(sql);
             recordExist = !result.isEmpty();
diff --git 
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
 
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index aace00f413a..e33d2ed8ff7 100644
--- 
a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ 
b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -18,6 +18,7 @@
 package 
org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primarykey;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import 
org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
@@ -40,6 +41,7 @@ import java.sql.SQLException;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.Callable;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
@@ -89,7 +91,7 @@ public final class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
     }
     
     @Test
-    public void assertNoUniqueKeyMigrationSuccess() throws SQLException, 
InterruptedException {
+    public void assertNoUniqueKeyMigrationSuccess() throws Exception {
         String sql;
         String consistencyCheckAlgorithmType;
         if (getDatabaseType() instanceof MySQLDatabaseType) {
@@ -102,11 +104,27 @@ public final class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
         } else {
             return;
         }
-        assertMigrationSuccess(sql, "user_id", new UUIDKeyGenerateAlgorithm(), 
consistencyCheckAlgorithmType);
+        KeyGenerateAlgorithm keyGenerateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
+        Object uniqueKey = keyGenerateAlgorithm.generateKey();
+        assertMigrationSuccess(sql, "user_id", keyGenerateAlgorithm, 
consistencyCheckAlgorithmType, () -> {
+            insertOneOrder(uniqueKey);
+            assertProxyOrderRecordExist("t_order", uniqueKey);
+            return null;
+        });
+    }
+    
+    private void insertOneOrder(final Object uniqueKey) throws SQLException {
+        try (PreparedStatement preparedStatement = 
getSourceDataSource().getConnection().prepareStatement("INSERT INTO t_order 
(order_id,user_id,status) VALUES (?,?,?)")) {
+            preparedStatement.setObject(1, uniqueKey);
+            preparedStatement.setObject(2, 1);
+            preparedStatement.setObject(3, "OK");
+            int actualCount = preparedStatement.executeUpdate();
+            assertThat(actualCount, is(1));
+        }
     }
     
     @Test
-    public void assertMultiPrimaryKeyMigrationSuccess() throws SQLException, 
InterruptedException {
+    public void assertMultiPrimaryKeyMigrationSuccess() throws Exception {
         String sql;
         String consistencyCheckAlgorithmType;
         if (getDatabaseType() instanceof MySQLDatabaseType) {
@@ -115,11 +133,17 @@ public final class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
         } else {
             return;
         }
-        assertMigrationSuccess(sql, "user_id", new UUIDKeyGenerateAlgorithm(), 
consistencyCheckAlgorithmType);
+        KeyGenerateAlgorithm keyGenerateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
+        Object uniqueKey = keyGenerateAlgorithm.generateKey();
+        assertMigrationSuccess(sql, "user_id", keyGenerateAlgorithm, 
consistencyCheckAlgorithmType, () -> {
+            insertOneOrder(uniqueKey);
+            assertProxyOrderRecordExist("t_order", uniqueKey);
+            return null;
+        });
     }
     
     @Test
-    public void assertMultiUniqueKeyMigrationSuccess() throws SQLException, 
InterruptedException {
+    public void assertMultiUniqueKeyMigrationSuccess() throws Exception {
         String sql;
         String consistencyCheckAlgorithmType;
         if (getDatabaseType() instanceof MySQLDatabaseType) {
@@ -128,11 +152,17 @@ public final class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
         } else {
             return;
         }
-        assertMigrationSuccess(sql, "user_id", new 
SnowflakeKeyGenerateAlgorithm(), consistencyCheckAlgorithmType);
+        KeyGenerateAlgorithm keyGenerateAlgorithm = new 
SnowflakeKeyGenerateAlgorithm();
+        Object uniqueKey = keyGenerateAlgorithm.generateKey();
+        assertMigrationSuccess(sql, "user_id", keyGenerateAlgorithm, 
consistencyCheckAlgorithmType, () -> {
+            insertOneOrder(uniqueKey);
+            assertProxyOrderRecordExist("t_order", uniqueKey);
+            return null;
+        });
     }
     
     @Test
-    public void assertSpecialTypeSingleColumnUniqueKeyMigrationSuccess() 
throws SQLException, InterruptedException {
+    public void assertSpecialTypeSingleColumnUniqueKeyMigrationSuccess() 
throws Exception {
         String sql;
         String consistencyCheckAlgorithmType;
         if (getDatabaseType() instanceof MySQLDatabaseType) {
@@ -142,15 +172,23 @@ public final class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
         } else {
             return;
         }
-        assertMigrationSuccess(sql, "order_id", new 
UUIDKeyGenerateAlgorithm(), consistencyCheckAlgorithmType);
+        KeyGenerateAlgorithm keyGenerateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
+        // TODO Insert binary string in VARBINARY column. But KeyGenerateAlgorithm.generateKey() requires returning Comparable, and byte[] is not Comparable
+        byte[] uniqueKey = new byte[]{-1, 0, 1};
+        assertMigrationSuccess(sql, "order_id", keyGenerateAlgorithm, 
consistencyCheckAlgorithmType, () -> {
+            insertOneOrder(uniqueKey);
+            // TODO Select by byte[] from proxy doesn't work, so the UNHEX function is used for now
+            assertProxyOrderRecordExist(String.format("SELECT 1 FROM t_order 
WHERE order_id=UNHEX('%s')", Hex.encodeHexString(uniqueKey)));
+            return null;
+        });
     }
     
-    private void assertMigrationSuccess(final String sqlPattern, final String 
shardingColumn, final KeyGenerateAlgorithm generateAlgorithm,
-                                        final String 
consistencyCheckAlgorithmType) throws SQLException, InterruptedException {
+    private void assertMigrationSuccess(final String sqlPattern, final String 
shardingColumn, final KeyGenerateAlgorithm keyGenerateAlgorithm,
+                                        final String 
consistencyCheckAlgorithmType, final Callable<Void> incrementalTaskFn) throws 
Exception {
         initEnvironment(getDatabaseType(), new MigrationJobType());
         sourceExecuteWithLog(String.format(sqlPattern, 
getSourceTableOrderName()));
         try (Connection connection = getSourceDataSource().getConnection()) {
-            
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, 
generateAlgorithm, getSourceTableOrderName(), 
PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
+            
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, 
keyGenerateAlgorithm, getSourceTableOrderName(), TABLE_INIT_ROW_COUNT);
         }
         addMigrationProcessConfig();
         addMigrationSourceResource();
@@ -159,21 +197,14 @@ public final class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
         startMigration(getSourceTableOrderName(), getTargetTableOrderName());
         String jobId = listJobId().get(0);
         waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", 
jobId));
-        Comparable<?> primaryKey = generateAlgorithm.generateKey();
-        try (PreparedStatement preparedStatement = 
getSourceDataSource().getConnection().prepareStatement("INSERT INTO t_order 
(order_id,user_id,status) VALUES (?,?,?)")) {
-            preparedStatement.setObject(1, primaryKey);
-            preparedStatement.setObject(2, 1);
-            preparedStatement.setObject(3, "OK");
-            preparedStatement.execute();
-        }
-        assertProxyOrderRecordExist("t_order", primaryKey);
+        incrementalTaskFn.call();
         waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", 
jobId));
         if (null != consistencyCheckAlgorithmType) {
             assertCheckMigrationSuccess(jobId, consistencyCheckAlgorithmType);
         }
         commitMigrationByJobId(jobId);
         proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
-        assertThat(getTargetTableRecordsCount(getSourceTableOrderName()), 
is(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT + 1));
+        assertThat(getTargetTableRecordsCount(getSourceTableOrderName()), 
is(TABLE_INIT_ROW_COUNT + 1));
         List<String> lastJobIds = listJobId();
         assertTrue(lastJobIds.isEmpty());
     }

Reply via email to