This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b54e7c45b72 Remove PipelineJdbcUtils and use DialectDataTypeOption 
instead (#35309)
b54e7c45b72 is described below

commit b54e7c45b726e68193cda64c00743ab849b6b683
Author: Liang Zhang <[email protected]>
AuthorDate: Fri May 2 23:54:13 2025 +0800

    Remove PipelineJdbcUtils and use DialectDataTypeOption instead (#35309)
    
    - Remove integer, string, and binary column type checks from 
PipelineJdbcUtils
    - Add corresponding methods to DialectDataTypeOption interface
    - Implement these methods in DefaultDataTypeOption and database-specific 
classes
    - Update InventoryDumperContextSplitter and MySQLBinlogBinaryStringHandler 
to use new methods
    - Remove redundant tests from PipelineJdbcUtilsTest
---
 .../option/datatype/DefaultDataTypeOption.java     | 41 +++++++++++++++
 .../option/datatype/DialectDataTypeOption.java     | 25 ++++++++++
 .../option/datatype/DefaultDataTypeOptionTest.java | 58 ++++++++++++++++++++++
 .../database/option/MySQLDataTypeOption.java       | 18 +++++++
 .../database/option/OpenGaussDataTypeOption.java   | 18 +++++++
 .../database/option/OracleDataTypeOption.java      | 18 +++++++
 .../database/option/PostgreSQLDataTypeOption.java  | 18 +++++++
 .../splitter/InventoryDumperContextSplitter.java   |  8 +--
 .../pipeline/core/util/PipelineJdbcUtilsTest.java  | 31 ------------
 .../binary/MySQLBinlogBinaryStringHandler.java     |  8 ++-
 10 files changed, 207 insertions(+), 36 deletions(-)

diff --git 
a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DefaultDataTypeOption.java
 
b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DefaultDataTypeOption.java
index 910485dabe2..b39152fcb47 100644
--- 
a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DefaultDataTypeOption.java
+++ 
b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DefaultDataTypeOption.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype;
 
+import java.sql.Types;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
@@ -35,4 +36,44 @@ public final class DefaultDataTypeOption implements 
DialectDataTypeOption {
     public Optional<Class<?>> findExtraSQLTypeClass(final int dataType, final 
boolean unsigned) {
         return Optional.empty();
     }
+    
+    @Override
+    public boolean isIntegerDataType(final int sqlType) {
+        switch (sqlType) {
+            case Types.INTEGER:
+            case Types.BIGINT:
+            case Types.SMALLINT:
+            case Types.TINYINT:
+                return true;
+            default:
+                return false;
+        }
+    }
+    
+    @Override
+    public boolean isStringDataType(final int sqlType) {
+        switch (sqlType) {
+            case Types.CHAR:
+            case Types.VARCHAR:
+            case Types.LONGVARCHAR:
+            case Types.NCHAR:
+            case Types.NVARCHAR:
+            case Types.LONGNVARCHAR:
+                return true;
+            default:
+                return false;
+        }
+    }
+    
+    @Override
+    public boolean isBinaryDataType(final int sqlType) {
+        switch (sqlType) {
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY:
+                return true;
+            default:
+                return false;
+        }
+    }
 }
diff --git 
a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DialectDataTypeOption.java
 
b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DialectDataTypeOption.java
index 877f8906b96..a7eeff0b3fe 100644
--- 
a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DialectDataTypeOption.java
+++ 
b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DialectDataTypeOption.java
@@ -40,4 +40,29 @@ public interface DialectDataTypeOption {
      * @return extra SQL type class
      */
     Optional<Class<?>> findExtraSQLTypeClass(int dataType, boolean unsigned);
+    
+    /**
+     * Whether data type is integer type.
+     *
+     * @param sqlType value of java.sql.Types
+     * @return is integer type or not
+     */
+    boolean isIntegerDataType(int sqlType);
+    
+    /**
+     * Whether data type is string type.
+     *
+     * @param sqlType value of java.sql.Types
+     * @return is string type or not
+     */
+    boolean isStringDataType(int sqlType);
+    
+    /**
+     * Whether data type is binary type.
+     * <p>it doesn't include BLOB etc.</p>
+     *
+     * @param sqlType value of java.sql.Types
+     * @return is binary type or not
+     */
+    boolean isBinaryDataType(int sqlType);
 }
diff --git 
a/infra/database/core/src/test/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DefaultDataTypeOptionTest.java
 
b/infra/database/core/src/test/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DefaultDataTypeOptionTest.java
new file mode 100644
index 00000000000..99395ebff7b
--- /dev/null
+++ 
b/infra/database/core/src/test/java/org/apache/shardingsphere/infra/database/core/metadata/database/metadata/option/datatype/DefaultDataTypeOptionTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype;
+
+import org.junit.jupiter.api.Test;
+
+import java.sql.Types;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class DefaultDataTypeOptionTest {
+    
+    private final DialectDataTypeOption dataTypeOption = new 
DefaultDataTypeOption();
+    
+    @Test
+    void assertIsIntegerDataType() {
+        assertTrue(dataTypeOption.isIntegerDataType(Types.INTEGER));
+        assertTrue(dataTypeOption.isIntegerDataType(Types.BIGINT));
+        assertTrue(dataTypeOption.isIntegerDataType(Types.SMALLINT));
+        assertTrue(dataTypeOption.isIntegerDataType(Types.TINYINT));
+        assertFalse(dataTypeOption.isIntegerDataType(Types.VARCHAR));
+    }
+    
+    @Test
+    void assertIsStringDataType() {
+        assertTrue(dataTypeOption.isStringDataType(Types.CHAR));
+        assertTrue(dataTypeOption.isStringDataType(Types.VARCHAR));
+        assertTrue(dataTypeOption.isStringDataType(Types.LONGVARCHAR));
+        assertTrue(dataTypeOption.isStringDataType(Types.NCHAR));
+        assertTrue(dataTypeOption.isStringDataType(Types.NVARCHAR));
+        assertTrue(dataTypeOption.isStringDataType(Types.LONGNVARCHAR));
+        assertFalse(dataTypeOption.isStringDataType(Types.INTEGER));
+    }
+    
+    @Test
+    void assertIsBinaryDataType() {
+        assertTrue(dataTypeOption.isBinaryDataType(Types.BINARY));
+        assertTrue(dataTypeOption.isBinaryDataType(Types.VARBINARY));
+        assertTrue(dataTypeOption.isBinaryDataType(Types.LONGVARBINARY));
+        assertFalse(dataTypeOption.isBinaryDataType(Types.VARCHAR));
+    }
+}
diff --git 
a/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/metadata/database/option/MySQLDataTypeOption.java
 
b/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/metadata/database/option/MySQLDataTypeOption.java
index 71e7480c885..fe3b069b93e 100644
--- 
a/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/metadata/database/option/MySQLDataTypeOption.java
+++ 
b/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/metadata/database/option/MySQLDataTypeOption.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.shardingsphere.infra.database.mysql.metadata.database.option;
 
+import 
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype.DefaultDataTypeOption;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype.DialectDataTypeOption;
 
 import java.math.BigInteger;
@@ -30,6 +31,8 @@ import java.util.Optional;
  */
 public final class MySQLDataTypeOption implements DialectDataTypeOption {
     
+    private final DialectDataTypeOption dataTypeOption = new 
DefaultDataTypeOption();
+    
     @Override
     public Map<String, Integer> getExtraDataTypes() {
         Map<String, Integer> result = new HashMap<>(10, 1F);
@@ -59,4 +62,19 @@ public final class MySQLDataTypeOption implements 
DialectDataTypeOption {
         }
         return Optional.empty();
     }
+    
+    @Override
+    public boolean isIntegerDataType(final int sqlType) {
+        return dataTypeOption.isIntegerDataType(sqlType);
+    }
+    
+    @Override
+    public boolean isStringDataType(final int sqlType) {
+        return dataTypeOption.isStringDataType(sqlType);
+    }
+    
+    @Override
+    public boolean isBinaryDataType(final int sqlType) {
+        return dataTypeOption.isBinaryDataType(sqlType);
+    }
 }
diff --git 
a/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/option/OpenGaussDataTypeOption.java
 
b/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/option/OpenGaussDataTypeOption.java
index 19a843bc5ad..63d1c17ef24 100644
--- 
a/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/option/OpenGaussDataTypeOption.java
+++ 
b/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/option/OpenGaussDataTypeOption.java
@@ -18,6 +18,7 @@
 package 
org.apache.shardingsphere.infra.database.opengauss.metadata.database.option;
 
 import com.cedarsoftware.util.CaseInsensitiveMap;
+import 
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype.DefaultDataTypeOption;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype.DialectDataTypeOption;
 
 import java.sql.Types;
@@ -29,6 +30,8 @@ import java.util.Optional;
  */
 public final class OpenGaussDataTypeOption implements DialectDataTypeOption {
     
+    private final DialectDataTypeOption dataTypeOption = new 
DefaultDataTypeOption();
+    
     @Override
     public Map<String, Integer> getExtraDataTypes() {
         Map<String, Integer> result = new CaseInsensitiveMap<>();
@@ -48,4 +51,19 @@ public final class OpenGaussDataTypeOption implements 
DialectDataTypeOption {
     public Optional<Class<?>> findExtraSQLTypeClass(final int dataType, final 
boolean unsigned) {
         return Optional.empty();
     }
+    
+    @Override
+    public boolean isIntegerDataType(final int sqlType) {
+        return dataTypeOption.isIntegerDataType(sqlType);
+    }
+    
+    @Override
+    public boolean isStringDataType(final int sqlType) {
+        return dataTypeOption.isStringDataType(sqlType);
+    }
+    
+    @Override
+    public boolean isBinaryDataType(final int sqlType) {
+        return dataTypeOption.isBinaryDataType(sqlType);
+    }
 }
diff --git 
a/infra/database/type/oracle/src/main/java/org/apache/shardingsphere/infra/database/oracle/metadata/database/option/OracleDataTypeOption.java
 
b/infra/database/type/oracle/src/main/java/org/apache/shardingsphere/infra/database/oracle/metadata/database/option/OracleDataTypeOption.java
index b6c8d11c0ed..b2cca7d3f6d 100644
--- 
a/infra/database/type/oracle/src/main/java/org/apache/shardingsphere/infra/database/oracle/metadata/database/option/OracleDataTypeOption.java
+++ 
b/infra/database/type/oracle/src/main/java/org/apache/shardingsphere/infra/database/oracle/metadata/database/option/OracleDataTypeOption.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.shardingsphere.infra.database.oracle.metadata.database.option;
 
+import 
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype.DefaultDataTypeOption;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype.DialectDataTypeOption;
 
 import java.sql.Types;
@@ -29,6 +30,8 @@ import java.util.Optional;
  */
 public final class OracleDataTypeOption implements DialectDataTypeOption {
     
+    private final DialectDataTypeOption dataTypeOption = new 
DefaultDataTypeOption();
+    
     @Override
     public Map<String, Integer> getExtraDataTypes() {
         Map<String, Integer> result = new HashMap<>(8);
@@ -50,4 +53,19 @@ public final class OracleDataTypeOption implements 
DialectDataTypeOption {
     public Optional<Class<?>> findExtraSQLTypeClass(final int dataType, final 
boolean unsigned) {
         return Optional.empty();
     }
+    
+    @Override
+    public boolean isIntegerDataType(final int sqlType) {
+        return dataTypeOption.isIntegerDataType(sqlType);
+    }
+    
+    @Override
+    public boolean isStringDataType(final int sqlType) {
+        return dataTypeOption.isStringDataType(sqlType);
+    }
+    
+    @Override
+    public boolean isBinaryDataType(final int sqlType) {
+        return dataTypeOption.isBinaryDataType(sqlType);
+    }
 }
diff --git 
a/infra/database/type/postgresql/src/main/java/org/apache/shardingsphere/infra/database/postgresql/metadata/database/option/PostgreSQLDataTypeOption.java
 
b/infra/database/type/postgresql/src/main/java/org/apache/shardingsphere/infra/database/postgresql/metadata/database/option/PostgreSQLDataTypeOption.java
index 78bb335e553..95c6657c988 100644
--- 
a/infra/database/type/postgresql/src/main/java/org/apache/shardingsphere/infra/database/postgresql/metadata/database/option/PostgreSQLDataTypeOption.java
+++ 
b/infra/database/type/postgresql/src/main/java/org/apache/shardingsphere/infra/database/postgresql/metadata/database/option/PostgreSQLDataTypeOption.java
@@ -18,6 +18,7 @@
 package 
org.apache.shardingsphere.infra.database.postgresql.metadata.database.option;
 
 import com.cedarsoftware.util.CaseInsensitiveMap;
+import 
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype.DefaultDataTypeOption;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype.DialectDataTypeOption;
 
 import java.sql.Types;
@@ -29,6 +30,8 @@ import java.util.Optional;
  */
 public final class PostgreSQLDataTypeOption implements DialectDataTypeOption {
     
+    private final DialectDataTypeOption dataTypeOption = new 
DefaultDataTypeOption();
+    
     @Override
     public Map<String, Integer> getExtraDataTypes() {
         Map<String, Integer> result = new CaseInsensitiveMap<>();
@@ -51,4 +54,19 @@ public final class PostgreSQLDataTypeOption implements 
DialectDataTypeOption {
         }
         return Optional.empty();
     }
+    
+    @Override
+    public boolean isIntegerDataType(final int sqlType) {
+        return dataTypeOption.isIntegerDataType(sqlType);
+    }
+    
+    @Override
+    public boolean isStringDataType(final int sqlType) {
+        return dataTypeOption.isStringDataType(sqlType);
+    }
+    
+    @Override
+    public boolean isBinaryDataType(final int sqlType) {
+        return dataTypeOption.isBinaryDataType(sqlType);
+    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index 5fa5458177e..3ff25129c73 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -35,7 +35,8 @@ import 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculato
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.InventoryRecordsCountCalculator;
 import 
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
+import 
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.option.datatype.DialectDataTypeOption;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
 import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 
 import java.sql.Connection;
@@ -119,13 +120,14 @@ public final class InventoryDumperContextSplitter {
         }
         List<PipelineColumnMetaData> uniqueKeyColumns = 
dumperContext.getUniqueKeyColumns();
         if (1 == uniqueKeyColumns.size()) {
+            DialectDataTypeOption dataTypeOption = new 
DatabaseTypeRegistry(sourceDataSource.getDatabaseType()).getDialectDatabaseMetaData().getDataTypeOption();
             int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
-            if (PipelineJdbcUtils.isIntegerColumn(firstColumnDataType)) {
+            if (dataTypeOption.isIntegerDataType(firstColumnDataType)) {
                 Range<Long> uniqueKeyValuesRange = 
getUniqueKeyValuesRange(jobItemContext, dumperContext);
                 int shardingSize = 
jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize();
                 return 
InventoryPositionCalculator.getPositionByIntegerUniqueKeyRange(tableRecordsCount,
 uniqueKeyValuesRange, shardingSize);
             }
-            if (PipelineJdbcUtils.isStringColumn(firstColumnDataType)) {
+            if (dataTypeOption.isStringDataType(firstColumnDataType)) {
                 return Collections.singleton(new 
StringPrimaryKeyIngestPosition(null, null));
             }
         }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtilsTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtilsTest.java
index 86c8fd58873..9e14a38b169 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtilsTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtilsTest.java
@@ -21,10 +21,7 @@ import org.junit.jupiter.api.Test;
 
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.sql.Types;
 
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -32,34 +29,6 @@ import static org.mockito.Mockito.when;
 
 class PipelineJdbcUtilsTest {
     
-    @Test
-    void assertIsIntegerColumn() {
-        assertTrue(PipelineJdbcUtils.isIntegerColumn(Types.INTEGER));
-        assertTrue(PipelineJdbcUtils.isIntegerColumn(Types.BIGINT));
-        assertTrue(PipelineJdbcUtils.isIntegerColumn(Types.SMALLINT));
-        assertTrue(PipelineJdbcUtils.isIntegerColumn(Types.TINYINT));
-        assertFalse(PipelineJdbcUtils.isIntegerColumn(Types.VARCHAR));
-    }
-    
-    @Test
-    void assertIsStringColumn() {
-        assertTrue(PipelineJdbcUtils.isStringColumn(Types.CHAR));
-        assertTrue(PipelineJdbcUtils.isStringColumn(Types.VARCHAR));
-        assertTrue(PipelineJdbcUtils.isStringColumn(Types.LONGVARCHAR));
-        assertTrue(PipelineJdbcUtils.isStringColumn(Types.NCHAR));
-        assertTrue(PipelineJdbcUtils.isStringColumn(Types.NVARCHAR));
-        assertTrue(PipelineJdbcUtils.isStringColumn(Types.LONGNVARCHAR));
-        assertFalse(PipelineJdbcUtils.isStringColumn(Types.INTEGER));
-    }
-    
-    @Test
-    void assertIsBinaryColumn() {
-        assertTrue(PipelineJdbcUtils.isBinaryColumn(Types.BINARY));
-        assertTrue(PipelineJdbcUtils.isBinaryColumn(Types.VARBINARY));
-        assertTrue(PipelineJdbcUtils.isBinaryColumn(Types.LONGVARBINARY));
-        assertFalse(PipelineJdbcUtils.isBinaryColumn(Types.VARCHAR));
-    }
-    
     @Test
     void assertCancelStatementWhenIsClosed() throws SQLException {
         Statement statement = mock(Statement.class);
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/data/binary/MySQLBinlogBinaryStringHandler.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/data/binary/MySQLBinlogBinaryStringHandler.java
index b1e1893ba54..155586c45d8 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/data/binary/MySQLBinlogBinaryStringHandler.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/data/binary/MySQLBinlogBinaryStringHandler.java
@@ -19,8 +19,10 @@ package 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.
 
 import lombok.NoArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.value.string.MySQLBinaryString;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
 import java.io.Serializable;
 import java.nio.charset.Charset;
@@ -39,6 +41,8 @@ public final class MySQLBinlogBinaryStringHandler {
      * @return handled value
      */
     public static Serializable handle(final PipelineColumnMetaData 
columnMetaData, final MySQLBinaryString value) {
-        return PipelineJdbcUtils.isBinaryColumn(columnMetaData.getDataType()) 
? value.getBytes() : new String(value.getBytes(), Charset.defaultCharset());
+        return new 
DatabaseTypeRegistry(TypedSPILoader.getService(DatabaseType.class, 
"MySQL")).getDialectDatabaseMetaData().getDataTypeOption().isBinaryDataType(columnMetaData.getDataType())
+                ? value.getBytes()
+                : new String(value.getBytes(), Charset.defaultCharset());
     }
 }

Reply via email to