This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7838f2b7857 Refactor ColumnValueReaderEngine (#26930)
7838f2b7857 is described below

commit 7838f2b7857b3b9c2a4fdec0ab3c85f0a999c986
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Jul 13 00:19:19 2023 +0800

    Refactor ColumnValueReaderEngine (#26930)
    
    * Refactor ColumnValueReader
    
    * Rename DialectColumnValueReader
    
    * Refactor ColumnValueReaderEngine
    
    * Fix sonar issue
---
 .../infra/metadata/nodepath/RuleNodePathTest.java  |  2 +-
 ...ueReader.java => DialectColumnValueReader.java} | 11 ++---
 ...DataMatchDataConsistencyCalculateAlgorithm.java | 10 ++---
 ...lueReader.java => ColumnValueReaderEngine.java} | 40 ++++++++++++++----
 .../data/pipeline/core/dumper/InventoryDumper.java |  9 ++--
 .../mysql/ingest/MySQLColumnValueReader.java       | 49 ----------------------
 ...ta.pipeline.spi.ingest.dumper.ColumnValueReader | 18 --------
 .../ingest/wal/OpenGaussColumnValueReader.java     | 30 +++++++------
 ...ine.spi.ingest.dumper.DialectColumnValueReader} |  0
 .../ingest/PostgreSQLColumnValueReader.java        | 35 +++++++++-------
 ...ine.spi.ingest.dumper.DialectColumnValueReader} |  0
 .../core/fixture/FixtureColumnValueReader.java     | 13 +++---
 ...ine.spi.ingest.dumper.DialectColumnValueReader} |  0
 13 files changed, 86 insertions(+), 131 deletions(-)

diff --git a/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/nodepath/RuleNodePathTest.java b/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/nodepath/RuleNodePathTest.java
index 4165f686c50..6017f378f5f 100644
--- a/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/nodepath/RuleNodePathTest.java
+++ b/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/nodepath/RuleNodePathTest.java
@@ -36,7 +36,7 @@ class RuleNodePathTest {
     private RuleNodePath ruleNodePath;
     
     @BeforeEach
-    public void setup() {
+    void setup() {
         List<String> namedRuleItemNodePathTypes = 
Collections.singletonList("tables");
         List<String> uniqueRuleItemNodePathTypes = Arrays.asList("tables", 
"tables.type");
         ruleNodePath = new RuleNodePath("foo", namedRuleItemNodePathTypes, 
uniqueRuleItemNodePathTypes);
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/ColumnValueReader.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/DialectColumnValueReader.java
similarity index 80%
rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/ColumnValueReader.java
rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/DialectColumnValueReader.java
index 0fa56e5fdc7..bf867e19053 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/ColumnValueReader.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/DialectColumnValueReader.java
@@ -23,21 +23,22 @@ import 
org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.util.Optional;
 
 /**
- * Column value reader.
+ * Dialect column value reader.
  */
 @SingletonSPI
-public interface ColumnValueReader extends DatabaseTypedSPI {
+public interface DialectColumnValueReader extends DatabaseTypedSPI {
     
     /**
-     * Read column value.
+     * Read dialect column value.
      *
      * @param resultSet result set
      * @param resultSetMetaData result set meta data
      * @param columnIndex column index
      * @return column value
-     * @throws SQLException from database
+     * @throws SQLException SQL exception
      */
-    Object readValue(ResultSet resultSet, ResultSetMetaData resultSetMetaData, 
int columnIndex) throws SQLException;
+    Optional<Object> read(ResultSet resultSet, ResultSetMetaData 
resultSetMetaData, int columnIndex) throws SQLException;
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index b6543986cf3..f80ae4e6907 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -20,18 +20,18 @@ package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.algorithm;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 import 
org.apache.shardingsphere.data.pipeline.common.util.JDBCStreamQueryUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCalculateParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.DataConsistencyCalculatedResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.DataMatchCalculatedResult;
+import 
org.apache.shardingsphere.data.pipeline.core.dumper.ColumnValueReaderEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.util.spi.annotation.SPIDescription;
@@ -88,7 +88,7 @@ public final class DataMatchDataConsistencyCalculateAlgorithm 
extends AbstractSt
             Collection<Collection<Object>> records = new LinkedList<>();
             Object maxUniqueKeyValue = null;
             DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, param.getDatabaseType());
-            ColumnValueReader columnValueReader = 
DatabaseTypedSPILoader.getService(ColumnValueReader.class, databaseType);
+            ColumnValueReaderEngine columnValueReaderEngine = new 
ColumnValueReaderEngine(databaseType);
             ResultSet resultSet = calculationContext.getResultSet();
             while (resultSet.next()) {
                 ShardingSpherePreconditions.checkState(!isCanceling(), () -> 
new 
PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(), 
param.getLogicTableName()));
@@ -96,10 +96,10 @@ public final class 
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
                 int columnCount = resultSetMetaData.getColumnCount();
                 Collection<Object> columnRecord = new LinkedList<>();
                 for (int columnIndex = 1; columnIndex <= columnCount; 
columnIndex++) {
-                    columnRecord.add(columnValueReader.readValue(resultSet, 
resultSetMetaData, columnIndex));
+                    columnRecord.add(columnValueReaderEngine.read(resultSet, 
resultSetMetaData, columnIndex));
                 }
                 records.add(columnRecord);
-                maxUniqueKeyValue = columnValueReader.readValue(resultSet, 
resultSetMetaData, param.getUniqueKey().getOrdinalPosition());
+                maxUniqueKeyValue = columnValueReaderEngine.read(resultSet, 
resultSetMetaData, param.getUniqueKey().getOrdinalPosition());
                 if (records.size() == chunkSize) {
                     break;
                 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/AbstractColumnValueReader.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/ColumnValueReaderEngine.java
similarity index 68%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/AbstractColumnValueReader.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/ColumnValueReaderEngine.java
index adbfcc84b00..af08c516079 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/AbstractColumnValueReader.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/ColumnValueReaderEngine.java
@@ -17,27 +17,49 @@
 
 package org.apache.shardingsphere.data.pipeline.core.dumper;
 
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.DialectColumnValueReader;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Types;
+import java.util.Optional;
 
 /**
- * Abstract column value reader.
+ * Column value reader engine.
  */
-public abstract class AbstractColumnValueReader implements ColumnValueReader {
+public final class ColumnValueReaderEngine {
     
-    @Override
-    public final Object readValue(final ResultSet resultSet, final 
ResultSetMetaData metaData, final int columnIndex) throws SQLException {
-        Object result = doReadValue(resultSet, metaData, columnIndex);
-        return resultSet.wasNull() ? null : result;
+    private final DialectColumnValueReader dialectColumnValueReader;
+    
+    public ColumnValueReaderEngine(final DatabaseType databaseType) {
+        dialectColumnValueReader = 
DatabaseTypedSPILoader.findService(DialectColumnValueReader.class, 
databaseType).orElse(null);
     }
     
-    protected abstract Object doReadValue(ResultSet resultSet, 
ResultSetMetaData metaData, int columnIndex) throws SQLException;
+    /**
+     * Read column value.
+     * 
+     * @param resultSet result set
+     * @param metaData result set meta data
+     * @param columnIndex column index
+     * @return column value
+     * @throws SQLException SQL exception
+     */
+    public Object read(final ResultSet resultSet, final ResultSetMetaData 
metaData, final int columnIndex) throws SQLException {
+        if (resultSet.wasNull()) {
+            return null;
+        }
+        Optional<Object> dialectValue = readDialectValue(resultSet, metaData, 
columnIndex);
+        return dialectValue.isPresent() ? dialectValue : 
readStandardValue(resultSet, metaData, columnIndex);
+    }
+    
+    private Optional<Object> readDialectValue(final ResultSet resultSet, final 
ResultSetMetaData metaData, final int columnIndex) throws SQLException {
+        return null == dialectColumnValueReader ? Optional.empty() : 
dialectColumnValueReader.read(resultSet, metaData, columnIndex);
+    }
     
-    protected final Object defaultDoReadValue(final ResultSet resultSet, final 
ResultSetMetaData metaData, final int columnIndex) throws SQLException {
+    private static Object readStandardValue(final ResultSet resultSet, final 
ResultSetMetaData metaData, final int columnIndex) throws SQLException {
         int columnType = metaData.getColumnType(columnIndex);
         switch (columnType) {
             case Types.BOOLEAN:
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
index 84679289bbe..86745e5e013 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
@@ -44,12 +44,11 @@ import 
org.apache.shardingsphere.data.pipeline.common.util.JDBCStreamQueryUtils;
 import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 
 import javax.sql.DataSource;
@@ -80,7 +79,7 @@ public final class InventoryDumper extends 
AbstractLifecycleExecutor implements
     
     private final PipelineSQLBuilder sqlBuilder;
     
-    private final ColumnValueReader columnValueReader;
+    private final ColumnValueReaderEngine columnValueReaderEngine;
     
     private final PipelineTableMetaDataLoader metaDataLoader;
     
@@ -92,7 +91,7 @@ public final class InventoryDumper extends 
AbstractLifecycleExecutor implements
         this.dataSource = dataSource;
         DatabaseType databaseType = 
dumperConfig.getDataSourceConfig().getDatabaseType();
         sqlBuilder = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, databaseType);
-        columnValueReader = 
DatabaseTypedSPILoader.getService(ColumnValueReader.class, databaseType);
+        columnValueReaderEngine = new ColumnValueReaderEngine(databaseType);
         this.metaDataLoader = metaDataLoader;
     }
     
@@ -207,7 +206,7 @@ public final class InventoryDumper extends 
AbstractLifecycleExecutor implements
             String columnName = insertColumnNames.isEmpty() ? 
resultSetMetaData.getColumnName(i) : insertColumnNames.get(i - 1);
             
ShardingSpherePreconditions.checkNotNull(tableMetaData.getColumnMetaData(columnName),
 () -> new PipelineInvalidParameterException(String.format("Column name is %s", 
columnName)));
             boolean isUniqueKey = 
tableMetaData.getColumnMetaData(columnName).isUniqueKey();
-            result.addColumn(new Column(columnName, 
columnValueReader.readValue(resultSet, resultSetMetaData, i), true, 
isUniqueKey));
+            result.addColumn(new Column(columnName, 
columnValueReaderEngine.read(resultSet, resultSetMetaData, i), true, 
isUniqueKey));
         }
         return result;
     }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLColumnValueReader.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLColumnValueReader.java
deleted file mode 100644
index 7fe1dd5c6f2..00000000000
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLColumnValueReader.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.mysql.ingest;
-
-import 
org.apache.shardingsphere.data.pipeline.core.dumper.AbstractColumnValueReader;
-
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-
-/**
- * Column value reader for MySQL.
- */
-public final class MySQLColumnValueReader extends AbstractColumnValueReader {
-    
-    private static final String YEAR_DATA_TYPE = "YEAR";
-    
-    @Override
-    protected Object doReadValue(final ResultSet resultSet, final 
ResultSetMetaData metaData, final int columnIndex) throws SQLException {
-        if (isYearDataType(metaData.getColumnTypeName(columnIndex))) {
-            return resultSet.getObject(columnIndex);
-        }
-        return super.defaultDoReadValue(resultSet, metaData, columnIndex);
-    }
-    
-    private boolean isYearDataType(final String columnDataTypeName) {
-        return YEAR_DATA_TYPE.equalsIgnoreCase(columnDataTypeName);
-    }
-    
-    @Override
-    public String getDatabaseType() {
-        return "MySQL";
-    }
-}
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader b/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
deleted file mode 100644
index 3c29e44c5a1..00000000000
--- a/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLColumnValueReader
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussColumnValueReader.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussColumnValueReader.java
index 31d1f512701..a3952b7cfd6 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussColumnValueReader.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussColumnValueReader.java
@@ -17,17 +17,18 @@
 
 package org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal;
 
-import 
org.apache.shardingsphere.data.pipeline.core.dumper.AbstractColumnValueReader;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.DialectColumnValueReader;
 
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Types;
+import java.util.Optional;
 
 /**
  * Column value reader for openGauss.
  */
-public final class OpenGaussColumnValueReader extends 
AbstractColumnValueReader {
+public final class OpenGaussColumnValueReader implements 
DialectColumnValueReader {
     
     private static final String MONEY_TYPE = "money";
     
@@ -36,33 +37,30 @@ public final class OpenGaussColumnValueReader extends 
AbstractColumnValueReader
     private static final String BOOL_TYPE = "bool";
     
     @Override
-    protected Object doReadValue(final ResultSet resultSet, final 
ResultSetMetaData metaData, final int columnIndex) throws SQLException {
+    public Optional<Object> read(final ResultSet resultSet, final 
ResultSetMetaData metaData, final int columnIndex) throws SQLException {
         if (isMoneyType(metaData, columnIndex)) {
-            return resultSet.getBigDecimal(columnIndex);
+            return Optional.of(resultSet.getBigDecimal(columnIndex));
         }
         if (isBitType(metaData, columnIndex)) {
             // openGauss JDBC driver can't parse bit(n) correctly when n > 1, 
so JDBC url already add bitToString, there just return string
-            return resultSet.getString(columnIndex);
+            return Optional.of(resultSet.getString(columnIndex));
         }
         if (isBoolType(metaData, columnIndex)) {
-            return resultSet.getBoolean(columnIndex);
+            return Optional.of(resultSet.getBoolean(columnIndex));
         }
-        return super.defaultDoReadValue(resultSet, metaData, columnIndex);
+        return Optional.empty();
     }
     
-    private boolean isMoneyType(final ResultSetMetaData resultSetMetaData, 
final int index) throws SQLException {
-        return 
MONEY_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
+    private boolean isMoneyType(final ResultSetMetaData metaData, final int 
columnIndex) throws SQLException {
+        return 
MONEY_TYPE.equalsIgnoreCase(metaData.getColumnTypeName(columnIndex));
     }
     
-    private boolean isBoolType(final ResultSetMetaData resultSetMetaData, 
final int index) throws SQLException {
-        return 
BOOL_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
+    private boolean isBoolType(final ResultSetMetaData metaData, final int 
columnIndex) throws SQLException {
+        return 
BOOL_TYPE.equalsIgnoreCase(metaData.getColumnTypeName(columnIndex));
     }
     
-    private boolean isBitType(final ResultSetMetaData resultSetMetaData, final 
int index) throws SQLException {
-        if (Types.BIT == resultSetMetaData.getColumnType(index)) {
-            return 
BIT_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
-        }
-        return false;
+    private boolean isBitType(final ResultSetMetaData metaData, final int 
columnIndex) throws SQLException {
+        return Types.BIT == metaData.getColumnType(columnIndex) && 
BIT_TYPE.equalsIgnoreCase(metaData.getColumnTypeName(columnIndex));
     }
     
     @Override
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.DialectColumnValueReader
similarity index 100%
rename from kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
rename to kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.DialectColumnValueReader
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
index 8bc1b84efef..183315d87dd 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
@@ -17,31 +17,44 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 
-import 
org.apache.shardingsphere.data.pipeline.core.dumper.AbstractColumnValueReader;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.DialectColumnValueReader;
 import org.postgresql.util.PGobject;
 
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Types;
+import java.util.Optional;
 
 /**
  * Column value reader for PostgreSQL.
  */
-public final class PostgreSQLColumnValueReader extends 
AbstractColumnValueReader {
+public final class PostgreSQLColumnValueReader implements 
DialectColumnValueReader {
     
     private static final String PG_MONEY_TYPE = "money";
     
     private static final String PG_BIT_TYPE = "bit";
     
     @Override
-    protected Object doReadValue(final ResultSet resultSet, final 
ResultSetMetaData metaData, final int columnIndex) throws SQLException {
-        if (isPgMoneyType(metaData, columnIndex)) {
-            return resultSet.getBigDecimal(columnIndex);
+    public Optional<Object> read(final ResultSet resultSet, final 
ResultSetMetaData metaData, final int columnIndex) throws SQLException {
+        if (isMoneyType(metaData, columnIndex)) {
+            return Optional.of(resultSet.getBigDecimal(columnIndex));
         }
-        if (!isPgBitType(metaData, columnIndex)) {
-            return defaultDoReadValue(resultSet, metaData, columnIndex);
+        if (isBitType(metaData, columnIndex)) {
+            return Optional.of(getBitObject(resultSet, columnIndex));
         }
+        return Optional.empty();
+    }
+    
+    private boolean isMoneyType(final ResultSetMetaData metaData, final int 
columnIndex) throws SQLException {
+        return 
PG_MONEY_TYPE.equalsIgnoreCase(metaData.getColumnTypeName(columnIndex));
+    }
+    
+    private boolean isBitType(final ResultSetMetaData metaData, final int 
columnIndex) throws SQLException {
+        return Types.BIT == metaData.getColumnType(columnIndex) && 
PG_BIT_TYPE.equalsIgnoreCase(metaData.getColumnTypeName(columnIndex));
+    }
+    
+    private static PGobject getBitObject(final ResultSet resultSet, final int 
columnIndex) throws SQLException {
         PGobject result = new PGobject();
         result.setType("bit");
         Object bitValue = resultSet.getObject(columnIndex);
@@ -51,14 +64,6 @@ public final class PostgreSQLColumnValueReader extends 
AbstractColumnValueReader
         return result;
     }
     
-    private boolean isPgMoneyType(final ResultSetMetaData resultSetMetaData, 
final int index) throws SQLException {
-        return 
PG_MONEY_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
-    }
-    
-    private boolean isPgBitType(final ResultSetMetaData resultSetMetaData, 
final int index) throws SQLException {
-        return Types.BIT == resultSetMetaData.getColumnType(index) && 
PG_BIT_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
-    }
-    
     @Override
     public String getDatabaseType() {
         return "PostgreSQL";
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader b/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.DialectColumnValueReader
similarity index 100%
rename from kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
rename to kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.DialectColumnValueReader
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureColumnValueReader.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureColumnValueReader.java
index e0c50eb3dfe..a801734921f 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureColumnValueReader.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureColumnValueReader.java
@@ -17,20 +17,17 @@
 
 package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
 
-import 
org.apache.shardingsphere.data.pipeline.core.dumper.AbstractColumnValueReader;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.DialectColumnValueReader;
 
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
+import java.util.Optional;
 
-/**
- * Basic column value reader.
- */
-public final class FixtureColumnValueReader extends AbstractColumnValueReader {
+public final class FixtureColumnValueReader implements 
DialectColumnValueReader {
     
     @Override
-    protected Object doReadValue(final ResultSet resultSet, final 
ResultSetMetaData metaData, final int columnIndex) throws SQLException {
-        return super.defaultDoReadValue(resultSet, metaData, columnIndex);
+    public Optional<Object> read(final ResultSet resultSet, final 
ResultSetMetaData metaData, final int columnIndex) {
+        return Optional.empty();
     }
     
     @Override
diff --git a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.DialectColumnValueReader
similarity index 100%
rename from test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
rename to test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.DialectColumnValueReader

Reply via email to