This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b54e7c45b72 Remove PipelineJdbcUtils and use DialectDataTypeOption
instead (#35309)
b54e7c45b72 is described below
commit b54e7c45b726e68193cda64c00743ab849b6b683
Author: Liang Zhang <[email protected]>
AuthorDate: Fri May 2 23:54:13 2025 +0800
Remove PipelineJdbcUtils and use DialectDataTypeOption instead (#35309)
- Remove integer, string, and binary column type checks from
PipelineJdbcUtils
- Add corresponding methods to DialectDataTypeOption interface
- Implement these methods in DefaultDataTypeOption and database-specific
classes
- Update InventoryDumperContextSplitter and MySQLBinlogBinaryStringHandler
to use new methods- Remove redundant tests from PipelineJdbcUtilsTest
---
.../option/datatype/DefaultDataTypeOption.java | 41 +++++++++++++++
.../option/datatype/DialectDataTypeOption.java | 25 ++++++++++
.../option/datatype/DefaultDataTypeOptionTest.java | 58 ++++++++++++++++++++++
.../database/option/MySQLDataTypeOption.java | 18 +++++++
.../database/option/OpenGaussDataTypeOption.java | 18 +++++++
.../database/option/OracleDataTypeOption.java | 18 +++++++
.../database/option/PostgreSQLDataTypeOption.java | 18 +++++++
.../splitter/InventoryDumperContextSplitter.java | 8 +--
.../pipeline/core/util/PipelineJdbcUtilsTest.java | 31 ------------
.../binary/MySQLBinlogBinaryStringHandler.java | 8 ++-
10 files changed, 207 insertions(+), 36 deletions(-)
diff --git
a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DefaultDataTypeOption.java
b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DefaultDataTypeOption.java
index 910485dabe2..b39152fcb47 100644
---
a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DefaultDataTypeOption.java
+++
b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DefaultDataTypeOption.java
@@ -17,6 +17,7 @@
package
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype;
+import java.sql.Types;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
@@ -35,4 +36,44 @@ public final class DefaultDataTypeOption implements
DialectDataTypeOption {
public Optional<Class<?>> findExtraSQLTypeClass(final int dataType, final
boolean unsigned) {
return Optional.empty();
}
+
+ @Override
+ public boolean isIntegerDataType(final int sqlType) {
+ switch (sqlType) {
+ case Types.INTEGER:
+ case Types.BIGINT:
+ case Types.SMALLINT:
+ case Types.TINYINT:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ @Override
+ public boolean isStringDataType(final int sqlType) {
+ switch (sqlType) {
+ case Types.CHAR:
+ case Types.VARCHAR:
+ case Types.LONGVARCHAR:
+ case Types.NCHAR:
+ case Types.NVARCHAR:
+ case Types.LONGNVARCHAR:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ @Override
+ public boolean isBinaryDataType(final int sqlType) {
+ switch (sqlType) {
+ case Types.BINARY:
+ case Types.VARBINARY:
+ case Types.LONGVARBINARY:
+ return true;
+ default:
+ return false;
+ }
+ }
}
diff --git
a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DialectDataTypeOption.java
b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DialectDataTypeOption.java
index 877f8906b96..a7eeff0b3fe 100644
---
a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DialectDataTypeOption.java
+++
b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DialectDataTypeOption.java
@@ -40,4 +40,29 @@ public interface DialectDataTypeOption {
* @return extra SQL type class
*/
Optional<Class<?>> findExtraSQLTypeClass(int dataType, boolean unsigned);
+
+ /**
+ * Whether data type is integer type.
+ *
+ * @param sqlType value of java.sql.Types
+ * @return is integer type or not
+ */
+ boolean isIntegerDataType(int sqlType);
+
+ /**
+ * Whether data type is string column.
+ *
+ * @param sqlType value of java.sql.Types
+ * @return is string type or not
+ */
+ boolean isStringDataType(int sqlType);
+
+ /**
+ * Whether data type is binary type.
+ * <p>it doesn't include BLOB etc.</p>
+ *
+ * @param sqlType value of java.sql.Types
+ * @return is binary type or not
+ */
+ boolean isBinaryDataType(int sqlType);
}
diff --git
a/infra/database/core/src/test/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DefaultDataTypeOptionTest.java
b/infra/database/core/src/test/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DefaultDataTypeOptionTest.java
new file mode 100644
index 00000000000..99395ebff7b
--- /dev/null
+++
b/infra/database/core/src/test/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DefaultDataTypeOptionTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype;
+
+import org.junit.jupiter.api.Test;
+
+import java.sql.Types;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class DefaultDataTypeOptionTest {
+
+ private final DialectDataTypeOption dataTypeOption = new
DefaultDataTypeOption();
+
+ @Test
+ void assertIsIntegerDataType() {
+ assertTrue(dataTypeOption.isIntegerDataType(Types.INTEGER));
+ assertTrue(dataTypeOption.isIntegerDataType(Types.BIGINT));
+ assertTrue(dataTypeOption.isIntegerDataType(Types.SMALLINT));
+ assertTrue(dataTypeOption.isIntegerDataType(Types.TINYINT));
+ assertFalse(dataTypeOption.isIntegerDataType(Types.VARCHAR));
+ }
+
+ @Test
+ void assertIsStringDataType() {
+ assertTrue(dataTypeOption.isStringDataType(Types.CHAR));
+ assertTrue(dataTypeOption.isStringDataType(Types.VARCHAR));
+ assertTrue(dataTypeOption.isStringDataType(Types.LONGVARCHAR));
+ assertTrue(dataTypeOption.isStringDataType(Types.NCHAR));
+ assertTrue(dataTypeOption.isStringDataType(Types.NVARCHAR));
+ assertTrue(dataTypeOption.isStringDataType(Types.LONGNVARCHAR));
+ assertFalse(dataTypeOption.isStringDataType(Types.INTEGER));
+ }
+
+ @Test
+ void assertIsBinaryDataType() {
+ assertTrue(dataTypeOption.isBinaryDataType(Types.BINARY));
+ assertTrue(dataTypeOption.isBinaryDataType(Types.VARBINARY));
+ assertTrue(dataTypeOption.isBinaryDataType(Types.LONGVARBINARY));
+ assertFalse(dataTypeOption.isBinaryDataType(Types.VARCHAR));
+ }
+}
diff --git
a/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/metadata/database/option/MySQLDataTypeOption.java
b/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/metadata/database/option/MySQLDataTypeOption.java
index 71e7480c885..fe3b069b93e 100644
---
a/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/metadata/database/option/MySQLDataTypeOption.java
+++
b/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/metadata/database/option/MySQLDataTypeOption.java
@@ -17,6 +17,7 @@
package
org.apache.shardingsphere.infra.database.mysql.metadata.database.option;
+import
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype.DefaultDataTypeOption;
import
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype.DialectDataTypeOption;
import java.math.BigInteger;
@@ -30,6 +31,8 @@ import java.util.Optional;
*/
public final class MySQLDataTypeOption implements DialectDataTypeOption {
+ private final DialectDataTypeOption dataTypeOption = new
DefaultDataTypeOption();
+
@Override
public Map<String, Integer> getExtraDataTypes() {
Map<String, Integer> result = new HashMap<>(10, 1F);
@@ -59,4 +62,19 @@ public final class MySQLDataTypeOption implements
DialectDataTypeOption {
}
return Optional.empty();
}
+
+ @Override
+ public boolean isIntegerDataType(final int sqlType) {
+ return dataTypeOption.isIntegerDataType(sqlType);
+ }
+
+ @Override
+ public boolean isStringDataType(final int sqlType) {
+ return dataTypeOption.isStringDataType(sqlType);
+ }
+
+ @Override
+ public boolean isBinaryDataType(final int sqlType) {
+ return dataTypeOption.isBinaryDataType(sqlType);
+ }
}
diff --git
a/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/option/OpenGaussDataTypeOption.java
b/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/option/OpenGaussDataTypeOption.java
index 19a843bc5ad..63d1c17ef24 100644
---
a/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/option/OpenGaussDataTypeOption.java
+++
b/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/option/OpenGaussDataTypeOption.java
@@ -18,6 +18,7 @@
package
org.apache.shardingsphere.infra.database.opengauss.metadata.database.option;
import com.cedarsoftware.util.CaseInsensitiveMap;
+import
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype.DefaultDataTypeOption;
import
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype.DialectDataTypeOption;
import java.sql.Types;
@@ -29,6 +30,8 @@ import java.util.Optional;
*/
public final class OpenGaussDataTypeOption implements DialectDataTypeOption {
+ private final DialectDataTypeOption dataTypeOption = new
DefaultDataTypeOption();
+
@Override
public Map<String, Integer> getExtraDataTypes() {
Map<String, Integer> result = new CaseInsensitiveMap<>();
@@ -48,4 +51,19 @@ public final class OpenGaussDataTypeOption implements
DialectDataTypeOption {
public Optional<Class<?>> findExtraSQLTypeClass(final int dataType, final
boolean unsigned) {
return Optional.empty();
}
+
+ @Override
+ public boolean isIntegerDataType(final int sqlType) {
+ return dataTypeOption.isIntegerDataType(sqlType);
+ }
+
+ @Override
+ public boolean isStringDataType(final int sqlType) {
+ return dataTypeOption.isStringDataType(sqlType);
+ }
+
+ @Override
+ public boolean isBinaryDataType(final int sqlType) {
+ return dataTypeOption.isBinaryDataType(sqlType);
+ }
}
diff --git
a/infra/database/type/oracle/src/main/java/org/apache/shardingsphere/infra/database/oracle/metadata/database/option/OracleDataTypeOption.java
b/infra/database/type/oracle/src/main/java/org/apache/shardingsphere/infra/database/oracle/metadata/database/option/OracleDataTypeOption.java
index b6c8d11c0ed..b2cca7d3f6d 100644
---
a/infra/database/type/oracle/src/main/java/org/apache/shardingsphere/infra/database/oracle/metadata/database/option/OracleDataTypeOption.java
+++
b/infra/database/type/oracle/src/main/java/org/apache/shardingsphere/infra/database/oracle/metadata/database/option/OracleDataTypeOption.java
@@ -17,6 +17,7 @@
package
org.apache.shardingsphere.infra.database.oracle.metadata.database.option;
+import
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype.DefaultDataTypeOption;
import
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype.DialectDataTypeOption;
import java.sql.Types;
@@ -29,6 +30,8 @@ import java.util.Optional;
*/
public final class OracleDataTypeOption implements DialectDataTypeOption {
+ private final DialectDataTypeOption dataTypeOption = new
DefaultDataTypeOption();
+
@Override
public Map<String, Integer> getExtraDataTypes() {
Map<String, Integer> result = new HashMap<>(8);
@@ -50,4 +53,19 @@ public final class OracleDataTypeOption implements
DialectDataTypeOption {
public Optional<Class<?>> findExtraSQLTypeClass(final int dataType, final
boolean unsigned) {
return Optional.empty();
}
+
+ @Override
+ public boolean isIntegerDataType(final int sqlType) {
+ return dataTypeOption.isIntegerDataType(sqlType);
+ }
+
+ @Override
+ public boolean isStringDataType(final int sqlType) {
+ return dataTypeOption.isStringDataType(sqlType);
+ }
+
+ @Override
+ public boolean isBinaryDataType(final int sqlType) {
+ return dataTypeOption.isBinaryDataType(sqlType);
+ }
}
diff --git
a/infra/database/type/postgresql/src/main/java/org/apache/shardingsphere/infra/database/postgresql/metadata/database/option/PostgreSQLDataTypeOption.java
b/infra/database/type/postgresql/src/main/java/org/apache/shardingsphere/infra/database/postgresql/metadata/database/option/PostgreSQLDataTypeOption.java
index 78bb335e553..95c6657c988 100644
---
a/infra/database/type/postgresql/src/main/java/org/apache/shardingsphere/infra/database/postgresql/metadata/database/option/PostgreSQLDataTypeOption.java
+++
b/infra/database/type/postgresql/src/main/java/org/apache/shardingsphere/infra/database/postgresql/metadata/database/option/PostgreSQLDataTypeOption.java
@@ -18,6 +18,7 @@
package
org.apache.shardingsphere.infra.database.postgresql.metadata.database.option;
import com.cedarsoftware.util.CaseInsensitiveMap;
+import
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype.DefaultDataTypeOption;
import
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype.DialectDataTypeOption;
import java.sql.Types;
@@ -29,6 +30,8 @@ import java.util.Optional;
*/
public final class PostgreSQLDataTypeOption implements DialectDataTypeOption {
+ private final DialectDataTypeOption dataTypeOption = new
DefaultDataTypeOption();
+
@Override
public Map<String, Integer> getExtraDataTypes() {
Map<String, Integer> result = new CaseInsensitiveMap<>();
@@ -51,4 +54,19 @@ public final class PostgreSQLDataTypeOption implements
DialectDataTypeOption {
}
return Optional.empty();
}
+
+ @Override
+ public boolean isIntegerDataType(final int sqlType) {
+ return dataTypeOption.isIntegerDataType(sqlType);
+ }
+
+ @Override
+ public boolean isStringDataType(final int sqlType) {
+ return dataTypeOption.isStringDataType(sqlType);
+ }
+
+ @Override
+ public boolean isBinaryDataType(final int sqlType) {
+ return dataTypeOption.isBinaryDataType(sqlType);
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index 5fa5458177e..3ff25129c73 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -35,7 +35,8 @@ import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculato
import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.InventoryRecordsCountCalculator;
import
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
+import
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype.DialectDataTypeOption;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
import java.sql.Connection;
@@ -119,13 +120,14 @@ public final class InventoryDumperContextSplitter {
}
List<PipelineColumnMetaData> uniqueKeyColumns =
dumperContext.getUniqueKeyColumns();
if (1 == uniqueKeyColumns.size()) {
+ DialectDataTypeOption dataTypeOption = new
DatabaseTypeRegistry(sourceDataSource.getDatabaseType()).getDialectDatabaseMetaData().getDataTypeOption();
int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
- if (PipelineJdbcUtils.isIntegerColumn(firstColumnDataType)) {
+ if (dataTypeOption.isIntegerDataType(firstColumnDataType)) {
Range<Long> uniqueKeyValuesRange =
getUniqueKeyValuesRange(jobItemContext, dumperContext);
int shardingSize =
jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize();
return
InventoryPositionCalculator.getPositionByIntegerUniqueKeyRange(tableRecordsCount,
uniqueKeyValuesRange, shardingSize);
}
- if (PipelineJdbcUtils.isStringColumn(firstColumnDataType)) {
+ if (dataTypeOption.isStringDataType(firstColumnDataType)) {
return Collections.singleton(new
StringPrimaryKeyIngestPosition(null, null));
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtilsTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtilsTest.java
index 86c8fd58873..9e14a38b169 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtilsTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtilsTest.java
@@ -21,10 +21,7 @@ import org.junit.jupiter.api.Test;
import java.sql.SQLException;
import java.sql.Statement;
-import java.sql.Types;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -32,34 +29,6 @@ import static org.mockito.Mockito.when;
class PipelineJdbcUtilsTest {
- @Test
- void assertIsIntegerColumn() {
- assertTrue(PipelineJdbcUtils.isIntegerColumn(Types.INTEGER));
- assertTrue(PipelineJdbcUtils.isIntegerColumn(Types.BIGINT));
- assertTrue(PipelineJdbcUtils.isIntegerColumn(Types.SMALLINT));
- assertTrue(PipelineJdbcUtils.isIntegerColumn(Types.TINYINT));
- assertFalse(PipelineJdbcUtils.isIntegerColumn(Types.VARCHAR));
- }
-
- @Test
- void assertIsStringColumn() {
- assertTrue(PipelineJdbcUtils.isStringColumn(Types.CHAR));
- assertTrue(PipelineJdbcUtils.isStringColumn(Types.VARCHAR));
- assertTrue(PipelineJdbcUtils.isStringColumn(Types.LONGVARCHAR));
- assertTrue(PipelineJdbcUtils.isStringColumn(Types.NCHAR));
- assertTrue(PipelineJdbcUtils.isStringColumn(Types.NVARCHAR));
- assertTrue(PipelineJdbcUtils.isStringColumn(Types.LONGNVARCHAR));
- assertFalse(PipelineJdbcUtils.isStringColumn(Types.INTEGER));
- }
-
- @Test
- void assertIsBinaryColumn() {
- assertTrue(PipelineJdbcUtils.isBinaryColumn(Types.BINARY));
- assertTrue(PipelineJdbcUtils.isBinaryColumn(Types.VARBINARY));
- assertTrue(PipelineJdbcUtils.isBinaryColumn(Types.LONGVARBINARY));
- assertFalse(PipelineJdbcUtils.isBinaryColumn(Types.VARCHAR));
- }
-
@Test
void assertCancelStatementWhenIsClosed() throws SQLException {
Statement statement = mock(Statement.class);
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/data/binary/MySQLBinlogBinaryStringHandler.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/data/binary/MySQLBinlogBinaryStringHandler.java
index b1e1893ba54..155586c45d8 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/data/binary/MySQLBinlogBinaryStringHandler.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/data/binary/MySQLBinlogBinaryStringHandler.java
@@ -19,8 +19,10 @@ package
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.
import lombok.NoArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.value.string.MySQLBinaryString;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import java.io.Serializable;
import java.nio.charset.Charset;
@@ -39,6 +41,8 @@ public final class MySQLBinlogBinaryStringHandler {
* @return handled value
*/
public static Serializable handle(final PipelineColumnMetaData
columnMetaData, final MySQLBinaryString value) {
- return PipelineJdbcUtils.isBinaryColumn(columnMetaData.getDataType())
? value.getBytes() : new String(value.getBytes(), Charset.defaultCharset());
+ return new
DatabaseTypeRegistry(TypedSPILoader.getService(DatabaseType.class,
"MySQL")).getDialectDatabaseMetaData().getDataTypeOption().isBinaryDataType(columnMetaData.getDataType())
+ ? value.getBytes()
+ : new String(value.getBytes(), Charset.defaultCharset());
}
}