This is an automated email from the ASF dual-hosted git repository.

jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f1b654e4ee3 Refactor JDBCStreamQueryBuilder (#27393)
f1b654e4ee3 is described below

commit f1b654e4ee319fe6d87b005d1b0efd188dcac1f9
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Jul 24 01:03:18 2023 +0800

    Refactor JDBCStreamQueryBuilder (#27393)
    
    * Refactor DatabaseTypeEngine
    
    * Refactor JDBCStreamQueryBuilder
---
 .../infra/database/DatabaseTypeEngine.java         | 10 +--
 .../infra/database/spi/DatabaseType.java           |  4 +-
 .../query/DialectJDBCStreamQueryBuilder.java       | 44 ++++++++++++
 .../common/query/JDBCStreamQueryBuilder.java       | 56 +++++++++++++++
 .../query/dialect/H2JDBCStreamQueryBuilder.java    | 42 +++++++++++
 .../query/dialect/MySQLJDBCStreamQueryBuilder.java | 44 ++++++++++++
 .../dialect/OpenGaussJDBCStreamQueryBuilder.java   | 44 ++++++++++++
 .../dialect/PostgreSQLJDBCStreamQueryBuilder.java  | 44 ++++++++++++
 .../pipeline/common/util/JDBCStreamQueryUtils.java | 81 ----------------------
 ...DataMatchDataConsistencyCalculateAlgorithm.java |  6 +-
 .../data/pipeline/core/dumper/InventoryDumper.java |  6 +-
 ...line.common.query.DialectJDBCStreamQueryBuilder | 21 ++++++
 12 files changed, 308 insertions(+), 94 deletions(-)

diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/database/DatabaseTypeEngine.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/database/DatabaseTypeEngine.java
index 67dfaab89da..04d5780c508 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/database/DatabaseTypeEngine.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/database/DatabaseTypeEngine.java
@@ -71,6 +71,11 @@ public final class DatabaseTypeEngine {
         return configuredDatabaseType.orElseGet(() -> 
getStorageType(getEnabledDataSources(databaseConfigs).values()));
     }
     
+    private static Optional<DatabaseType> findConfiguredDatabaseType(final 
ConfigurationProperties props) {
+        DatabaseType configuredDatabaseType = 
props.getValue(ConfigurationPropertyKey.PROXY_FRONTEND_DATABASE_PROTOCOL_TYPE);
+        return null == configuredDatabaseType ? Optional.empty() : 
Optional.of(configuredDatabaseType.getTrunkDatabaseType().orElse(configuredDatabaseType));
+    }
+    
     private static Map<String, DataSource> getEnabledDataSources(final 
Map<String, ? extends DatabaseConfiguration> databaseConfigs) {
         Map<String, DataSource> result = new LinkedHashMap<>();
         for (Entry<String, ? extends DatabaseConfiguration> entry : 
databaseConfigs.entrySet()) {
@@ -127,11 +132,6 @@ public final class DatabaseTypeEngine {
         }
     }
     
-    private static Optional<DatabaseType> findConfiguredDatabaseType(final 
ConfigurationProperties props) {
-        DatabaseType configuredDatabaseType = 
props.getValue(ConfigurationPropertyKey.PROXY_FRONTEND_DATABASE_PROTOCOL_TYPE);
-        return null == configuredDatabaseType ? Optional.empty() : 
Optional.of(configuredDatabaseType.getTrunkDatabaseType().orElse(configuredDatabaseType));
-    }
-    
     /**
      * Get default schema name.
      * 
diff --git 
a/infra/database/spi/src/main/java/org/apache/shardingsphere/infra/database/spi/DatabaseType.java
 
b/infra/database/spi/src/main/java/org/apache/shardingsphere/infra/database/spi/DatabaseType.java
index a75f55c932b..1ed6e80f8d1 100644
--- 
a/infra/database/spi/src/main/java/org/apache/shardingsphere/infra/database/spi/DatabaseType.java
+++ 
b/infra/database/spi/src/main/java/org/apache/shardingsphere/infra/database/spi/DatabaseType.java
@@ -59,9 +59,9 @@ public interface DatabaseType extends TypedSPI {
     }
     
     /**
-     * Get alias of JDBC URL prefixes.
+     * Get JDBC URL prefixes.
      * 
-     * @return Alias of JDBC URL prefixes
+     * @return prefixes of JDBC URL
      */
     Collection<String> getJdbcUrlPrefixes();
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/DialectJDBCStreamQueryBuilder.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/DialectJDBCStreamQueryBuilder.java
new file mode 100644
index 00000000000..b0ef748d73d
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/DialectJDBCStreamQueryBuilder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.common.query;
+
+import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
+import org.apache.shardingsphere.infra.database.spi.DatabaseType;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Dialect JDBC stream query builder.
+ *
+ * <p>Database typed SPI: {@code JDBCStreamQueryBuilder} looks up the implementation
+ * for a database type and delegates prepared statement creation to it.</p>
+ */
+@SingletonSPI
+public interface DialectJDBCStreamQueryBuilder extends DatabaseTypedSPI {
+    
+    /**
+     * Build streamed prepared statement.
+     *
+     * @param databaseType database type
+     * @param connection connection used to prepare the statement
+     * @param sql SQL to be queried
+     * @return built prepared statement
+     * @throws SQLException SQL exception
+     */
+    PreparedStatement build(DatabaseType databaseType, Connection connection, 
+String sql) throws SQLException;
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/JDBCStreamQueryBuilder.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/JDBCStreamQueryBuilder.java
new file mode 100644
index 00000000000..c3c29fee756
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/JDBCStreamQueryBuilder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.common.query;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.infra.database.spi.DatabaseType;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Optional;
+
+/**
+ * JDBC stream query builder.
+ *
+ * <p>Static entry point that dispatches to the {@code DialectJDBCStreamQueryBuilder}
+ * found for a database type.</p>
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@Slf4j
+public final class JDBCStreamQueryBuilder {
+    
+    /**
+     * Build streamed prepared statement.
+     *
+     * <p>Delegates to the dialect builder found for the database type. When none is found,
+     * a warning is logged and a plain forward-only, read-only prepared statement is returned
+     * without any dialect specific settings.</p>
+     *
+     * @param databaseType database type
+     * @param connection connection used to prepare the statement
+     * @param sql SQL to be queried
+     * @return built prepared statement
+     * @throws SQLException SQL exception
+     */
+    public static PreparedStatement build(final DatabaseType databaseType, 
+final Connection connection, final String sql) throws SQLException {
+        Optional<DialectJDBCStreamQueryBuilder> dialectBuilder = 
+DatabaseTypedSPILoader.findService(DialectJDBCStreamQueryBuilder.class, 
+databaseType);
+        if (dialectBuilder.isPresent()) {
+            return dialectBuilder.get().build(databaseType, connection, sql);
+        }
+        log.warn("not support {} streaming query now, pay attention to memory 
+usage", databaseType.getType());
+        return connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, 
+ResultSet.CONCUR_READ_ONLY);
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/H2JDBCStreamQueryBuilder.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/H2JDBCStreamQueryBuilder.java
new file mode 100644
index 00000000000..7744207c709
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/H2JDBCStreamQueryBuilder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.common.query.dialect;
+
+import 
org.apache.shardingsphere.data.pipeline.common.query.DialectJDBCStreamQueryBuilder;
+import org.apache.shardingsphere.infra.database.spi.DatabaseType;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * JDBC stream query builder for H2.
+ *
+ * <p>Creates a plain forward-only, read-only prepared statement and applies no further
+ * settings; the database type argument is not used.</p>
+ */
+public final class H2JDBCStreamQueryBuilder implements 
+DialectJDBCStreamQueryBuilder {
+    
+    @Override
+    public PreparedStatement build(final DatabaseType databaseType, final 
+Connection connection, final String sql) throws SQLException {
+        return connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, 
+ResultSet.CONCUR_READ_ONLY);
+    }
+    
+    @Override
+    public String getDatabaseType() {
+        return "H2";
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/MySQLJDBCStreamQueryBuilder.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/MySQLJDBCStreamQueryBuilder.java
new file mode 100644
index 00000000000..76b9e605255
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/MySQLJDBCStreamQueryBuilder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.common.query.dialect;
+
+import 
org.apache.shardingsphere.data.pipeline.common.query.DialectJDBCStreamQueryBuilder;
+import org.apache.shardingsphere.infra.database.spi.DatabaseType;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * JDBC stream query builder for MySQL.
+ *
+ * <p>Creates a forward-only, read-only prepared statement and sets its fetch size to
+ * {@code Integer.MIN_VALUE}, which the MySQL Connector/J documentation describes as the
+ * signal to stream the result set row by row instead of buffering it.</p>
+ *
+ * <p>NOTE(review): presumably the fetch size must not be overwritten afterwards;
+ * verify that every caller leaves it untouched for MySQL.</p>
+ */
+public final class MySQLJDBCStreamQueryBuilder implements 
+DialectJDBCStreamQueryBuilder {
+    
+    @Override
+    public PreparedStatement build(final DatabaseType databaseType, final 
+Connection connection, final String sql) throws SQLException {
+        PreparedStatement result = connection.prepareStatement(sql, 
+ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+        result.setFetchSize(Integer.MIN_VALUE);
+        return result;
+    }
+    
+    @Override
+    public String getDatabaseType() {
+        return "MySQL";
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/OpenGaussJDBCStreamQueryBuilder.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/OpenGaussJDBCStreamQueryBuilder.java
new file mode 100644
index 00000000000..6ef46996c49
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/OpenGaussJDBCStreamQueryBuilder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.common.query.dialect;
+
+import 
org.apache.shardingsphere.data.pipeline.common.query.DialectJDBCStreamQueryBuilder;
+import org.apache.shardingsphere.infra.database.spi.DatabaseType;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * JDBC stream query builder for openGauss.
+ *
+ * <p>Creates a forward-only, read-only prepared statement whose cursors close at commit,
+ * then switches auto-commit off on the given connection. The auto-commit change is a side
+ * effect on the caller's connection and is not restored by this builder.</p>
+ *
+ * <p>NOTE(review): presumably auto-commit has to be off for the driver to fetch rows in
+ * batches; confirm against the openGauss JDBC driver documentation.</p>
+ */
+public final class OpenGaussJDBCStreamQueryBuilder implements 
+DialectJDBCStreamQueryBuilder {
+    
+    @Override
+    public PreparedStatement build(final DatabaseType databaseType, final 
+Connection connection, final String sql) throws SQLException {
+        PreparedStatement result = connection.prepareStatement(sql, 
+ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, 
+ResultSet.CLOSE_CURSORS_AT_COMMIT);
+        connection.setAutoCommit(false);
+        return result;
+    }
+    
+    @Override
+    public String getDatabaseType() {
+        return "openGauss";
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/PostgreSQLJDBCStreamQueryBuilder.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/PostgreSQLJDBCStreamQueryBuilder.java
new file mode 100644
index 00000000000..d66898f3ce3
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/PostgreSQLJDBCStreamQueryBuilder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.common.query.dialect;
+
+import 
org.apache.shardingsphere.data.pipeline.common.query.DialectJDBCStreamQueryBuilder;
+import org.apache.shardingsphere.infra.database.spi.DatabaseType;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * JDBC stream query builder for PostgreSQL.
+ *
+ * <p>Creates a forward-only, read-only prepared statement whose cursors close at commit,
+ * then switches auto-commit off on the given connection. The auto-commit change is a side
+ * effect on the caller's connection and is not restored by this builder.</p>
+ *
+ * <p>The PostgreSQL JDBC documentation lists auto-commit off, a forward-only result set and
+ * a positive fetch size as the conditions for cursor based fetching; no fetch size is set
+ * here, so the caller is expected to set one.</p>
+ */
+public final class PostgreSQLJDBCStreamQueryBuilder implements 
+DialectJDBCStreamQueryBuilder {
+    
+    @Override
+    public PreparedStatement build(final DatabaseType databaseType, final 
+Connection connection, final String sql) throws SQLException {
+        PreparedStatement result = connection.prepareStatement(sql, 
+ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, 
+ResultSet.CLOSE_CURSORS_AT_COMMIT);
+        connection.setAutoCommit(false);
+        return result;
+    }
+    
+    @Override
+    public String getDatabaseType() {
+        return "PostgreSQL";
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/JDBCStreamQueryUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/JDBCStreamQueryUtils.java
deleted file mode 100644
index 71b077acd66..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/JDBCStreamQueryUtils.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.common.util;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.infra.database.h2.H2DatabaseType;
-import org.apache.shardingsphere.infra.database.mysql.MySQLDatabaseType;
-import org.apache.shardingsphere.infra.database.spi.DatabaseType;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-/**
- * JDBC stream query utility class.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-@Slf4j
-public final class JDBCStreamQueryUtils {
-    
-    /**
-     * Generate stream query prepared statement.
-     *
-     * @param connection connection
-     * @param databaseType database type
-     * @param sql sql
-     * @return stream query prepared statement
-     * @throws SQLException SQL exception
-     */
-    public static PreparedStatement generateStreamQueryPreparedStatement(final 
DatabaseType databaseType, final Connection connection, final String sql) 
throws SQLException {
-        if (databaseType instanceof MySQLDatabaseType) {
-            return generateForMySQL(connection, sql);
-        }
-        if (databaseType.getDefaultSchema().isPresent()) {
-            return generateForPostgreSQL(connection, sql);
-        }
-        if (databaseType instanceof H2DatabaseType) {
-            return generateByDefault(connection, sql);
-        }
-        if (databaseType.getTrunkDatabaseType().isPresent()) {
-            return 
generateStreamQueryPreparedStatement(databaseType.getTrunkDatabaseType().get(), 
connection, sql);
-        }
-        log.warn("not support {} streaming query now, pay attention to memory 
usage", databaseType.getType());
-        return generateByDefault(connection, sql);
-    }
-    
-    // TODO Consider use SPI
-    private static PreparedStatement generateForMySQL(final Connection 
connection, final String sql) throws SQLException {
-        PreparedStatement result = connection.prepareStatement(sql, 
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-        result.setFetchSize(Integer.MIN_VALUE);
-        return result;
-    }
-    
-    private static PreparedStatement generateForPostgreSQL(final Connection 
connection, final String sql) throws SQLException {
-        PreparedStatement result = connection.prepareStatement(sql, 
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, 
ResultSet.CLOSE_CURSORS_AT_COMMIT);
-        connection.setAutoCommit(false);
-        return result;
-    }
-    
-    private static PreparedStatement generateByDefault(final Connection 
connection, final String sql) throws SQLException {
-        return connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY);
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 883a2e2740b..3ee3a6480be 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -20,16 +20,16 @@ package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.algorithm;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.common.query.JDBCStreamQueryBuilder;
 import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineDataConsistencyCalculateSQLBuilder;
-import 
org.apache.shardingsphere.data.pipeline.common.util.JDBCStreamQueryUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCalculateParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.DataConsistencyCalculatedResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.DataMatchCalculatedResult;
 import 
org.apache.shardingsphere.data.pipeline.core.dumper.ColumnValueReaderEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
-import org.apache.shardingsphere.infra.database.spi.DatabaseType;
 import org.apache.shardingsphere.infra.database.mysql.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.spi.DatabaseType;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
@@ -143,7 +143,7 @@ public final class 
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
     
     private void fulfillCalculationContext(final CalculationContext 
calculationContext, final DataConsistencyCalculateParameter param) throws 
SQLException {
         String sql = getQuerySQL(param);
-        PreparedStatement preparedStatement = 
JDBCStreamQueryUtils.generateStreamQueryPreparedStatement(param.getDatabaseType(),
 calculationContext.getConnection(), sql);
+        PreparedStatement preparedStatement = 
JDBCStreamQueryBuilder.build(param.getDatabaseType(), 
calculationContext.getConnection(), sql);
         setCurrentStatement(preparedStatement);
         if (!(param.getDatabaseType() instanceof MySQLDatabaseType)) {
             preparedStatement.setFetchSize(chunkSize);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
index 47e4c2ce838..11bdb10d8af 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
@@ -40,14 +40,14 @@ import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPo
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.PrimaryKeyPosition;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.PrimaryKeyPositionFactory;
+import 
org.apache.shardingsphere.data.pipeline.common.query.JDBCStreamQueryBuilder;
 import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineInventoryDumpSQLBuilder;
-import 
org.apache.shardingsphere.data.pipeline.common.util.JDBCStreamQueryUtils;
 import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.infra.database.spi.DatabaseType;
 import org.apache.shardingsphere.infra.database.mysql.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.spi.DatabaseType;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 
 import javax.sql.DataSource;
@@ -117,7 +117,7 @@ public final class InventoryDumper extends 
AbstractLifecycleExecutor implements
         if (null != dumperConfig.getTransactionIsolation()) {
             
connection.setTransactionIsolation(dumperConfig.getTransactionIsolation());
         }
-        try (PreparedStatement preparedStatement = 
JDBCStreamQueryUtils.generateStreamQueryPreparedStatement(databaseType, 
connection, buildInventoryDumpSQL())) {
+        try (PreparedStatement preparedStatement = 
JDBCStreamQueryBuilder.build(databaseType, connection, 
buildInventoryDumpSQL())) {
             dumpStatement.set(preparedStatement);
             if (!(databaseType instanceof MySQLDatabaseType)) {
                 preparedStatement.setFetchSize(batchSize);
diff --git 
a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.query.DialectJDBCStreamQueryBuilder
 
b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.query.DialectJDBCStreamQueryBuilder
new file mode 100644
index 00000000000..840475b7da2
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.query.DialectJDBCStreamQueryBuilder
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.common.query.dialect.MySQLJDBCStreamQueryBuilder
+org.apache.shardingsphere.data.pipeline.common.query.dialect.PostgreSQLJDBCStreamQueryBuilder
+org.apache.shardingsphere.data.pipeline.common.query.dialect.OpenGaussJDBCStreamQueryBuilder
+org.apache.shardingsphere.data.pipeline.common.query.dialect.H2JDBCStreamQueryBuilder

Reply via email to