This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
     new bc11038fbce Compatible with BINARY/VARBINARY column type in MySQL binlog parsing (#24426)
bc11038fbce is described below
commit bc11038fbce15aa56f5026c5e0b5c406599c9103
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Mar 3 12:41:56 2023 +0800
    Compatible with BINARY/VARBINARY column type in MySQL binlog parsing (#24426)
* Compatible with BINARY/VARBINARY column type in MySQL binlog parsing
* Update unit test
* Comment E2E
* Update IndexesMigrationE2EIT
---
...ogProtocolValue.java => MySQLBinaryString.java} | 24 +++++---
.../string/MySQLVarcharBinlogProtocolValue.java | 3 +-
.../MySQLVarcharBinlogProtocolValueTest.java | 31 +++++++---
.../data/pipeline/core/util/PipelineJdbcUtils.java | 18 ++++++
.../mysql/ingest/MySQLIncrementalDumper.java | 9 +++
.../netty/MySQLBinlogEventPacketDecoderTest.java | 9 +--
.../pipeline/cases/base/PipelineBaseE2EIT.java | 6 +-
.../primarykey/IndexesMigrationE2EIT.java | 71 ++++++++++++++++------
8 files changed, 125 insertions(+), 46 deletions(-)
diff --git a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValue.java b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLBinaryString.java
similarity index 54%
copy from db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValue.java
copy to db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLBinaryString.java
index c3224cda8dd..9cebd192797 100644
--- a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValue.java
+++ b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLBinaryString.java
@@ -17,21 +17,25 @@
package org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.value.string;
-import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.MySQLBinlogColumnDef;
-import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.value.MySQLBinlogProtocolValue;
-import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import java.io.Serializable;
/**
- * VARCHAR / VAR_STRING type value of MySQL binlog protocol.
+ * MySQL binary string.
+ *
+ * <p>Since the MySQL replication protocol handles BINARY/VARBINARY columns as MYSQL_TYPE_VARCHAR(15),
+ * and MySQLBinlogProtocolValue.read parameters carry no real column metadata,
+ * this customized class object is returned on parsing.</p>
 */
-public final class MySQLVarcharBinlogProtocolValue implements MySQLBinlogProtocolValue {
+@RequiredArgsConstructor
+@EqualsAndHashCode
+@Getter
+public final class MySQLBinaryString implements Serializable {
- private static final int VARCHAR_LENGTH_META_POINT = 256;
+ private static final long serialVersionUID = 2448062591593788665L;
- @Override
- public Serializable read(final MySQLBinlogColumnDef columnDef, final MySQLPacketPayload payload) {
- return payload.readStringFix(VARCHAR_LENGTH_META_POINT > columnDef.getColumnMeta() ? payload.getByteBuf().readUnsignedByte() : payload.getByteBuf().readUnsignedShortLE());
- }
+ private final byte[] bytes;
}
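
The new wrapper exists because the binlog stream alone cannot tell whether those bytes came from a textual VARCHAR or a raw BINARY/VARBINARY column, so the decision is deferred to a caller that knows the real JDBC column type. A minimal illustrative sketch of that downstream resolution (names here are hypothetical; the actual handling is the MySQLIncrementalDumper change further down in this commit):

    import java.nio.charset.Charset;
    import java.sql.Types;

    // Illustrative only: resolve a parsed MySQLBinaryString once the real JDBC column type is known.
    final class BinaryStringResolutionSketch {

        static Object resolve(final MySQLBinaryString value, final int jdbcType) {
            boolean binaryColumn = Types.BINARY == jdbcType || Types.VARBINARY == jdbcType || Types.LONGVARBINARY == jdbcType;
            // Binary columns keep the raw bytes; textual columns are decoded (charset handling simplified here).
            return binaryColumn ? value.getBytes() : new String(value.getBytes(), Charset.defaultCharset());
        }
    }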
diff --git a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValue.java b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValue.java
index c3224cda8dd..2faeb3eac90 100644
--- a/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValue.java
+++ b/db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValue.java
@@ -32,6 +32,7 @@ public final class MySQLVarcharBinlogProtocolValue implements MySQLBinlogProtoco
@Override
public Serializable read(final MySQLBinlogColumnDef columnDef, final MySQLPacketPayload payload) {
- return payload.readStringFix(VARCHAR_LENGTH_META_POINT > columnDef.getColumnMeta() ? payload.getByteBuf().readUnsignedByte() : payload.getByteBuf().readUnsignedShortLE());
+ byte[] bytes = payload.readStringFixByBytes(VARCHAR_LENGTH_META_POINT > columnDef.getColumnMeta() ? payload.getByteBuf().readUnsignedByte() : payload.getByteBuf().readUnsignedShortLE());
+ return new MySQLBinaryString(bytes);
}
}
diff --git a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValueTest.java b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValueTest.java
index 7f429c01b04..e73dc681799 100644
--- a/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValueTest.java
+++ b/db-protocol/mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLVarcharBinlogProtocolValueTest.java
@@ -27,6 +27,9 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+import java.io.Serializable;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.when;
@@ -49,12 +52,18 @@ public final class MySQLVarcharBinlogProtocolValueTest {
@Test
public void assertReadVarcharValueWithMeta1() {
- String expected = "test_value";
- columnDef.setColumnMeta(10);
+ assertReadVarcharValueWithMeta("test_value".getBytes());
+ assertReadVarcharValueWithMeta(new byte[]{-1, 0, 1});
+ }
+
+ private void assertReadVarcharValueWithMeta(final byte[] expected) {
+ columnDef.setColumnMeta(expected.length);
when(payload.getByteBuf()).thenReturn(byteBuf);
- when(byteBuf.readUnsignedByte()).thenReturn((short) expected.length());
- when(payload.readStringFix(expected.length())).thenReturn(expected);
- assertThat(new MySQLVarcharBinlogProtocolValue().read(columnDef, payload), is(expected));
+ when(byteBuf.readUnsignedByte()).thenReturn((short) expected.length);
+ when(payload.readStringFixByBytes(expected.length)).thenReturn(expected);
+ Serializable actual = new MySQLVarcharBinlogProtocolValue().read(columnDef, payload);
+ assertThat(actual, instanceOf(MySQLBinaryString.class));
+ assertThat(((MySQLBinaryString) actual).getBytes(), is(expected));
}
@Test
@@ -63,11 +72,13 @@ public final class MySQLVarcharBinlogProtocolValueTest {
for (int i = 0; i < 256; i++) {
expectedStringBuilder.append(i);
}
- String expected = expectedStringBuilder.toString();
- columnDef.setColumnMeta(expected.length());
+ byte[] expected = expectedStringBuilder.toString().getBytes();
+ columnDef.setColumnMeta(expected.length);
when(payload.getByteBuf()).thenReturn(byteBuf);
- when(byteBuf.readUnsignedShortLE()).thenReturn(expected.length());
- when(payload.readStringFix(expected.length())).thenReturn(expected);
- assertThat(new MySQLVarcharBinlogProtocolValue().read(columnDef, payload), is(expected));
+ when(byteBuf.readUnsignedShortLE()).thenReturn(expected.length);
+ when(payload.readStringFixByBytes(expected.length)).thenReturn(expected);
+ Serializable actual = new MySQLVarcharBinlogProtocolValue().read(columnDef, payload);
+ assertThat(actual, instanceOf(MySQLBinaryString.class));
+ assertThat(((MySQLBinaryString) actual).getBytes(), is(expected));
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
index d1544c64d3a..d8a3fcef0af 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
@@ -57,4 +57,22 @@ public final class PipelineJdbcUtils {
return false;
}
}
+
+ /**
+ * Whether the column is a binary column.
+ * <p>It does not include BLOB and similar types.</p>
+ *
+ * @param columnType column type, value of java.sql.Types
+ * @return true or false
+ */
+ public static boolean isBinaryColumn(final int columnType) {
+ switch (columnType) {
+ case Types.BINARY:
+ case Types.VARBINARY:
+ case Types.LONGVARBINARY:
+ return true;
+ default:
+ return false;
+ }
+ }
}
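
A quick sanity check of the new helper, relying only on standard java.sql.Types constants (illustrative; not part of the commit): it answers true for exactly the three binary column types and false for everything else, including BLOB, as the Javadoc notes.

    import java.sql.Types;

    import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;

    // Expected classification of a few common JDBC types; BLOB is deliberately excluded.
    public final class IsBinaryColumnSketch {

        public static void main(final String[] args) {
            System.out.println(PipelineJdbcUtils.isBinaryColumn(Types.VARBINARY));     // true
            System.out.println(PipelineJdbcUtils.isBinaryColumn(Types.BINARY));        // true
            System.out.println(PipelineJdbcUtils.isBinaryColumn(Types.LONGVARBINARY)); // true
            System.out.println(PipelineJdbcUtils.isBinaryColumn(Types.BLOB));          // false
            System.out.println(PipelineJdbcUtils.isBinaryColumn(Types.VARCHAR));       // false
        }
    }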
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index b2a3a15a4f3..14e3dd709a4 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -36,6 +36,7 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTable
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent;
@@ -46,11 +47,13 @@ import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRo
import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.ConnectInfo;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.MySQLClient;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.MySQLDataTypeHandler;
+import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.value.string.MySQLBinaryString;
import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import java.io.Serializable;
+import java.nio.charset.Charset;
import java.security.SecureRandom;
import java.util.Objects;
import java.util.Optional;
@@ -176,6 +179,12 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
}
private Serializable handleValue(final PipelineColumnMetaData columnMetaData, final Serializable value) {
+ if (value instanceof MySQLBinaryString) {
+ if (PipelineJdbcUtils.isBinaryColumn(columnMetaData.getDataType())) {
+ return ((MySQLBinaryString) value).getBytes();
+ }
+ return new String(((MySQLBinaryString) value).getBytes(), Charset.defaultCharset());
+ }
Optional<MySQLDataTypeHandler> dataTypeHandler = TypedSPILoader.findService(MySQLDataTypeHandler.class, columnMetaData.getDataTypeName());
return dataTypeHandler.isPresent() ? dataTypeHandler.get().handle(value) : value;
}
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
index 9664b695c4b..54e1666e7c2 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
@@ -31,6 +31,7 @@ import org.apache.shardingsphere.db.protocol.constant.CommonConstants;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnType;
import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlogTableMapEventPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.MySQLBinlogColumnDef;
+import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.value.string.MySQLBinaryString;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -129,7 +130,7 @@ public final class MySQLBinlogEventPacketDecoderTest {
assertThat(decodedEvents.size(), is(1));
assertThat(decodedEvents.get(0), instanceOf(WriteRowsEvent.class));
WriteRowsEvent actual = (WriteRowsEvent) decodedEvents.get(0);
- assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1, "SUCCESS", null}));
+ assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1, new MySQLBinaryString("SUCCESS".getBytes()), null}));
}
@Test
@@ -145,8 +146,8 @@ public final class MySQLBinlogEventPacketDecoderTest {
assertThat(decodedEvents.size(), is(1));
assertThat(decodedEvents.get(0), instanceOf(UpdateRowsEvent.class));
UpdateRowsEvent actual = (UpdateRowsEvent) decodedEvents.get(0);
- assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1, "SUCCESS", null}));
- assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1, "updated", null}));
+ assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1, new MySQLBinaryString("SUCCESS".getBytes()), null}));
+ assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1, new MySQLBinaryString("updated".getBytes()), null}));
}
@Test
@@ -161,7 +162,7 @@ public final class MySQLBinlogEventPacketDecoderTest {
assertThat(decodedEvents.size(), is(1));
assertThat(decodedEvents.get(0), instanceOf(DeleteRowsEvent.class));
DeleteRowsEvent actual = (DeleteRowsEvent) decodedEvents.get(0);
- assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1, "SUCCESS", null}));
+ assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1, new MySQLBinaryString("SUCCESS".getBytes()), null}));
}
@Test
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
index 9e7df56b4f3..081e11f24b4 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
@@ -342,13 +342,17 @@ public abstract class PipelineBaseE2EIT {
}
protected void assertProxyOrderRecordExist(final String tableName, final Object orderId) {
- boolean recordExist = false;
String sql;
if (orderId instanceof String) {
sql = String.format("SELECT 1 FROM %s WHERE order_id = '%s'", tableName, orderId);
} else {
sql = String.format("SELECT 1 FROM %s WHERE order_id = %s", tableName, orderId);
}
+ assertProxyOrderRecordExist(sql);
+ }
+
+ protected void assertProxyOrderRecordExist(final String sql) {
+ boolean recordExist = false;
for (int i = 0; i < 5; i++) {
List<Map<String, Object>> result = queryForListWithLog(sql);
recordExist = !result.isEmpty();
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index aace00f413a..e33d2ed8ff7 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primarykey;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Hex;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
@@ -40,6 +41,7 @@ import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.Callable;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -89,7 +91,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
}
@Test
- public void assertNoUniqueKeyMigrationSuccess() throws SQLException, InterruptedException {
+ public void assertNoUniqueKeyMigrationSuccess() throws Exception {
String sql;
String consistencyCheckAlgorithmType;
if (getDatabaseType() instanceof MySQLDatabaseType) {
@@ -102,11 +104,27 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
} else {
return;
}
- assertMigrationSuccess(sql, "user_id", new UUIDKeyGenerateAlgorithm(), consistencyCheckAlgorithmType);
+ KeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
+ Object uniqueKey = keyGenerateAlgorithm.generateKey();
+ assertMigrationSuccess(sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
+ insertOneOrder(uniqueKey);
+ assertProxyOrderRecordExist("t_order", uniqueKey);
+ return null;
+ });
+ }
+
+ private void insertOneOrder(final Object uniqueKey) throws SQLException {
+ try (PreparedStatement preparedStatement = getSourceDataSource().getConnection().prepareStatement("INSERT INTO t_order (order_id,user_id,status) VALUES (?,?,?)")) {
+ preparedStatement.setObject(1, uniqueKey);
+ preparedStatement.setObject(2, 1);
+ preparedStatement.setObject(3, "OK");
+ int actualCount = preparedStatement.executeUpdate();
+ assertThat(actualCount, is(1));
+ }
}
@Test
- public void assertMultiPrimaryKeyMigrationSuccess() throws SQLException, InterruptedException {
+ public void assertMultiPrimaryKeyMigrationSuccess() throws Exception {
String sql;
String consistencyCheckAlgorithmType;
if (getDatabaseType() instanceof MySQLDatabaseType) {
@@ -115,11 +133,17 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
} else {
return;
}
- assertMigrationSuccess(sql, "user_id", new UUIDKeyGenerateAlgorithm(), consistencyCheckAlgorithmType);
+ KeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
+ Object uniqueKey = keyGenerateAlgorithm.generateKey();
+ assertMigrationSuccess(sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
+ insertOneOrder(uniqueKey);
+ assertProxyOrderRecordExist("t_order", uniqueKey);
+ return null;
+ });
}
@Test
- public void assertMultiUniqueKeyMigrationSuccess() throws SQLException, InterruptedException {
+ public void assertMultiUniqueKeyMigrationSuccess() throws Exception {
String sql;
String consistencyCheckAlgorithmType;
if (getDatabaseType() instanceof MySQLDatabaseType) {
@@ -128,11 +152,17 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
} else {
return;
}
- assertMigrationSuccess(sql, "user_id", new SnowflakeKeyGenerateAlgorithm(), consistencyCheckAlgorithmType);
+ KeyGenerateAlgorithm keyGenerateAlgorithm = new SnowflakeKeyGenerateAlgorithm();
+ Object uniqueKey = keyGenerateAlgorithm.generateKey();
+ assertMigrationSuccess(sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
+ insertOneOrder(uniqueKey);
+ assertProxyOrderRecordExist("t_order", uniqueKey);
+ return null;
+ });
}
@Test
- public void assertSpecialTypeSingleColumnUniqueKeyMigrationSuccess() throws SQLException, InterruptedException {
+ public void assertSpecialTypeSingleColumnUniqueKeyMigrationSuccess() throws Exception {
String sql;
String consistencyCheckAlgorithmType;
if (getDatabaseType() instanceof MySQLDatabaseType) {
@@ -142,15 +172,23 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
} else {
return;
}
- assertMigrationSuccess(sql, "order_id", new UUIDKeyGenerateAlgorithm(), consistencyCheckAlgorithmType);
+ KeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
+ // TODO Insert binary string in VARBINARY column. But KeyGenerateAlgorithm.generateKey() require returning Comparable, and byte[] is not Comparable
+ byte[] uniqueKey = new byte[]{-1, 0, 1};
+ assertMigrationSuccess(sql, "order_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
+ insertOneOrder(uniqueKey);
+ // TODO Select by byte[] from proxy doesn't work, so unhex function is used for now
+ assertProxyOrderRecordExist(String.format("SELECT 1 FROM t_order WHERE order_id=UNHEX('%s')", Hex.encodeHexString(uniqueKey)));
+ return null;
+ });
}
- private void assertMigrationSuccess(final String sqlPattern, final String shardingColumn, final KeyGenerateAlgorithm generateAlgorithm,
- final String consistencyCheckAlgorithmType) throws SQLException, InterruptedException {
+ private void assertMigrationSuccess(final String sqlPattern, final String shardingColumn, final KeyGenerateAlgorithm keyGenerateAlgorithm,
+ final String consistencyCheckAlgorithmType, final Callable<Void> incrementalTaskFn) throws Exception {
initEnvironment(getDatabaseType(), new MigrationJobType());
sourceExecuteWithLog(String.format(sqlPattern, getSourceTableOrderName()));
try (Connection connection = getSourceDataSource().getConnection()) {
- PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, generateAlgorithm, getSourceTableOrderName(), PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
+ PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, keyGenerateAlgorithm, getSourceTableOrderName(), TABLE_INIT_ROW_COUNT);
}
addMigrationProcessConfig();
addMigrationSourceResource();
@@ -159,21 +197,14 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
startMigration(getSourceTableOrderName(), getTargetTableOrderName());
String jobId = listJobId().get(0);
waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
- Comparable<?> primaryKey = generateAlgorithm.generateKey();
- try (PreparedStatement preparedStatement = getSourceDataSource().getConnection().prepareStatement("INSERT INTO t_order (order_id,user_id,status) VALUES (?,?,?)")) {
- preparedStatement.setObject(1, primaryKey);
- preparedStatement.setObject(2, 1);
- preparedStatement.setObject(3, "OK");
- preparedStatement.execute();
- }
- assertProxyOrderRecordExist("t_order", primaryKey);
+ incrementalTaskFn.call();
waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
if (null != consistencyCheckAlgorithmType) {
assertCheckMigrationSuccess(jobId, consistencyCheckAlgorithmType);
}
commitMigrationByJobId(jobId);
proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
- assertThat(getTargetTableRecordsCount(getSourceTableOrderName()), is(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT + 1));
+ assertThat(getTargetTableRecordsCount(getSourceTableOrderName()), is(TABLE_INIT_ROW_COUNT + 1));
List<String> lastJobIds = listJobId();
assertTrue(lastJobIds.isEmpty());
}
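
One note on the workaround in the last test above: because the proxy cannot yet be queried with a raw byte[] parameter, the existence check is rewritten as a hex lookup. A small illustration of how that SQL is produced (same commons-codec Hex call as the test):

    import org.apache.commons.codec.binary.Hex;

    // Illustrative only: the VARBINARY key {-1, 0, 1} hex-encodes to "ff0001",
    // so the proxy-side existence check becomes a plain UNHEX comparison.
    public final class UnhexLookupSketch {

        public static void main(final String[] args) {
            byte[] uniqueKey = {-1, 0, 1};
            String sql = String.format("SELECT 1 FROM t_order WHERE order_id=UNHEX('%s')", Hex.encodeHexString(uniqueKey));
            System.out.println(sql); // SELECT 1 FROM t_order WHERE order_id=UNHEX('ff0001')
        }
    }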