This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6264f03d4d0 Support openGauss fetch cursor next statement (#18180)
6264f03d4d0 is described below

commit 6264f03d4d0c3c7840e9e1aec6a5304a5cf340c8
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Sun Jun 5 17:32:16 2022 +0800

    Support openGauss fetch cursor next statement (#18180)
    
    * Support openGauss fetch cursor next statement
    
    * fix unit test
    
    * fix unit test
    
    * add final modifier
    
    * optimize code
    
    * fix checkstyle
    
    * fix unit test
---
 .../sharding/merge/ShardingResultMergerEngine.java |  17 ++-
 .../IteratorStreamMergedResult.java                |   2 +-
 .../merge/dal/show/ShowIndexMergedResult.java      |   6 +-
 .../merge/ddl/ShardingDDLResultMerger.java         |  71 ++++++++++++
 .../ddl/fetch/FetchOrderByValueQueuesHolder.java   |  51 ++++++++
 .../merge/ddl/fetch/FetchStreamMergedResult.java   |  77 ++++++++++++
 .../merge/dql/ShardingDQLResultMerger.java         |   2 +-
 .../merge/ShardingResultMergerEngineTest.java      |  10 ++
 .../IteratorStreamMergedResultTest.java            |   2 +-
 .../merge/ddl/ShardingDDLResultMergerTest.java     | 129 +++++++++++++++++++++
 .../fetch/FetchOrderByValueQueuesHolderTest.java   |  43 +++++++
 .../merge/dql/ShardingDQLResultMergerTest.java     |   2 +-
 ...rdingTableRulesUsedAlgorithmQueryResultSet.java |   2 +-
 .../infra/binder/SQLStatementContextFactory.java   |   7 +-
 .../statement/ddl/CursorStatementContext.java      |  10 +-
 .../statement/SQLStatementContextFactoryTest.java  |   8 +-
 .../core/connection/ShardingSphereConnection.java  |   3 +
 .../jdbc/JDBCDatabaseCommunicationEngine.java      |   2 +-
 .../transaction/JDBCBackendTransactionManager.java |   3 +
 .../opengauss/OpenGaussSelectDatabaseExecutor.java |   2 +-
 .../text/transaction/TransactionXAHandler.java     |   2 +
 .../jaxb/cases/domain/SQLParserTestCases.java      |   8 +-
 22 files changed, 432 insertions(+), 27 deletions(-)

diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngine.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngine.java
index a988b47c7fd..4a311feabe0 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngine.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngine.java
@@ -17,18 +17,20 @@
 
 package org.apache.shardingsphere.sharding.merge;
 
-import org.apache.shardingsphere.sharding.constant.ShardingOrder;
-import org.apache.shardingsphere.sharding.rule.ShardingRule;
-import org.apache.shardingsphere.sharding.merge.dal.ShardingDALResultMerger;
-import org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
-import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.merge.engine.merger.ResultMerger;
 import org.apache.shardingsphere.infra.merge.engine.merger.ResultMergerEngine;
 import 
org.apache.shardingsphere.infra.merge.engine.merger.impl.TransparentResultMerger;
+import org.apache.shardingsphere.sharding.constant.ShardingOrder;
+import org.apache.shardingsphere.sharding.merge.dal.ShardingDALResultMerger;
+import org.apache.shardingsphere.sharding.merge.ddl.ShardingDDLResultMerger;
+import org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger;
+import org.apache.shardingsphere.sharding.rule.ShardingRule;
+import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatement;
 
 /**
  * Result merger engine for sharding.
@@ -41,6 +43,9 @@ public final class ShardingResultMergerEngine implements 
ResultMergerEngine<Shar
         if (sqlStatementContext instanceof SelectStatementContext) {
             return new ShardingDQLResultMerger(databaseType);
         }
+        if (sqlStatementContext.getSqlStatement() instanceof DDLStatement) {
+            return new ShardingDDLResultMerger();
+        }
         if (sqlStatementContext.getSqlStatement() instanceof DALStatement) {
             return new ShardingDALResultMerger(databaseName, shardingRule);
         }
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/iterator/IteratorStreamMergedResult.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/common/IteratorStreamMergedResult.java
similarity index 96%
rename from 
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/iterator/IteratorStreamMergedResult.java
rename to 
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/common/IteratorStreamMergedResult.java
index 5207717c179..ee227a2498a 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/iterator/IteratorStreamMergedResult.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/common/IteratorStreamMergedResult.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.sharding.merge.dql.iterator;
+package org.apache.shardingsphere.sharding.merge.common;
 
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import 
org.apache.shardingsphere.infra.merge.result.impl.stream.StreamMergedResult;
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dal/show/ShowIndexMergedResult.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dal/show/ShowIndexMergedResult.java
index 2f5b562c8ce..15d819c6421 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dal/show/ShowIndexMergedResult.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dal/show/ShowIndexMergedResult.java
@@ -34,7 +34,7 @@ import java.util.Optional;
 /**
  * Merged result for show index.
  */
-public class ShowIndexMergedResult extends MemoryMergedResult<ShardingRule> {
+public final class ShowIndexMergedResult extends 
MemoryMergedResult<ShardingRule> {
     
     public ShowIndexMergedResult(final ShardingRule shardingRule,
                                  final SQLStatementContext<?> 
sqlStatementContext, final ShardingSphereSchema schema, final List<QueryResult> 
queryResults) throws SQLException {
@@ -42,8 +42,8 @@ public class ShowIndexMergedResult extends 
MemoryMergedResult<ShardingRule> {
     }
     
     @Override
-    protected final List<MemoryQueryResultRow> init(final ShardingRule 
shardingRule, final ShardingSphereSchema schema,
-                                                    final 
SQLStatementContext<?> sqlStatementContext, final List<QueryResult> 
queryResults) throws SQLException {
+    protected List<MemoryQueryResultRow> init(final ShardingRule shardingRule, 
final ShardingSphereSchema schema,
+                                              final SQLStatementContext<?> 
sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
         List<MemoryQueryResultRow> result = new LinkedList<>();
         for (QueryResult each : queryResults) {
             while (each.next()) {
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMerger.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMerger.java
new file mode 100644
index 00000000000..a929d8f6087
--- /dev/null
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMerger.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sharding.merge.ddl;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import 
org.apache.shardingsphere.infra.binder.statement.ddl.FetchStatementContext;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import org.apache.shardingsphere.infra.merge.engine.merger.ResultMerger;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
+import 
org.apache.shardingsphere.infra.merge.result.impl.transparent.TransparentMergedResult;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import 
org.apache.shardingsphere.sharding.merge.common.IteratorStreamMergedResult;
+import 
org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchStreamMergedResult;
+import org.apache.shardingsphere.sql.parser.sql.common.util.SQLUtil;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * DDL result merger for Sharding.
+ */
+@RequiredArgsConstructor
+public final class ShardingDDLResultMerger implements ResultMerger {
+    
+    @Override
+    public MergedResult merge(final List<QueryResult> queryResults, final 
SQLStatementContext<?> sqlStatementContext, final ShardingSphereDatabase 
database) throws SQLException {
+        if (!(sqlStatementContext instanceof FetchStatementContext)) {
+            return new TransparentMergedResult(queryResults.get(0));
+        }
+        if (1 == queryResults.size()) {
+            return new IteratorStreamMergedResult(queryResults);
+        }
+        Map<String, Integer> columnLabelIndexMap = 
getColumnLabelIndexMap(queryResults.get(0));
+        FetchStatementContext statementContext = (FetchStatementContext) 
sqlStatementContext;
+        
statementContext.getCursorStatementContext().getSelectStatementContext().setIndexes(columnLabelIndexMap);
+        return new FetchStreamMergedResult(queryResults, statementContext, 
getSchema(sqlStatementContext, database));
+    }
+    
+    private ShardingSphereSchema getSchema(final SQLStatementContext<?> 
sqlStatementContext, final ShardingSphereDatabase database) {
+        String defaultSchemaName = 
DatabaseTypeEngine.getDefaultSchemaName(sqlStatementContext.getDatabaseType(), 
database.getName());
+        return 
sqlStatementContext.getTablesContext().getSchemaName().map(optional -> 
database.getSchemas().get(optional)).orElseGet(() -> 
database.getSchemas().get(defaultSchemaName));
+    }
+    
+    private Map<String, Integer> getColumnLabelIndexMap(final QueryResult 
queryResult) throws SQLException {
+        Map<String, Integer> result = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+        for (int i = 0; i < queryResult.getMetaData().getColumnCount(); i++) {
+            
result.put(SQLUtil.getExactlyValue(queryResult.getMetaData().getColumnLabel(i + 
1)), i + 1);
+        }
+        return result;
+    }
+}
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolder.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolder.java
new file mode 100644
index 00000000000..f06927506cf
--- /dev/null
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolder.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sharding.merge.ddl.fetch;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.sharding.merge.dql.orderby.OrderByValue;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Hold fetch order by value queues for current thread.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class FetchOrderByValueQueuesHolder {
+    
+    private static final ThreadLocal<Map<String, Queue<OrderByValue>>> 
ORDER_BY_VALUE_QUEUES = ThreadLocal.withInitial(ConcurrentHashMap::new);
+    
+    /**
+     * Get fetch order by value queues.
+     *
+     * @return fetch order by value queues
+     */
+    public static Map<String, Queue<OrderByValue>> get() {
+        return ORDER_BY_VALUE_QUEUES.get();
+    }
+    
+    /**
+     * Remove fetch order by value queues.
+     */
+    public static void remove() {
+        ORDER_BY_VALUE_QUEUES.remove();
+    }
+}
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
new file mode 100644
index 00000000000..7c6b7e99298
--- /dev/null
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sharding.merge.ddl.fetch;
+
+import 
org.apache.shardingsphere.infra.binder.segment.select.orderby.OrderByItem;
+import 
org.apache.shardingsphere.infra.binder.statement.ddl.FetchStatementContext;
+import 
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import 
org.apache.shardingsphere.infra.merge.result.impl.stream.StreamMergedResult;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import org.apache.shardingsphere.sharding.merge.dql.orderby.OrderByValue;
+
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+
+/**
+ * Stream merged result for fetch.
+ */
+public final class FetchStreamMergedResult extends StreamMergedResult {
+    
+    private final Collection<OrderByItem> orderByItems;
+    
+    private final Queue<OrderByValue> orderByValuesQueue;
+    
+    private boolean isFirstNext;
+    
+    public FetchStreamMergedResult(final List<QueryResult> queryResults, final 
FetchStatementContext fetchStatementContext, final ShardingSphereSchema schema) 
throws SQLException {
+        String cursorName = 
fetchStatementContext.getCursorName().getIdentifier().getValue().toLowerCase();
+        SelectStatementContext selectStatementContext = 
fetchStatementContext.getCursorStatementContext().getSelectStatementContext();
+        orderByItems = selectStatementContext.getOrderByContext().getItems();
+        orderByValuesQueue = 
FetchOrderByValueQueuesHolder.get().computeIfAbsent(cursorName, key -> new 
PriorityQueue<>(queryResults.size()));
+        orderResultSetsToQueue(queryResults, selectStatementContext, schema);
+        isFirstNext = true;
+    }
+    
+    private void orderResultSetsToQueue(final List<QueryResult> queryResults, 
final SelectStatementContext selectStatementContext, final ShardingSphereSchema 
schema) throws SQLException {
+        for (QueryResult each : queryResults) {
+            OrderByValue orderByValue = new OrderByValue(each, orderByItems, 
selectStatementContext, schema);
+            if (orderByValue.next()) {
+                orderByValuesQueue.offer(orderByValue);
+            }
+        }
+    }
+    
+    @Override
+    public boolean next() throws SQLException {
+        if (orderByValuesQueue.isEmpty()) {
+            return false;
+        }
+        // TODO support fetch count and fetch all statement
+        if (isFirstNext) {
+            setCurrentQueryResult(orderByValuesQueue.poll().getQueryResult());
+            isFirstNext = false;
+            return true;
+        } else {
+            return false;
+        }
+    }
+}
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMerger.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMerger.java
index ead58bbbdd3..9a1cff5c4f6 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMerger.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMerger.java
@@ -29,9 +29,9 @@ import 
org.apache.shardingsphere.infra.merge.engine.merger.ResultMerger;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import 
org.apache.shardingsphere.sharding.merge.common.IteratorStreamMergedResult;
 import 
org.apache.shardingsphere.sharding.merge.dql.groupby.GroupByMemoryMergedResult;
 import 
org.apache.shardingsphere.sharding.merge.dql.groupby.GroupByStreamMergedResult;
-import 
org.apache.shardingsphere.sharding.merge.dql.iterator.IteratorStreamMergedResult;
 import 
org.apache.shardingsphere.sharding.merge.dql.orderby.OrderByStreamMergedResult;
 import 
org.apache.shardingsphere.sharding.merge.dql.pagination.LimitDecoratorMergedResult;
 import 
org.apache.shardingsphere.sharding.merge.dql.pagination.RowNumberDecoratorMergedResult;
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngineTest.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngineTest.java
index f18e6007979..1df307b84b6 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngineTest.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngineTest.java
@@ -27,6 +27,7 @@ import 
org.apache.shardingsphere.infra.merge.engine.merger.impl.TransparentResul
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 import org.apache.shardingsphere.sharding.merge.dal.ShardingDALResultMerger;
+import org.apache.shardingsphere.sharding.merge.ddl.ShardingDDLResultMerger;
 import org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger;
 import 
org.apache.shardingsphere.sql.parser.sql.common.segment.dml.column.ColumnSegment;
 import 
org.apache.shardingsphere.sql.parser.sql.common.segment.dml.column.InsertColumnsSegment;
@@ -38,6 +39,7 @@ import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectState
 import 
org.apache.shardingsphere.sql.parser.sql.common.value.identifier.IdentifierValue;
 import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLSelectStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.ddl.OpenGaussFetchStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.oracle.dml.OracleSelectStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.postgresql.dal.PostgreSQLShowStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.postgresql.dml.PostgreSQLSelectStatement;
@@ -117,4 +119,12 @@ public final class ShardingResultMergerEngineTest {
         
when(database.getSchemas().get(DefaultDatabase.LOGIC_NAME)).thenReturn(mock(ShardingSphereSchema.class));
         return new 
InsertStatementContext(Collections.singletonMap(DefaultDatabase.LOGIC_NAME, 
database), Collections.emptyList(), insertStatement, 
DefaultDatabase.LOGIC_NAME);
     }
+    
+    @Test
+    public void assertNewInstanceWithDDLStatement() {
+        ConfigurationProperties props = new ConfigurationProperties(new 
Properties());
+        CommonSQLStatementContext<OpenGaussFetchStatement> sqlStatementContext 
= new CommonSQLStatementContext<>(new OpenGaussFetchStatement());
+        assertThat(new 
ShardingResultMergerEngine().newInstance(DefaultDatabase.LOGIC_NAME, 
DatabaseTypeFactory.getInstance("MySQL"), null, props,
+                sqlStatementContext), 
instanceOf(ShardingDDLResultMerger.class));
+    }
 }
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/dql/iterator/IteratorStreamMergedResultTest.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/common/IteratorStreamMergedResultTest.java
similarity index 99%
rename from 
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/dql/iterator/IteratorStreamMergedResultTest.java
rename to 
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/common/IteratorStreamMergedResultTest.java
index 9010a755d1b..622c26340c0 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/dql/iterator/IteratorStreamMergedResultTest.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/common/IteratorStreamMergedResultTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.sharding.merge.dql.iterator;
+package org.apache.shardingsphere.sharding.merge.common;
 
 import 
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
 import org.apache.shardingsphere.infra.database.DefaultDatabase;
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMergerTest.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMergerTest.java
new file mode 100644
index 00000000000..e591183cb0c
--- /dev/null
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMergerTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sharding.merge.ddl;
+
+import 
org.apache.shardingsphere.infra.binder.statement.ddl.CursorStatementContext;
+import 
org.apache.shardingsphere.infra.binder.statement.ddl.FetchStatementContext;
+import 
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.database.DefaultDatabase;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import 
org.apache.shardingsphere.infra.merge.result.impl.transparent.TransparentMergedResult;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import 
org.apache.shardingsphere.sharding.merge.common.IteratorStreamMergedResult;
+import 
org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchStreamMergedResult;
+import 
org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.cursor.CursorNameSegment;
+import 
org.apache.shardingsphere.sql.parser.sql.common.segment.dml.item.ProjectionsSegment;
+import 
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.SimpleTableSegment;
+import 
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.TableNameSegment;
+import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.common.value.identifier.IdentifierValue;
+import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLSelectStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.ddl.OpenGaussFetchStatement;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public final class ShardingDDLResultMergerTest {
+    
+    @Test
+    public void assertBuildIteratorStreamMergedResult() throws SQLException {
+        ShardingDDLResultMerger merger = new ShardingDDLResultMerger();
+        assertThat(merger.merge(createSingleQueryResults(), 
mock(FetchStatementContext.class), mock(ShardingSphereDatabase.class)), 
instanceOf(IteratorStreamMergedResult.class));
+    }
+    
+    @Test
+    public void assertBuildFetchStreamMergedResult() throws SQLException {
+        ShardingDDLResultMerger merger = new ShardingDDLResultMerger();
+        ShardingSphereDatabase database = mock(ShardingSphereDatabase.class, 
RETURNS_DEEP_STUBS);
+        
when(database.getSchemas().get(DefaultDatabase.LOGIC_NAME)).thenReturn(mock(ShardingSphereSchema.class));
+        assertThat(merger.merge(createMultiQueryResults(), 
createFetchStatementContext(database), mock(ShardingSphereDatabase.class)), 
instanceOf(FetchStreamMergedResult.class));
+    }
+    
+    @Test
+    public void assertBuildTransparentMergedResult() throws SQLException {
+        ShardingDDLResultMerger merger = new ShardingDDLResultMerger();
+        assertThat(merger.merge(createMultiQueryResults(), 
mock(SelectStatementContext.class), mock(ShardingSphereDatabase.class)), 
instanceOf(TransparentMergedResult.class));
+    }
+    
+    private FetchStatementContext createFetchStatementContext(final 
ShardingSphereDatabase database) {
+        OpenGaussFetchStatement fetchStatement = createFetchStatement();
+        FetchStatementContext result = new 
FetchStatementContext(fetchStatement);
+        CursorStatementContext cursorStatementContext = 
createCursorStatementContext(database);
+        result.setUpCursorDefinition(cursorStatementContext);
+        return result;
+    }
+    
+    private CursorStatementContext createCursorStatementContext(final 
ShardingSphereDatabase database) {
+        CursorStatementContext result = mock(CursorStatementContext.class, 
RETURNS_DEEP_STUBS);
+        SelectStatement selectStatement = createSelectStatement();
+        selectStatement.setProjections(new ProjectionsSegment(0, 0));
+        SelectStatementContext selectStatementContext = new 
SelectStatementContext(Collections.singletonMap(DefaultDatabase.LOGIC_NAME, 
database), Collections.emptyList(),
+                selectStatement, DefaultDatabase.LOGIC_NAME);
+        
when(result.getSelectStatementContext()).thenReturn(selectStatementContext);
+        when(result.getSqlStatement().getSelect()).thenReturn(selectStatement);
+        return result;
+    }
+    
+    private List<QueryResult> createSingleQueryResults() throws SQLException {
+        List<QueryResult> result = new LinkedList<>();
+        QueryResult queryResult = createQueryResult();
+        result.add(queryResult);
+        return result;
+    }
+    
+    private List<QueryResult> createMultiQueryResults() throws SQLException {
+        List<QueryResult> result = new LinkedList<>();
+        QueryResult queryResult = createQueryResult();
+        result.add(queryResult);
+        result.add(mock(QueryResult.class, RETURNS_DEEP_STUBS));
+        result.add(mock(QueryResult.class, RETURNS_DEEP_STUBS));
+        result.add(mock(QueryResult.class, RETURNS_DEEP_STUBS));
+        return result;
+    }
+    
+    private QueryResult createQueryResult() throws SQLException {
+        QueryResult result = mock(QueryResult.class, RETURNS_DEEP_STUBS);
+        when(result.getMetaData().getColumnCount()).thenReturn(1);
+        when(result.getMetaData().getColumnLabel(1)).thenReturn("count(*)");
+        when(result.getValue(1, Object.class)).thenReturn(0);
+        return result;
+    }
+    
+    private OpenGaussFetchStatement createFetchStatement() {
+        OpenGaussFetchStatement result = new OpenGaussFetchStatement();
+        result.setCursorName(new CursorNameSegment(0, 0, new 
IdentifierValue("t_order_cursor")));
+        return result;
+    }
+    
+    private SelectStatement createSelectStatement() {
+        SelectStatement result = new MySQLSelectStatement();
+        result.setFrom(new SimpleTableSegment(new TableNameSegment(10, 13, new 
IdentifierValue("tbl"))));
+        result.setProjections(new ProjectionsSegment(0, 0));
+        return result;
+    }
+}
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolderTest.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolderTest.java
new file mode 100644
index 00000000000..b31d48f009e
--- /dev/null
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolderTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sharding.merge.ddl.fetch;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.PriorityQueue;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public final class FetchOrderByValueQueuesHolderTest {
+    
+    @Before
+    public void setUp() {
+        FetchOrderByValueQueuesHolder.remove();
+    }
+    
+    @Test
+    public void assertTrafficContextHolder() {
+        
assertFalse(FetchOrderByValueQueuesHolder.get().containsKey("t_order_cursor"));
+        FetchOrderByValueQueuesHolder.get().computeIfAbsent("t_order_cursor", 
key -> new PriorityQueue<>());
+        
assertTrue(FetchOrderByValueQueuesHolder.get().containsKey("t_order_cursor"));
+        FetchOrderByValueQueuesHolder.remove();
+        
assertFalse(FetchOrderByValueQueuesHolder.get().containsKey("t_order_cursor"));
+    }
+}
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMergerTest.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMergerTest.java
index 223a9915b0e..18f3b03b180 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMergerTest.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMergerTest.java
@@ -31,7 +31,7 @@ import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.
 import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
 import 
org.apache.shardingsphere.sharding.merge.dql.groupby.GroupByMemoryMergedResult;
 import 
org.apache.shardingsphere.sharding.merge.dql.groupby.GroupByStreamMergedResult;
-import 
org.apache.shardingsphere.sharding.merge.dql.iterator.IteratorStreamMergedResult;
+import 
org.apache.shardingsphere.sharding.merge.common.IteratorStreamMergedResult;
 import 
org.apache.shardingsphere.sharding.merge.dql.orderby.OrderByStreamMergedResult;
 import 
org.apache.shardingsphere.sharding.merge.dql.pagination.LimitDecoratorMergedResult;
 import 
org.apache.shardingsphere.sharding.merge.dql.pagination.RowNumberDecoratorMergedResult;
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/query/ShardingTableRulesUsedAlgorithmQueryResultSet.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/query/ShardingTableRulesUsedAlgorithmQueryResultSet.java
index cb3214e0a3e..721f7ff7f04 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/query/ShardingTableRulesUsedAlgorithmQueryResultSet.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/query/ShardingTableRulesUsedAlgorithmQueryResultSet.java
@@ -58,7 +58,7 @@ public final class 
ShardingTableRulesUsedAlgorithmQueryResultSet implements Dist
             if (((null == each.getDatabaseShardingStrategy() && 
matchDefaultDatabaseShardingStrategy)
                     || (null != each.getDatabaseShardingStrategy() && 
algorithmName.equals(each.getDatabaseShardingStrategy().getShardingAlgorithmName())))
                     || ((null == each.getTableShardingStrategy() && 
matchDefaultTableShardingStrategy)
-                    || (null != each.getTableShardingStrategy() && 
algorithmName.equals(each.getTableShardingStrategy().getShardingAlgorithmName()))))
 {
+                            || (null != each.getTableShardingStrategy() && 
algorithmName.equals(each.getTableShardingStrategy().getShardingAlgorithmName()))))
 {
                 data.add(Arrays.asList("table", each.getLogicTable()));
             }
         });
diff --git 
a/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/SQLStatementContextFactory.java
 
b/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/SQLStatementContextFactory.java
index 3e23d1bc804..a0ee1111de0 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/SQLStatementContextFactory.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/SQLStatementContextFactory.java
@@ -141,7 +141,7 @@ public final class SQLStatementContextFactory {
             return getDMLStatementContext(databases, parameters, 
(DMLStatement) sqlStatement, defaultDatabaseName);
         }
         if (sqlStatement instanceof DDLStatement) {
-            return getDDLStatementContext((DDLStatement) sqlStatement);
+            return getDDLStatementContext(databases, parameters, 
(DDLStatement) sqlStatement, defaultDatabaseName);
         }
         if (sqlStatement instanceof DCLStatement) {
             return getDCLStatementContext((DCLStatement) sqlStatement);
@@ -175,7 +175,8 @@ public final class SQLStatementContextFactory {
         throw new UnsupportedOperationException(String.format("Unsupported SQL 
statement `%s`", sqlStatement.getClass().getSimpleName()));
     }
     
-    private static SQLStatementContext<?> getDDLStatementContext(final 
DDLStatement sqlStatement) {
+    private static SQLStatementContext<?> getDDLStatementContext(final 
Map<String, ShardingSphereDatabase> databases, final List<Object> parameters,
+                                                                 final 
DDLStatement sqlStatement, final String defaultDatabaseName) {
         if (sqlStatement instanceof CreateSchemaStatement) {
             return new CreateSchemaStatementContext((CreateSchemaStatement) 
sqlStatement);
         }
@@ -225,7 +226,7 @@ public final class SQLStatementContextFactory {
             return new CommentStatementContext((CommentStatement) 
sqlStatement);
         }
         if (sqlStatement instanceof OpenGaussCursorStatement) {
-            return new CursorStatementContext((OpenGaussCursorStatement) 
sqlStatement);
+            return new CursorStatementContext(databases, parameters, 
(OpenGaussCursorStatement) sqlStatement, defaultDatabaseName);
         }
         if (sqlStatement instanceof CloseStatement) {
             return new CloseStatementContext((CloseStatement) sqlStatement);
diff --git 
a/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/statement/ddl/CursorStatementContext.java
 
b/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/statement/ddl/CursorStatementContext.java
index 46991d1aa2f..58ee20772bc 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/statement/ddl/CursorStatementContext.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/statement/ddl/CursorStatementContext.java
@@ -20,9 +20,11 @@ package org.apache.shardingsphere.infra.binder.statement.ddl;
 import lombok.Getter;
 import org.apache.shardingsphere.infra.binder.segment.table.TablesContext;
 import 
org.apache.shardingsphere.infra.binder.statement.CommonSQLStatementContext;
+import 
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
 import org.apache.shardingsphere.infra.binder.type.CursorAvailable;
 import org.apache.shardingsphere.infra.binder.type.TableAvailable;
 import org.apache.shardingsphere.infra.binder.type.WhereAvailable;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.sql.parser.sql.common.extractor.TableExtractor;
 import 
org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.cursor.CursorNameSegment;
 import 
org.apache.shardingsphere.sql.parser.sql.common.segment.dml.column.ColumnSegment;
@@ -35,6 +37,8 @@ import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.ddl.
 
 import java.util.Collection;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Cursor statement context.
@@ -48,11 +52,15 @@ public final class CursorStatementContext extends 
CommonSQLStatementContext<Open
     
     private final TablesContext tablesContext;
     
-    public CursorStatementContext(final OpenGaussCursorStatement sqlStatement) 
{
+    private final SelectStatementContext selectStatementContext;
+    
+    public CursorStatementContext(final Map<String, ShardingSphereDatabase> 
databases, final List<Object> parameters,
+                                  final OpenGaussCursorStatement sqlStatement, 
final String defaultDatabaseName) {
         super(sqlStatement);
         tablesContext = new TablesContext(getSimpleTableSegments(), 
getDatabaseType());
         extractWhereSegments(whereSegments, sqlStatement.getSelect());
         ColumnExtractor.extractColumnSegments(columnSegments, whereSegments);
+        selectStatementContext = new SelectStatementContext(databases, 
parameters, sqlStatement.getSelect(), defaultDatabaseName);
     }
     
     private Collection<SimpleTableSegment> getSimpleTableSegments() {
diff --git 
a/shardingsphere-infra/shardingsphere-infra-binder/src/test/java/org/apache/shardingsphere/infra/binder/statement/SQLStatementContextFactoryTest.java
 
b/shardingsphere-infra/shardingsphere-infra-binder/src/test/java/org/apache/shardingsphere/infra/binder/statement/SQLStatementContextFactoryTest.java
index a72855a50f6..87a63f8c403 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-binder/src/test/java/org/apache/shardingsphere/infra/binder/statement/SQLStatementContextFactoryTest.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-binder/src/test/java/org/apache/shardingsphere/infra/binder/statement/SQLStatementContextFactoryTest.java
@@ -117,9 +117,11 @@ public final class SQLStatementContextFactoryTest {
     
     @Test
     public void assertNewInstanceForCursorStatement() {
-        OpenGaussCursorStatement sqlStatement = 
mock(OpenGaussCursorStatement.class);
-        
when(sqlStatement.getSelect()).thenReturn(mock(MySQLSelectStatement.class));
-        SQLStatementContext<?> actual = 
SQLStatementContextFactory.newInstance(mockDatabases(), 
Collections.emptyList(), sqlStatement, DefaultDatabase.LOGIC_NAME);
+        OpenGaussCursorStatement cursorStatement = 
mock(OpenGaussCursorStatement.class, RETURNS_DEEP_STUBS);
+        MySQLSelectStatement selectStatement = 
mock(MySQLSelectStatement.class, RETURNS_DEEP_STUBS);
+        
when(selectStatement.getProjections().isDistinctRow()).thenReturn(false);
+        when(cursorStatement.getSelect()).thenReturn(selectStatement);
+        SQLStatementContext<?> actual = 
SQLStatementContextFactory.newInstance(mockDatabases(), 
Collections.emptyList(), cursorStatement, DefaultDatabase.LOGIC_NAME);
         assertThat(actual, instanceOf(CursorStatementContext.class));
     }
     
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
index 92dbacf1ffe..7c4c464d89b 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.driver.jdbc.core.datasource.metadata.ShardingSp
 import 
org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement;
 import 
org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSphereStatement;
 import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueQueuesHolder;
 import org.apache.shardingsphere.traffic.context.TrafficContextHolder;
 import org.apache.shardingsphere.transaction.TransactionHolder;
 
@@ -170,6 +171,7 @@ public final class ShardingSphereConnection extends 
AbstractConnectionAdapter {
             
connectionManager.getConnectionTransaction().setRollbackOnly(false);
             TransactionHolder.clear();
             TrafficContextHolder.remove();
+            FetchOrderByValueQueuesHolder.remove();
         }
     }
     
@@ -181,6 +183,7 @@ public final class ShardingSphereConnection extends 
AbstractConnectionAdapter {
             
connectionManager.getConnectionTransaction().setRollbackOnly(false);
             TransactionHolder.clear();
             TrafficContextHolder.remove();
+            FetchOrderByValueQueuesHolder.remove();
         }
     }
     
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
index 7843987729e..0d343638d4e 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
@@ -53,7 +53,7 @@ import 
org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
 import 
org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeaderBuilderEngine;
 import 
org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
 import 
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
-import 
org.apache.shardingsphere.sharding.merge.dql.iterator.IteratorStreamMergedResult;
+import 
org.apache.shardingsphere.sharding.merge.common.IteratorStreamMergedResult;
 import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
 
 import java.sql.Connection;
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/JDBCBackendTransactionManager.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/JDBCBackendTransactionManager.java
index ff5f6d612e0..c2cc3a92291 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/JDBCBackendTransactionManager.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/JDBCBackendTransactionManager.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction;
 import 
org.apache.shardingsphere.proxy.backend.communication.TransactionManager;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import 
org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueQueuesHolder;
 import org.apache.shardingsphere.transaction.ConnectionSavepointManager;
 import 
org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
 import org.apache.shardingsphere.transaction.TransactionHolder;
@@ -81,6 +82,7 @@ public final class JDBCBackendTransactionManager implements 
TransactionManager<V
                 
connection.getConnectionSession().getTransactionStatus().setInTransaction(false);
                 
connection.getConnectionSession().getTransactionStatus().setRollbackOnly(false);
                 TransactionHolder.clear();
+                FetchOrderByValueQueuesHolder.remove();
             }
         }
         return null;
@@ -99,6 +101,7 @@ public final class JDBCBackendTransactionManager implements 
TransactionManager<V
                 
connection.getConnectionSession().getTransactionStatus().setInTransaction(false);
                 
connection.getConnectionSession().getTransactionStatus().setRollbackOnly(false);
                 TransactionHolder.clear();
+                FetchOrderByValueQueuesHolder.remove();
             }
         }
         return null;
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/opengauss/OpenGaussSelectDatabaseExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/opengauss/OpenGaussSelectDatabaseExecutor.java
index 73f4e57822f..dd02cde3d09 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/opengauss/OpenGaussSelectDatabaseExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/opengauss/OpenGaussSelectDatabaseExecutor.java
@@ -29,7 +29,7 @@ import 
org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import 
org.apache.shardingsphere.proxy.backend.text.admin.executor.DatabaseAdminQueryExecutor;
 import 
org.apache.shardingsphere.proxy.backend.text.admin.opengauss.schema.OgCatalog;
 import 
org.apache.shardingsphere.proxy.backend.text.admin.opengauss.schema.OgDatabase;
-import 
org.apache.shardingsphere.sharding.merge.dql.iterator.IteratorStreamMergedResult;
+import 
org.apache.shardingsphere.sharding.merge.common.IteratorStreamMergedResult;
 import org.apache.shardingsphere.sql.parser.sql.common.util.SQLUtil;
 
 import java.sql.DriverManager;
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
index a93bdfa1611..dea51959490 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
@@ -23,6 +23,7 @@ import 
org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
 import 
org.apache.shardingsphere.proxy.backend.text.data.impl.SchemaAssignedDatabaseBackendHandler;
+import 
org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueQueuesHolder;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.TCLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.XAStatement;
 import org.apache.shardingsphere.transaction.TransactionHolder;
@@ -87,6 +88,7 @@ public final class TransactionXAHandler implements 
TextProtocolBackendHandler {
                 } finally {
                     
connectionSession.getTransactionStatus().setManualXA(false);
                     TransactionHolder.clear();
+                    FetchOrderByValueQueuesHolder.remove();
                 }
             default:
                 throw new SQLException("unrecognized XA statement " + 
tclStatement.getOp());
diff --git 
a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/SQLParserTestCases.java
 
b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/SQLParserTestCases.java
index 0cc2e974376..188ed43f4d8 100644
--- 
a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/SQLParserTestCases.java
+++ 
b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/SQLParserTestCases.java
@@ -1404,16 +1404,16 @@ public final class SQLParserTestCases {
     
     @XmlElement(name = "drop-rollback-segment")
     private final List<DropRollbackSegmentStatementTestCase> 
dropRollbackSegmentTestCases = new LinkedList<>();
-
+    
     @XmlElement(name = "create-lockdown-profile")
     private final List<CreateLockdownProfileStatementTestCase> 
createLockdownProfileTestCases = new LinkedList<>();
-
+    
     @XmlElement(name = "drop-lockdown-profile")
     private final List<DropLockdownProfileStatementTestCase> 
dropLockdownProfileTestCases = new LinkedList<>();
-
+    
     @XmlElement(name = "create-inmemory-join-group")
     private final List<CreateInmemoryJoinGroupStatementTestCase> 
createInmemoryJoinGroupTestCases = new LinkedList<>();
-
+    
     @XmlElement(name = "cursor")
     private final List<CursorStatementTestCase> cursorTestCases = new 
LinkedList<>();
     

Reply via email to