This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7838f2b7857 Refactor ColumnValueReaderEngine (#26930)
7838f2b7857 is described below
commit 7838f2b7857b3b9c2a4fdec0ab3c85f0a999c986
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Jul 13 00:19:19 2023 +0800
Refactor ColumnValueReaderEngine (#26930)
* Refactor ColumnValueReader
* Rename DialectColumnValueReader
* Refactor ColumnValueReaderEngine
* Fix sonar issue
---
.../infra/metadata/nodepath/RuleNodePathTest.java | 2 +-
...ueReader.java => DialectColumnValueReader.java} | 11 ++---
...DataMatchDataConsistencyCalculateAlgorithm.java | 10 ++---
...lueReader.java => ColumnValueReaderEngine.java} | 40 ++++++++++++++----
.../data/pipeline/core/dumper/InventoryDumper.java | 9 ++--
.../mysql/ingest/MySQLColumnValueReader.java | 49 ----------------------
...ta.pipeline.spi.ingest.dumper.ColumnValueReader | 18 --------
.../ingest/wal/OpenGaussColumnValueReader.java | 30 +++++++------
...ine.spi.ingest.dumper.DialectColumnValueReader} | 0
.../ingest/PostgreSQLColumnValueReader.java | 35 +++++++++-------
...ine.spi.ingest.dumper.DialectColumnValueReader} | 0
.../core/fixture/FixtureColumnValueReader.java | 13 +++---
...ine.spi.ingest.dumper.DialectColumnValueReader} | 0
13 files changed, 86 insertions(+), 131 deletions(-)
diff --git
a/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/nodepath/RuleNodePathTest.java
b/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/nodepath/RuleNodePathTest.java
index 4165f686c50..6017f378f5f 100644
---
a/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/nodepath/RuleNodePathTest.java
+++
b/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/nodepath/RuleNodePathTest.java
@@ -36,7 +36,7 @@ class RuleNodePathTest {
private RuleNodePath ruleNodePath;
@BeforeEach
- public void setup() {
+ void setup() {
List<String> namedRuleItemNodePathTypes =
Collections.singletonList("tables");
List<String> uniqueRuleItemNodePathTypes = Arrays.asList("tables",
"tables.type");
ruleNodePath = new RuleNodePath("foo", namedRuleItemNodePathTypes,
uniqueRuleItemNodePathTypes);
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/ColumnValueReader.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/DialectColumnValueReader.java
similarity index 80%
rename from
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/ColumnValueReader.java
rename to
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/DialectColumnValueReader.java
index 0fa56e5fdc7..bf867e19053 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/ColumnValueReader.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/DialectColumnValueReader.java
@@ -23,21 +23,22 @@ import
org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.util.Optional;
/**
- * Column value reader.
+ * Dialect column value reader.
*/
@SingletonSPI
-public interface ColumnValueReader extends DatabaseTypedSPI {
+public interface DialectColumnValueReader extends DatabaseTypedSPI {
/**
- * Read column value.
+ * Read dialect column value.
*
* @param resultSet result set
* @param resultSetMetaData result set meta data
* @param columnIndex column index
* @return column value
- * @throws SQLException from database
+ * @throws SQLException SQL exception
*/
- Object readValue(ResultSet resultSet, ResultSetMetaData resultSetMetaData,
int columnIndex) throws SQLException;
+ Optional<Object> read(ResultSet resultSet, ResultSetMetaData
resultSetMetaData, int columnIndex) throws SQLException;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index b6543986cf3..f80ae4e6907 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -20,18 +20,18 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.algorithm;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import
org.apache.shardingsphere.data.pipeline.common.util.JDBCStreamQueryUtils;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.DataConsistencyCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.DataMatchCalculatedResult;
+import
org.apache.shardingsphere.data.pipeline.core.dumper.ColumnValueReaderEngine;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
-import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.util.spi.annotation.SPIDescription;
@@ -88,7 +88,7 @@ public final class DataMatchDataConsistencyCalculateAlgorithm
extends AbstractSt
Collection<Collection<Object>> records = new LinkedList<>();
Object maxUniqueKeyValue = null;
DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, param.getDatabaseType());
- ColumnValueReader columnValueReader =
DatabaseTypedSPILoader.getService(ColumnValueReader.class, databaseType);
+ ColumnValueReaderEngine columnValueReaderEngine = new
ColumnValueReaderEngine(databaseType);
ResultSet resultSet = calculationContext.getResultSet();
while (resultSet.next()) {
ShardingSpherePreconditions.checkState(!isCanceling(), () ->
new
PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(),
param.getLogicTableName()));
@@ -96,10 +96,10 @@ public final class
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
int columnCount = resultSetMetaData.getColumnCount();
Collection<Object> columnRecord = new LinkedList<>();
for (int columnIndex = 1; columnIndex <= columnCount;
columnIndex++) {
- columnRecord.add(columnValueReader.readValue(resultSet,
resultSetMetaData, columnIndex));
+ columnRecord.add(columnValueReaderEngine.read(resultSet,
resultSetMetaData, columnIndex));
}
records.add(columnRecord);
- maxUniqueKeyValue = columnValueReader.readValue(resultSet,
resultSetMetaData, param.getUniqueKey().getOrdinalPosition());
+ maxUniqueKeyValue = columnValueReaderEngine.read(resultSet,
resultSetMetaData, param.getUniqueKey().getOrdinalPosition());
if (records.size() == chunkSize) {
break;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/AbstractColumnValueReader.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/ColumnValueReaderEngine.java
similarity index 68%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/AbstractColumnValueReader.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/ColumnValueReaderEngine.java
index adbfcc84b00..af08c516079 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/AbstractColumnValueReader.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/ColumnValueReaderEngine.java
@@ -17,27 +17,49 @@
package org.apache.shardingsphere.data.pipeline.core.dumper;
-import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
+import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.DialectColumnValueReader;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
+import java.util.Optional;
/**
- * Abstract column value reader.
+ * Column value reader engine.
*/
-public abstract class AbstractColumnValueReader implements ColumnValueReader {
+public final class ColumnValueReaderEngine {
- @Override
- public final Object readValue(final ResultSet resultSet, final
ResultSetMetaData metaData, final int columnIndex) throws SQLException {
- Object result = doReadValue(resultSet, metaData, columnIndex);
- return resultSet.wasNull() ? null : result;
+ private final DialectColumnValueReader dialectColumnValueReader;
+
+ public ColumnValueReaderEngine(final DatabaseType databaseType) {
+ dialectColumnValueReader =
DatabaseTypedSPILoader.findService(DialectColumnValueReader.class,
databaseType).orElse(null);
}
- protected abstract Object doReadValue(ResultSet resultSet,
ResultSetMetaData metaData, int columnIndex) throws SQLException;
+ /**
+ * Read column value.
+ *
+ * @param resultSet result set
+ * @param metaData result set meta data
+ * @param columnIndex column index
+ * @return column value
+ * @throws SQLException SQL exception
+ */
+ public Object read(final ResultSet resultSet, final ResultSetMetaData
metaData, final int columnIndex) throws SQLException {
+ if (resultSet.wasNull()) {
+ return null;
+ }
+ Optional<Object> dialectValue = readDialectValue(resultSet, metaData,
columnIndex);
+ return dialectValue.isPresent() ? dialectValue :
readStandardValue(resultSet, metaData, columnIndex);
+ }
+
+ private Optional<Object> readDialectValue(final ResultSet resultSet, final
ResultSetMetaData metaData, final int columnIndex) throws SQLException {
+ return null == dialectColumnValueReader ? Optional.empty() :
dialectColumnValueReader.read(resultSet, metaData, columnIndex);
+ }
- protected final Object defaultDoReadValue(final ResultSet resultSet, final
ResultSetMetaData metaData, final int columnIndex) throws SQLException {
+ private static Object readStandardValue(final ResultSet resultSet, final
ResultSetMetaData metaData, final int columnIndex) throws SQLException {
int columnType = metaData.getColumnType(columnIndex);
switch (columnType) {
case Types.BOOLEAN:
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
index 84679289bbe..86745e5e013 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
@@ -44,12 +44,11 @@ import
org.apache.shardingsphere.data.pipeline.common.util.JDBCStreamQueryUtils;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
-import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import javax.sql.DataSource;
@@ -80,7 +79,7 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
private final PipelineSQLBuilder sqlBuilder;
- private final ColumnValueReader columnValueReader;
+ private final ColumnValueReaderEngine columnValueReaderEngine;
private final PipelineTableMetaDataLoader metaDataLoader;
@@ -92,7 +91,7 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
this.dataSource = dataSource;
DatabaseType databaseType =
dumperConfig.getDataSourceConfig().getDatabaseType();
sqlBuilder =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, databaseType);
- columnValueReader =
DatabaseTypedSPILoader.getService(ColumnValueReader.class, databaseType);
+ columnValueReaderEngine = new ColumnValueReaderEngine(databaseType);
this.metaDataLoader = metaDataLoader;
}
@@ -207,7 +206,7 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
String columnName = insertColumnNames.isEmpty() ?
resultSetMetaData.getColumnName(i) : insertColumnNames.get(i - 1);
ShardingSpherePreconditions.checkNotNull(tableMetaData.getColumnMetaData(columnName),
() -> new PipelineInvalidParameterException(String.format("Column name is %s",
columnName)));
boolean isUniqueKey =
tableMetaData.getColumnMetaData(columnName).isUniqueKey();
- result.addColumn(new Column(columnName,
columnValueReader.readValue(resultSet, resultSetMetaData, i), true,
isUniqueKey));
+ result.addColumn(new Column(columnName,
columnValueReaderEngine.read(resultSet, resultSetMetaData, i), true,
isUniqueKey));
}
return result;
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLColumnValueReader.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLColumnValueReader.java
deleted file mode 100644
index 7fe1dd5c6f2..00000000000
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLColumnValueReader.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.mysql.ingest;
-
-import
org.apache.shardingsphere.data.pipeline.core.dumper.AbstractColumnValueReader;
-
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-
-/**
- * Column value reader for MySQL.
- */
-public final class MySQLColumnValueReader extends AbstractColumnValueReader {
-
- private static final String YEAR_DATA_TYPE = "YEAR";
-
- @Override
- protected Object doReadValue(final ResultSet resultSet, final
ResultSetMetaData metaData, final int columnIndex) throws SQLException {
- if (isYearDataType(metaData.getColumnTypeName(columnIndex))) {
- return resultSet.getObject(columnIndex);
- }
- return super.defaultDoReadValue(resultSet, metaData, columnIndex);
- }
-
- private boolean isYearDataType(final String columnDataTypeName) {
- return YEAR_DATA_TYPE.equalsIgnoreCase(columnDataTypeName);
- }
-
- @Override
- public String getDatabaseType() {
- return "MySQL";
- }
-}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
b/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
deleted file mode 100644
index 3c29e44c5a1..00000000000
---
a/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLColumnValueReader
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussColumnValueReader.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussColumnValueReader.java
index 31d1f512701..a3952b7cfd6 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussColumnValueReader.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussColumnValueReader.java
@@ -17,17 +17,18 @@
package org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal;
-import
org.apache.shardingsphere.data.pipeline.core.dumper.AbstractColumnValueReader;
+import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.DialectColumnValueReader;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
+import java.util.Optional;
/**
* Column value reader for openGauss.
*/
-public final class OpenGaussColumnValueReader extends
AbstractColumnValueReader {
+public final class OpenGaussColumnValueReader implements
DialectColumnValueReader {
private static final String MONEY_TYPE = "money";
@@ -36,33 +37,30 @@ public final class OpenGaussColumnValueReader extends
AbstractColumnValueReader
private static final String BOOL_TYPE = "bool";
@Override
- protected Object doReadValue(final ResultSet resultSet, final
ResultSetMetaData metaData, final int columnIndex) throws SQLException {
+ public Optional<Object> read(final ResultSet resultSet, final
ResultSetMetaData metaData, final int columnIndex) throws SQLException {
if (isMoneyType(metaData, columnIndex)) {
- return resultSet.getBigDecimal(columnIndex);
+ return Optional.of(resultSet.getBigDecimal(columnIndex));
}
if (isBitType(metaData, columnIndex)) {
// openGauss JDBC driver can't parse bit(n) correctly when n > 1,
so JDBC url already add bitToString, there just return string
- return resultSet.getString(columnIndex);
+ return Optional.of(resultSet.getString(columnIndex));
}
if (isBoolType(metaData, columnIndex)) {
- return resultSet.getBoolean(columnIndex);
+ return Optional.of(resultSet.getBoolean(columnIndex));
}
- return super.defaultDoReadValue(resultSet, metaData, columnIndex);
+ return Optional.empty();
}
- private boolean isMoneyType(final ResultSetMetaData resultSetMetaData,
final int index) throws SQLException {
- return
MONEY_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
+ private boolean isMoneyType(final ResultSetMetaData metaData, final int
columnIndex) throws SQLException {
+ return
MONEY_TYPE.equalsIgnoreCase(metaData.getColumnTypeName(columnIndex));
}
- private boolean isBoolType(final ResultSetMetaData resultSetMetaData,
final int index) throws SQLException {
- return
BOOL_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
+ private boolean isBoolType(final ResultSetMetaData metaData, final int
columnIndex) throws SQLException {
+ return
BOOL_TYPE.equalsIgnoreCase(metaData.getColumnTypeName(columnIndex));
}
- private boolean isBitType(final ResultSetMetaData resultSetMetaData, final
int index) throws SQLException {
- if (Types.BIT == resultSetMetaData.getColumnType(index)) {
- return
BIT_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
- }
- return false;
+ private boolean isBitType(final ResultSetMetaData metaData, final int
columnIndex) throws SQLException {
+ return Types.BIT == metaData.getColumnType(columnIndex) &&
BIT_TYPE.equalsIgnoreCase(metaData.getColumnTypeName(columnIndex));
}
@Override
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.DialectColumnValueReader
similarity index 100%
rename from
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
rename to
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.DialectColumnValueReader
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
index 8bc1b84efef..183315d87dd 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
@@ -17,31 +17,44 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
-import
org.apache.shardingsphere.data.pipeline.core.dumper.AbstractColumnValueReader;
+import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.DialectColumnValueReader;
import org.postgresql.util.PGobject;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
+import java.util.Optional;
/**
* Column value reader for PostgreSQL.
*/
-public final class PostgreSQLColumnValueReader extends
AbstractColumnValueReader {
+public final class PostgreSQLColumnValueReader implements
DialectColumnValueReader {
private static final String PG_MONEY_TYPE = "money";
private static final String PG_BIT_TYPE = "bit";
@Override
- protected Object doReadValue(final ResultSet resultSet, final
ResultSetMetaData metaData, final int columnIndex) throws SQLException {
- if (isPgMoneyType(metaData, columnIndex)) {
- return resultSet.getBigDecimal(columnIndex);
+ public Optional<Object> read(final ResultSet resultSet, final
ResultSetMetaData metaData, final int columnIndex) throws SQLException {
+ if (isMoneyType(metaData, columnIndex)) {
+ return Optional.of(resultSet.getBigDecimal(columnIndex));
}
- if (!isPgBitType(metaData, columnIndex)) {
- return defaultDoReadValue(resultSet, metaData, columnIndex);
+ if (isBitType(metaData, columnIndex)) {
+ return Optional.of(getBitObject(resultSet, columnIndex));
}
+ return Optional.empty();
+ }
+
+ private boolean isMoneyType(final ResultSetMetaData metaData, final int
columnIndex) throws SQLException {
+ return
PG_MONEY_TYPE.equalsIgnoreCase(metaData.getColumnTypeName(columnIndex));
+ }
+
+ private boolean isBitType(final ResultSetMetaData metaData, final int
columnIndex) throws SQLException {
+ return Types.BIT == metaData.getColumnType(columnIndex) &&
PG_BIT_TYPE.equalsIgnoreCase(metaData.getColumnTypeName(columnIndex));
+ }
+
+ private static PGobject getBitObject(final ResultSet resultSet, final int
columnIndex) throws SQLException {
PGobject result = new PGobject();
result.setType("bit");
Object bitValue = resultSet.getObject(columnIndex);
@@ -51,14 +64,6 @@ public final class PostgreSQLColumnValueReader extends
AbstractColumnValueReader
return result;
}
- private boolean isPgMoneyType(final ResultSetMetaData resultSetMetaData,
final int index) throws SQLException {
- return
PG_MONEY_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
- }
-
- private boolean isPgBitType(final ResultSetMetaData resultSetMetaData,
final int index) throws SQLException {
- return Types.BIT == resultSetMetaData.getColumnType(index) &&
PG_BIT_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
- }
-
@Override
public String getDatabaseType() {
return "PostgreSQL";
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
b/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.DialectColumnValueReader
similarity index 100%
rename from
kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
rename to
kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.DialectColumnValueReader
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureColumnValueReader.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureColumnValueReader.java
index e0c50eb3dfe..a801734921f 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureColumnValueReader.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureColumnValueReader.java
@@ -17,20 +17,17 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
-import
org.apache.shardingsphere.data.pipeline.core.dumper.AbstractColumnValueReader;
+import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.DialectColumnValueReader;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
+import java.util.Optional;
-/**
- * Basic column value reader.
- */
-public final class FixtureColumnValueReader extends AbstractColumnValueReader {
+public final class FixtureColumnValueReader implements
DialectColumnValueReader {
@Override
- protected Object doReadValue(final ResultSet resultSet, final
ResultSetMetaData metaData, final int columnIndex) throws SQLException {
- return super.defaultDoReadValue(resultSet, metaData, columnIndex);
+ public Optional<Object> read(final ResultSet resultSet, final
ResultSetMetaData metaData, final int columnIndex) {
+ return Optional.empty();
}
@Override
diff --git
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.DialectColumnValueReader
similarity index 100%
rename from
test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
rename to
test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.DialectColumnValueReader