This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 33a680659af Add MySQLBinlogDataTypeHandler (#32544)
33a680659af is described below
commit 33a680659af61364a8d506c18291b808a70fc9f7
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Aug 16 01:12:03 2024 +0800
Add MySQLBinlogDataTypeHandler (#32544)
* Add MySQLBinlogDataTypeHandler
* Add MySQLBinlogDataTypeHandler
* Add MySQLBinlogDataTypeHandler
---
.../ingest/dumper/MySQLIncrementalDumper.java | 24 +++-------
.../dumper/type/MySQLBinlogDataTypeHandler.java | 52 ++++++++++++++++++++++
2 files changed, 57 insertions(+), 19 deletions(-)
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumper.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumper.java
index 971ad1b0588..53a1fb597b1 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumper.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumper.java
@@ -32,7 +32,6 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.MySQLBinlogPosition;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent;
@@ -41,8 +40,7 @@ import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateR
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.ConnectInfo;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.client.MySQLBinlogClient;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.dumper.type.MySQLBinlogNumberDataTypeHandler;
-import
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.value.string.MySQLBinaryString;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.dumper.type.MySQLBinlogDataTypeHandler;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import
org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
import
org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
@@ -52,12 +50,10 @@ import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveI
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import java.io.Serializable;
-import java.nio.charset.Charset;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
-import java.util.Optional;
/**
* MySQL incremental dumper.
@@ -155,7 +151,7 @@ public final class MySQLIncrementalDumper extends
AbstractPipelineLifecycleRunna
DataRecord dataRecord =
createDataRecord(PipelineSQLOperationType.INSERT, event, each.length);
for (int i = 0; i < each.length; i++) {
PipelineColumnMetaData columnMetaData =
tableMetaData.getColumnMetaData(i + 1);
- dataRecord.addColumn(new Column(columnMetaData.getName(),
handleValue(columnMetaData, each[i]), true, columnMetaData.isUniqueKey()));
+ dataRecord.addColumn(new Column(columnMetaData.getName(),
MySQLBinlogDataTypeHandler.handle(columnMetaData, each[i]), true,
columnMetaData.isUniqueKey()));
}
result.add(dataRecord);
}
@@ -170,8 +166,8 @@ public final class MySQLIncrementalDumper extends
AbstractPipelineLifecycleRunna
DataRecord dataRecord =
createDataRecord(PipelineSQLOperationType.UPDATE, event, beforeValues.length);
for (int j = 0; j < beforeValues.length; j++) {
PipelineColumnMetaData columnMetaData =
tableMetaData.getColumnMetaData(j + 1);
- Serializable oldValue = handleValue(columnMetaData,
beforeValues[j]);
- Serializable newValue = handleValue(columnMetaData,
afterValues[j]);
+ Serializable oldValue =
MySQLBinlogDataTypeHandler.handle(columnMetaData, beforeValues[j]);
+ Serializable newValue =
MySQLBinlogDataTypeHandler.handle(columnMetaData, afterValues[j]);
boolean updated = !Objects.deepEquals(newValue, oldValue);
dataRecord.addColumn(new Column(columnMetaData.getName(),
oldValue, newValue, updated, columnMetaData.isUniqueKey()));
}
@@ -186,23 +182,13 @@ public final class MySQLIncrementalDumper extends
AbstractPipelineLifecycleRunna
DataRecord dataRecord =
createDataRecord(PipelineSQLOperationType.DELETE, event, each.length);
for (int i = 0, length = each.length; i < length; i++) {
PipelineColumnMetaData columnMetaData =
tableMetaData.getColumnMetaData(i + 1);
- dataRecord.addColumn(new Column(columnMetaData.getName(),
handleValue(columnMetaData, each[i]), null, true,
columnMetaData.isUniqueKey()));
+ dataRecord.addColumn(new Column(columnMetaData.getName(),
MySQLBinlogDataTypeHandler.handle(columnMetaData, each[i]), null, true,
columnMetaData.isUniqueKey()));
}
result.add(dataRecord);
}
return result;
}
- private Serializable handleValue(final PipelineColumnMetaData
columnMetaData, final Serializable value) {
- if (value instanceof MySQLBinaryString) {
- return
PipelineJdbcUtils.isBinaryColumn(columnMetaData.getDataType())
- ? ((MySQLBinaryString) value).getBytes()
- : new String(((MySQLBinaryString) value).getBytes(),
Charset.defaultCharset());
- }
- Optional<MySQLBinlogNumberDataTypeHandler> dataTypeHandler =
TypedSPILoader.findService(MySQLBinlogNumberDataTypeHandler.class,
columnMetaData.getDataTypeName());
- return dataTypeHandler.isPresent() ?
dataTypeHandler.get().handle(value) : value;
- }
-
private DataRecord createDataRecord(final PipelineSQLOperationType type,
final AbstractRowsEvent rowsEvent, final int columnCount) {
String tableName =
dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).toString();
IngestPosition binlogPosition = new
MySQLBinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition());
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/type/MySQLBinlogDataTypeHandler.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/type/MySQLBinlogDataTypeHandler.java
new file mode 100644
index 00000000000..f320bf6ee92
--- /dev/null
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/type/MySQLBinlogDataTypeHandler.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.mysql.ingest.dumper.type;
+
+import lombok.NoArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
+import
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.value.string.MySQLBinaryString;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.Optional;
+
+/**
+ * MySQL binlog data type handler.
+ */
+@NoArgsConstructor(access = lombok.AccessLevel.PRIVATE)
+public final class MySQLBinlogDataTypeHandler {
+
+ /**
+ * Handle column value.
+ *
+ * @param columnMetaData column meta data
+ * @param value column value
+ * @return handled column value
+ */
+ public static Serializable handle(final PipelineColumnMetaData
columnMetaData, final Serializable value) {
+ if (value instanceof MySQLBinaryString) {
+ return
PipelineJdbcUtils.isBinaryColumn(columnMetaData.getDataType())
+ ? ((MySQLBinaryString) value).getBytes()
+ : new String(((MySQLBinaryString) value).getBytes(),
Charset.defaultCharset());
+ }
+ Optional<MySQLBinlogNumberDataTypeHandler> dataTypeHandler =
TypedSPILoader.findService(MySQLBinlogNumberDataTypeHandler.class,
columnMetaData.getDataTypeName());
+ return dataTypeHandler.isPresent() ?
dataTypeHandler.get().handle(value) : value;
+ }
+}