This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6264f03d4d0 Support openGauss fetch cursor next statement (#18180)
6264f03d4d0 is described below
commit 6264f03d4d0c3c7840e9e1aec6a5304a5cf340c8
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Sun Jun 5 17:32:16 2022 +0800
Support openGauss fetch cursor next statement (#18180)
* Support openGauss fetch cursor next statement
* fix unit test
* fix unit test
* add final modifier
* optimize code
* fix checkstyle
* fix unit test
---
.../sharding/merge/ShardingResultMergerEngine.java | 17 ++-
.../IteratorStreamMergedResult.java | 2 +-
.../merge/dal/show/ShowIndexMergedResult.java | 6 +-
.../merge/ddl/ShardingDDLResultMerger.java | 71 ++++++++++++
.../ddl/fetch/FetchOrderByValueQueuesHolder.java | 51 ++++++++
.../merge/ddl/fetch/FetchStreamMergedResult.java | 77 ++++++++++++
.../merge/dql/ShardingDQLResultMerger.java | 2 +-
.../merge/ShardingResultMergerEngineTest.java | 10 ++
.../IteratorStreamMergedResultTest.java | 2 +-
.../merge/ddl/ShardingDDLResultMergerTest.java | 129 +++++++++++++++++++++
.../fetch/FetchOrderByValueQueuesHolderTest.java | 43 +++++++
.../merge/dql/ShardingDQLResultMergerTest.java | 2 +-
...rdingTableRulesUsedAlgorithmQueryResultSet.java | 2 +-
.../infra/binder/SQLStatementContextFactory.java | 7 +-
.../statement/ddl/CursorStatementContext.java | 10 +-
.../statement/SQLStatementContextFactoryTest.java | 8 +-
.../core/connection/ShardingSphereConnection.java | 3 +
.../jdbc/JDBCDatabaseCommunicationEngine.java | 2 +-
.../transaction/JDBCBackendTransactionManager.java | 3 +
.../opengauss/OpenGaussSelectDatabaseExecutor.java | 2 +-
.../text/transaction/TransactionXAHandler.java | 2 +
.../jaxb/cases/domain/SQLParserTestCases.java | 8 +-
22 files changed, 432 insertions(+), 27 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngine.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngine.java
index a988b47c7fd..4a311feabe0 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngine.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngine.java
@@ -17,18 +17,20 @@
package org.apache.shardingsphere.sharding.merge;
-import org.apache.shardingsphere.sharding.constant.ShardingOrder;
-import org.apache.shardingsphere.sharding.rule.ShardingRule;
-import org.apache.shardingsphere.sharding.merge.dal.ShardingDALResultMerger;
-import org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
-import
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.merge.engine.merger.ResultMerger;
import org.apache.shardingsphere.infra.merge.engine.merger.ResultMergerEngine;
import
org.apache.shardingsphere.infra.merge.engine.merger.impl.TransparentResultMerger;
+import org.apache.shardingsphere.sharding.constant.ShardingOrder;
+import org.apache.shardingsphere.sharding.merge.dal.ShardingDALResultMerger;
+import org.apache.shardingsphere.sharding.merge.ddl.ShardingDDLResultMerger;
+import org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger;
+import org.apache.shardingsphere.sharding.rule.ShardingRule;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatement;
/**
* Result merger engine for sharding.
@@ -41,6 +43,9 @@ public final class ShardingResultMergerEngine implements
ResultMergerEngine<Shar
if (sqlStatementContext instanceof SelectStatementContext) {
return new ShardingDQLResultMerger(databaseType);
}
+ if (sqlStatementContext.getSqlStatement() instanceof DDLStatement) {
+ return new ShardingDDLResultMerger();
+ }
if (sqlStatementContext.getSqlStatement() instanceof DALStatement) {
return new ShardingDALResultMerger(databaseName, shardingRule);
}
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/iterator/IteratorStreamMergedResult.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/common/IteratorStreamMergedResult.java
similarity index 96%
rename from
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/iterator/IteratorStreamMergedResult.java
rename to
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/common/IteratorStreamMergedResult.java
index 5207717c179..ee227a2498a 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/iterator/IteratorStreamMergedResult.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/common/IteratorStreamMergedResult.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.sharding.merge.dql.iterator;
+package org.apache.shardingsphere.sharding.merge.common;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.merge.result.impl.stream.StreamMergedResult;
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dal/show/ShowIndexMergedResult.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dal/show/ShowIndexMergedResult.java
index 2f5b562c8ce..15d819c6421 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dal/show/ShowIndexMergedResult.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dal/show/ShowIndexMergedResult.java
@@ -34,7 +34,7 @@ import java.util.Optional;
/**
* Merged result for show index.
*/
-public class ShowIndexMergedResult extends MemoryMergedResult<ShardingRule> {
+public final class ShowIndexMergedResult extends
MemoryMergedResult<ShardingRule> {
public ShowIndexMergedResult(final ShardingRule shardingRule,
final SQLStatementContext<?>
sqlStatementContext, final ShardingSphereSchema schema, final List<QueryResult>
queryResults) throws SQLException {
@@ -42,8 +42,8 @@ public class ShowIndexMergedResult extends
MemoryMergedResult<ShardingRule> {
}
@Override
- protected final List<MemoryQueryResultRow> init(final ShardingRule
shardingRule, final ShardingSphereSchema schema,
- final
SQLStatementContext<?> sqlStatementContext, final List<QueryResult>
queryResults) throws SQLException {
+ protected List<MemoryQueryResultRow> init(final ShardingRule shardingRule,
final ShardingSphereSchema schema,
+ final SQLStatementContext<?>
sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
List<MemoryQueryResultRow> result = new LinkedList<>();
for (QueryResult each : queryResults) {
while (each.next()) {
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMerger.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMerger.java
new file mode 100644
index 00000000000..a929d8f6087
--- /dev/null
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMerger.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sharding.merge.ddl;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import
org.apache.shardingsphere.infra.binder.statement.ddl.FetchStatementContext;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import org.apache.shardingsphere.infra.merge.engine.merger.ResultMerger;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
+import
org.apache.shardingsphere.infra.merge.result.impl.transparent.TransparentMergedResult;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import
org.apache.shardingsphere.sharding.merge.common.IteratorStreamMergedResult;
+import
org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchStreamMergedResult;
+import org.apache.shardingsphere.sql.parser.sql.common.util.SQLUtil;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * DDL result merger for Sharding.
+ */
+@RequiredArgsConstructor
+public final class ShardingDDLResultMerger implements ResultMerger {
+
+ @Override
+ public MergedResult merge(final List<QueryResult> queryResults, final
SQLStatementContext<?> sqlStatementContext, final ShardingSphereDatabase
database) throws SQLException {
+ if (!(sqlStatementContext instanceof FetchStatementContext)) {
+ return new TransparentMergedResult(queryResults.get(0));
+ }
+ if (1 == queryResults.size()) {
+ return new IteratorStreamMergedResult(queryResults);
+ }
+ Map<String, Integer> columnLabelIndexMap =
getColumnLabelIndexMap(queryResults.get(0));
+ FetchStatementContext statementContext = (FetchStatementContext)
sqlStatementContext;
+
statementContext.getCursorStatementContext().getSelectStatementContext().setIndexes(columnLabelIndexMap);
+ return new FetchStreamMergedResult(queryResults, statementContext,
getSchema(sqlStatementContext, database));
+ }
+
+ private ShardingSphereSchema getSchema(final SQLStatementContext<?>
sqlStatementContext, final ShardingSphereDatabase database) {
+ String defaultSchemaName =
DatabaseTypeEngine.getDefaultSchemaName(sqlStatementContext.getDatabaseType(),
database.getName());
+ return
sqlStatementContext.getTablesContext().getSchemaName().map(optional ->
database.getSchemas().get(optional)).orElseGet(() ->
database.getSchemas().get(defaultSchemaName));
+ }
+
+ private Map<String, Integer> getColumnLabelIndexMap(final QueryResult
queryResult) throws SQLException {
+ Map<String, Integer> result = new
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+ for (int i = 0; i < queryResult.getMetaData().getColumnCount(); i++) {
+
result.put(SQLUtil.getExactlyValue(queryResult.getMetaData().getColumnLabel(i +
1)), i + 1);
+ }
+ return result;
+ }
+}
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolder.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolder.java
new file mode 100644
index 00000000000..f06927506cf
--- /dev/null
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolder.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sharding.merge.ddl.fetch;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.sharding.merge.dql.orderby.OrderByValue;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Hold fetch order by value queues for current thread.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class FetchOrderByValueQueuesHolder {
+
+ private static final ThreadLocal<Map<String, Queue<OrderByValue>>>
ORDER_BY_VALUE_QUEUES = ThreadLocal.withInitial(ConcurrentHashMap::new);
+
+ /**
+ * Get fetch order by value queues.
+ *
+ * @return fetch order by value queues
+ */
+ public static Map<String, Queue<OrderByValue>> get() {
+ return ORDER_BY_VALUE_QUEUES.get();
+ }
+
+ /**
+ * Remove fetch order by value queues.
+ */
+ public static void remove() {
+ ORDER_BY_VALUE_QUEUES.remove();
+ }
+}
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
new file mode 100644
index 00000000000..7c6b7e99298
--- /dev/null
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sharding.merge.ddl.fetch;
+
+import
org.apache.shardingsphere.infra.binder.segment.select.orderby.OrderByItem;
+import
org.apache.shardingsphere.infra.binder.statement.ddl.FetchStatementContext;
+import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import
org.apache.shardingsphere.infra.merge.result.impl.stream.StreamMergedResult;
+import
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import org.apache.shardingsphere.sharding.merge.dql.orderby.OrderByValue;
+
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+
+/**
+ * Stream merged result for fetch.
+ */
+public final class FetchStreamMergedResult extends StreamMergedResult {
+
+ private final Collection<OrderByItem> orderByItems;
+
+ private final Queue<OrderByValue> orderByValuesQueue;
+
+ private boolean isFirstNext;
+
+ public FetchStreamMergedResult(final List<QueryResult> queryResults, final
FetchStatementContext fetchStatementContext, final ShardingSphereSchema schema)
throws SQLException {
+ String cursorName =
fetchStatementContext.getCursorName().getIdentifier().getValue().toLowerCase();
+ SelectStatementContext selectStatementContext =
fetchStatementContext.getCursorStatementContext().getSelectStatementContext();
+ orderByItems = selectStatementContext.getOrderByContext().getItems();
+ orderByValuesQueue =
FetchOrderByValueQueuesHolder.get().computeIfAbsent(cursorName, key -> new
PriorityQueue<>(queryResults.size()));
+ orderResultSetsToQueue(queryResults, selectStatementContext, schema);
+ isFirstNext = true;
+ }
+
+ private void orderResultSetsToQueue(final List<QueryResult> queryResults,
final SelectStatementContext selectStatementContext, final ShardingSphereSchema
schema) throws SQLException {
+ for (QueryResult each : queryResults) {
+ OrderByValue orderByValue = new OrderByValue(each, orderByItems,
selectStatementContext, schema);
+ if (orderByValue.next()) {
+ orderByValuesQueue.offer(orderByValue);
+ }
+ }
+ }
+
+ @Override
+ public boolean next() throws SQLException {
+ if (orderByValuesQueue.isEmpty()) {
+ return false;
+ }
+ // TODO support fetch count and fetch all statement
+ if (isFirstNext) {
+ setCurrentQueryResult(orderByValuesQueue.poll().getQueryResult());
+ isFirstNext = false;
+ return true;
+ } else {
+ return false;
+ }
+ }
+}
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMerger.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMerger.java
index ead58bbbdd3..9a1cff5c4f6 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMerger.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMerger.java
@@ -29,9 +29,9 @@ import
org.apache.shardingsphere.infra.merge.engine.merger.ResultMerger;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import
org.apache.shardingsphere.sharding.merge.common.IteratorStreamMergedResult;
import
org.apache.shardingsphere.sharding.merge.dql.groupby.GroupByMemoryMergedResult;
import
org.apache.shardingsphere.sharding.merge.dql.groupby.GroupByStreamMergedResult;
-import
org.apache.shardingsphere.sharding.merge.dql.iterator.IteratorStreamMergedResult;
import
org.apache.shardingsphere.sharding.merge.dql.orderby.OrderByStreamMergedResult;
import
org.apache.shardingsphere.sharding.merge.dql.pagination.LimitDecoratorMergedResult;
import
org.apache.shardingsphere.sharding.merge.dql.pagination.RowNumberDecoratorMergedResult;
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngineTest.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngineTest.java
index f18e6007979..1df307b84b6 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngineTest.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngineTest.java
@@ -27,6 +27,7 @@ import
org.apache.shardingsphere.infra.merge.engine.merger.impl.TransparentResul
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
import org.apache.shardingsphere.sharding.merge.dal.ShardingDALResultMerger;
+import org.apache.shardingsphere.sharding.merge.ddl.ShardingDDLResultMerger;
import org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger;
import
org.apache.shardingsphere.sql.parser.sql.common.segment.dml.column.ColumnSegment;
import
org.apache.shardingsphere.sql.parser.sql.common.segment.dml.column.InsertColumnsSegment;
@@ -38,6 +39,7 @@ import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectState
import
org.apache.shardingsphere.sql.parser.sql.common.value.identifier.IdentifierValue;
import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLSelectStatement;
+import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.ddl.OpenGaussFetchStatement;
import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.oracle.dml.OracleSelectStatement;
import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.postgresql.dal.PostgreSQLShowStatement;
import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.postgresql.dml.PostgreSQLSelectStatement;
@@ -117,4 +119,12 @@ public final class ShardingResultMergerEngineTest {
when(database.getSchemas().get(DefaultDatabase.LOGIC_NAME)).thenReturn(mock(ShardingSphereSchema.class));
return new
InsertStatementContext(Collections.singletonMap(DefaultDatabase.LOGIC_NAME,
database), Collections.emptyList(), insertStatement,
DefaultDatabase.LOGIC_NAME);
}
+
+ @Test
+ public void assertNewInstanceWithDDLStatement() {
+ ConfigurationProperties props = new ConfigurationProperties(new
Properties());
+ CommonSQLStatementContext<OpenGaussFetchStatement> sqlStatementContext
= new CommonSQLStatementContext<>(new OpenGaussFetchStatement());
+ assertThat(new
ShardingResultMergerEngine().newInstance(DefaultDatabase.LOGIC_NAME,
DatabaseTypeFactory.getInstance("MySQL"), null, props,
+ sqlStatementContext),
instanceOf(ShardingDDLResultMerger.class));
+ }
}
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/dql/iterator/IteratorStreamMergedResultTest.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/common/IteratorStreamMergedResultTest.java
similarity index 99%
rename from
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/dql/iterator/IteratorStreamMergedResultTest.java
rename to
shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/common/IteratorStreamMergedResultTest.java
index 9010a755d1b..622c26340c0 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/dql/iterator/IteratorStreamMergedResultTest.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/common/IteratorStreamMergedResultTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.sharding.merge.dql.iterator;
+package org.apache.shardingsphere.sharding.merge.common;
import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.database.DefaultDatabase;
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMergerTest.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMergerTest.java
new file mode 100644
index 00000000000..e591183cb0c
--- /dev/null
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMergerTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sharding.merge.ddl;
+
+import
org.apache.shardingsphere.infra.binder.statement.ddl.CursorStatementContext;
+import
org.apache.shardingsphere.infra.binder.statement.ddl.FetchStatementContext;
+import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.database.DefaultDatabase;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import
org.apache.shardingsphere.infra.merge.result.impl.transparent.TransparentMergedResult;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import
org.apache.shardingsphere.sharding.merge.common.IteratorStreamMergedResult;
+import
org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchStreamMergedResult;
+import
org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.cursor.CursorNameSegment;
+import
org.apache.shardingsphere.sql.parser.sql.common.segment.dml.item.ProjectionsSegment;
+import
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.SimpleTableSegment;
+import
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.TableNameSegment;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.value.identifier.IdentifierValue;
+import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLSelectStatement;
+import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.ddl.OpenGaussFetchStatement;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public final class ShardingDDLResultMergerTest {
+
+ @Test
+ public void assertBuildIteratorStreamMergedResult() throws SQLException {
+ ShardingDDLResultMerger merger = new ShardingDDLResultMerger();
+ assertThat(merger.merge(createSingleQueryResults(),
mock(FetchStatementContext.class), mock(ShardingSphereDatabase.class)),
instanceOf(IteratorStreamMergedResult.class));
+ }
+
+ @Test
+ public void assertBuildFetchStreamMergedResult() throws SQLException {
+ ShardingDDLResultMerger merger = new ShardingDDLResultMerger();
+ ShardingSphereDatabase database = mock(ShardingSphereDatabase.class,
RETURNS_DEEP_STUBS);
+
when(database.getSchemas().get(DefaultDatabase.LOGIC_NAME)).thenReturn(mock(ShardingSphereSchema.class));
+ assertThat(merger.merge(createMultiQueryResults(),
createFetchStatementContext(database), mock(ShardingSphereDatabase.class)),
instanceOf(FetchStreamMergedResult.class));
+ }
+
+ @Test
+ public void assertBuildTransparentMergedResult() throws SQLException {
+ ShardingDDLResultMerger merger = new ShardingDDLResultMerger();
+ assertThat(merger.merge(createMultiQueryResults(),
mock(SelectStatementContext.class), mock(ShardingSphereDatabase.class)),
instanceOf(TransparentMergedResult.class));
+ }
+
+ private FetchStatementContext createFetchStatementContext(final
ShardingSphereDatabase database) {
+ OpenGaussFetchStatement fetchStatement = createFetchStatement();
+ FetchStatementContext result = new
FetchStatementContext(fetchStatement);
+ CursorStatementContext cursorStatementContext =
createCursorStatementContext(database);
+ result.setUpCursorDefinition(cursorStatementContext);
+ return result;
+ }
+
+ private CursorStatementContext createCursorStatementContext(final
ShardingSphereDatabase database) {
+ CursorStatementContext result = mock(CursorStatementContext.class,
RETURNS_DEEP_STUBS);
+ SelectStatement selectStatement = createSelectStatement();
+ selectStatement.setProjections(new ProjectionsSegment(0, 0));
+ SelectStatementContext selectStatementContext = new
SelectStatementContext(Collections.singletonMap(DefaultDatabase.LOGIC_NAME,
database), Collections.emptyList(),
+ selectStatement, DefaultDatabase.LOGIC_NAME);
+
when(result.getSelectStatementContext()).thenReturn(selectStatementContext);
+ when(result.getSqlStatement().getSelect()).thenReturn(selectStatement);
+ return result;
+ }
+
+ private List<QueryResult> createSingleQueryResults() throws SQLException {
+ List<QueryResult> result = new LinkedList<>();
+ QueryResult queryResult = createQueryResult();
+ result.add(queryResult);
+ return result;
+ }
+
+ private List<QueryResult> createMultiQueryResults() throws SQLException {
+ List<QueryResult> result = new LinkedList<>();
+ QueryResult queryResult = createQueryResult();
+ result.add(queryResult);
+ result.add(mock(QueryResult.class, RETURNS_DEEP_STUBS));
+ result.add(mock(QueryResult.class, RETURNS_DEEP_STUBS));
+ result.add(mock(QueryResult.class, RETURNS_DEEP_STUBS));
+ return result;
+ }
+
+ private QueryResult createQueryResult() throws SQLException {
+ QueryResult result = mock(QueryResult.class, RETURNS_DEEP_STUBS);
+ when(result.getMetaData().getColumnCount()).thenReturn(1);
+ when(result.getMetaData().getColumnLabel(1)).thenReturn("count(*)");
+ when(result.getValue(1, Object.class)).thenReturn(0);
+ return result;
+ }
+
+ private OpenGaussFetchStatement createFetchStatement() {
+ OpenGaussFetchStatement result = new OpenGaussFetchStatement();
+ result.setCursorName(new CursorNameSegment(0, 0, new
IdentifierValue("t_order_cursor")));
+ return result;
+ }
+
+ private SelectStatement createSelectStatement() {
+ SelectStatement result = new MySQLSelectStatement();
+ result.setFrom(new SimpleTableSegment(new TableNameSegment(10, 13, new
IdentifierValue("tbl"))));
+ result.setProjections(new ProjectionsSegment(0, 0));
+ return result;
+ }
+}
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolderTest.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolderTest.java
new file mode 100644
index 00000000000..b31d48f009e
--- /dev/null
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolderTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sharding.merge.ddl.fetch;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.PriorityQueue;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public final class FetchOrderByValueQueuesHolderTest {
+
+ @Before
+ public void setUp() {
+ FetchOrderByValueQueuesHolder.remove();
+ }
+
+ @Test
+ public void assertTrafficContextHolder() {
+
assertFalse(FetchOrderByValueQueuesHolder.get().containsKey("t_order_cursor"));
+ FetchOrderByValueQueuesHolder.get().computeIfAbsent("t_order_cursor",
key -> new PriorityQueue<>());
+
assertTrue(FetchOrderByValueQueuesHolder.get().containsKey("t_order_cursor"));
+ FetchOrderByValueQueuesHolder.remove();
+
assertFalse(FetchOrderByValueQueuesHolder.get().containsKey("t_order_cursor"));
+ }
+}
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMergerTest.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMergerTest.java
index 223a9915b0e..18f3b03b180 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMergerTest.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMergerTest.java
@@ -31,7 +31,7 @@ import
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.
import
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
import
org.apache.shardingsphere.sharding.merge.dql.groupby.GroupByMemoryMergedResult;
import
org.apache.shardingsphere.sharding.merge.dql.groupby.GroupByStreamMergedResult;
-import
org.apache.shardingsphere.sharding.merge.dql.iterator.IteratorStreamMergedResult;
+import
org.apache.shardingsphere.sharding.merge.common.IteratorStreamMergedResult;
import
org.apache.shardingsphere.sharding.merge.dql.orderby.OrderByStreamMergedResult;
import
org.apache.shardingsphere.sharding.merge.dql.pagination.LimitDecoratorMergedResult;
import
org.apache.shardingsphere.sharding.merge.dql.pagination.RowNumberDecoratorMergedResult;
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/query/ShardingTableRulesUsedAlgorithmQueryResultSet.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/query/ShardingTableRulesUsedAlgorithmQueryResultSet.java
index cb3214e0a3e..721f7ff7f04 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/query/ShardingTableRulesUsedAlgorithmQueryResultSet.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/sharding/distsql/handler/query/ShardingTableRulesUsedAlgorithmQueryResultSet.java
@@ -58,7 +58,7 @@ public final class
ShardingTableRulesUsedAlgorithmQueryResultSet implements Dist
if (((null == each.getDatabaseShardingStrategy() &&
matchDefaultDatabaseShardingStrategy)
|| (null != each.getDatabaseShardingStrategy() &&
algorithmName.equals(each.getDatabaseShardingStrategy().getShardingAlgorithmName())))
|| ((null == each.getTableShardingStrategy() &&
matchDefaultTableShardingStrategy)
- || (null != each.getTableShardingStrategy() &&
algorithmName.equals(each.getTableShardingStrategy().getShardingAlgorithmName()))))
{
+ || (null != each.getTableShardingStrategy() &&
algorithmName.equals(each.getTableShardingStrategy().getShardingAlgorithmName()))))
{
data.add(Arrays.asList("table", each.getLogicTable()));
}
});
diff --git
a/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/SQLStatementContextFactory.java
b/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/SQLStatementContextFactory.java
index 3e23d1bc804..a0ee1111de0 100644
---
a/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/SQLStatementContextFactory.java
+++
b/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/SQLStatementContextFactory.java
@@ -141,7 +141,7 @@ public final class SQLStatementContextFactory {
return getDMLStatementContext(databases, parameters,
(DMLStatement) sqlStatement, defaultDatabaseName);
}
if (sqlStatement instanceof DDLStatement) {
- return getDDLStatementContext((DDLStatement) sqlStatement);
+ return getDDLStatementContext(databases, parameters,
(DDLStatement) sqlStatement, defaultDatabaseName);
}
if (sqlStatement instanceof DCLStatement) {
return getDCLStatementContext((DCLStatement) sqlStatement);
@@ -175,7 +175,8 @@ public final class SQLStatementContextFactory {
throw new UnsupportedOperationException(String.format("Unsupported SQL
statement `%s`", sqlStatement.getClass().getSimpleName()));
}
- private static SQLStatementContext<?> getDDLStatementContext(final
DDLStatement sqlStatement) {
+ private static SQLStatementContext<?> getDDLStatementContext(final
Map<String, ShardingSphereDatabase> databases, final List<Object> parameters,
+ final
DDLStatement sqlStatement, final String defaultDatabaseName) {
if (sqlStatement instanceof CreateSchemaStatement) {
return new CreateSchemaStatementContext((CreateSchemaStatement)
sqlStatement);
}
@@ -225,7 +226,7 @@ public final class SQLStatementContextFactory {
return new CommentStatementContext((CommentStatement)
sqlStatement);
}
if (sqlStatement instanceof OpenGaussCursorStatement) {
- return new CursorStatementContext((OpenGaussCursorStatement)
sqlStatement);
+ return new CursorStatementContext(databases, parameters,
(OpenGaussCursorStatement) sqlStatement, defaultDatabaseName);
}
if (sqlStatement instanceof CloseStatement) {
return new CloseStatementContext((CloseStatement) sqlStatement);
diff --git
a/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/statement/ddl/CursorStatementContext.java
b/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/statement/ddl/CursorStatementContext.java
index 46991d1aa2f..58ee20772bc 100644
---
a/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/statement/ddl/CursorStatementContext.java
+++
b/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/statement/ddl/CursorStatementContext.java
@@ -20,9 +20,11 @@ package org.apache.shardingsphere.infra.binder.statement.ddl;
import lombok.Getter;
import org.apache.shardingsphere.infra.binder.segment.table.TablesContext;
import
org.apache.shardingsphere.infra.binder.statement.CommonSQLStatementContext;
+import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.binder.type.CursorAvailable;
import org.apache.shardingsphere.infra.binder.type.TableAvailable;
import org.apache.shardingsphere.infra.binder.type.WhereAvailable;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.sql.parser.sql.common.extractor.TableExtractor;
import
org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.cursor.CursorNameSegment;
import
org.apache.shardingsphere.sql.parser.sql.common.segment.dml.column.ColumnSegment;
@@ -35,6 +37,8 @@ import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.ddl.
import java.util.Collection;
import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
/**
* Cursor statement context.
@@ -48,11 +52,15 @@ public final class CursorStatementContext extends
CommonSQLStatementContext<Open
private final TablesContext tablesContext;
- public CursorStatementContext(final OpenGaussCursorStatement sqlStatement)
{
+ private final SelectStatementContext selectStatementContext;
+
+ public CursorStatementContext(final Map<String, ShardingSphereDatabase>
databases, final List<Object> parameters,
+ final OpenGaussCursorStatement sqlStatement,
final String defaultDatabaseName) {
super(sqlStatement);
tablesContext = new TablesContext(getSimpleTableSegments(),
getDatabaseType());
extractWhereSegments(whereSegments, sqlStatement.getSelect());
ColumnExtractor.extractColumnSegments(columnSegments, whereSegments);
+ selectStatementContext = new SelectStatementContext(databases,
parameters, sqlStatement.getSelect(), defaultDatabaseName);
}
private Collection<SimpleTableSegment> getSimpleTableSegments() {
diff --git
a/shardingsphere-infra/shardingsphere-infra-binder/src/test/java/org/apache/shardingsphere/infra/binder/statement/SQLStatementContextFactoryTest.java
b/shardingsphere-infra/shardingsphere-infra-binder/src/test/java/org/apache/shardingsphere/infra/binder/statement/SQLStatementContextFactoryTest.java
index a72855a50f6..87a63f8c403 100644
---
a/shardingsphere-infra/shardingsphere-infra-binder/src/test/java/org/apache/shardingsphere/infra/binder/statement/SQLStatementContextFactoryTest.java
+++
b/shardingsphere-infra/shardingsphere-infra-binder/src/test/java/org/apache/shardingsphere/infra/binder/statement/SQLStatementContextFactoryTest.java
@@ -117,9 +117,11 @@ public final class SQLStatementContextFactoryTest {
@Test
public void assertNewInstanceForCursorStatement() {
- OpenGaussCursorStatement sqlStatement =
mock(OpenGaussCursorStatement.class);
-
when(sqlStatement.getSelect()).thenReturn(mock(MySQLSelectStatement.class));
- SQLStatementContext<?> actual =
SQLStatementContextFactory.newInstance(mockDatabases(),
Collections.emptyList(), sqlStatement, DefaultDatabase.LOGIC_NAME);
+ OpenGaussCursorStatement cursorStatement =
mock(OpenGaussCursorStatement.class, RETURNS_DEEP_STUBS);
+ MySQLSelectStatement selectStatement =
mock(MySQLSelectStatement.class, RETURNS_DEEP_STUBS);
+
when(selectStatement.getProjections().isDistinctRow()).thenReturn(false);
+ when(cursorStatement.getSelect()).thenReturn(selectStatement);
+ SQLStatementContext<?> actual =
SQLStatementContextFactory.newInstance(mockDatabases(),
Collections.emptyList(), cursorStatement, DefaultDatabase.LOGIC_NAME);
assertThat(actual, instanceOf(CursorStatementContext.class));
}
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
index 92dbacf1ffe..7c4c464d89b 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.driver.jdbc.core.datasource.metadata.ShardingSp
import
org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement;
import
org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSphereStatement;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueQueuesHolder;
import org.apache.shardingsphere.traffic.context.TrafficContextHolder;
import org.apache.shardingsphere.transaction.TransactionHolder;
@@ -170,6 +171,7 @@ public final class ShardingSphereConnection extends
AbstractConnectionAdapter {
connectionManager.getConnectionTransaction().setRollbackOnly(false);
TransactionHolder.clear();
TrafficContextHolder.remove();
+ FetchOrderByValueQueuesHolder.remove();
}
}
@@ -181,6 +183,7 @@ public final class ShardingSphereConnection extends
AbstractConnectionAdapter {
connectionManager.getConnectionTransaction().setRollbackOnly(false);
TransactionHolder.clear();
TrafficContextHolder.remove();
+ FetchOrderByValueQueuesHolder.remove();
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
index 7843987729e..0d343638d4e 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
@@ -53,7 +53,7 @@ import
org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import
org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeaderBuilderEngine;
import
org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
import
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
-import
org.apache.shardingsphere.sharding.merge.dql.iterator.IteratorStreamMergedResult;
+import
org.apache.shardingsphere.sharding.merge.common.IteratorStreamMergedResult;
import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
import java.sql.Connection;
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/JDBCBackendTransactionManager.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/JDBCBackendTransactionManager.java
index ff5f6d612e0..c2cc3a92291 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/JDBCBackendTransactionManager.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/JDBCBackendTransactionManager.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction;
import
org.apache.shardingsphere.proxy.backend.communication.TransactionManager;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import
org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueQueuesHolder;
import org.apache.shardingsphere.transaction.ConnectionSavepointManager;
import
org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
import org.apache.shardingsphere.transaction.TransactionHolder;
@@ -81,6 +82,7 @@ public final class JDBCBackendTransactionManager implements
TransactionManager<V
connection.getConnectionSession().getTransactionStatus().setInTransaction(false);
connection.getConnectionSession().getTransactionStatus().setRollbackOnly(false);
TransactionHolder.clear();
+ FetchOrderByValueQueuesHolder.remove();
}
}
return null;
@@ -99,6 +101,7 @@ public final class JDBCBackendTransactionManager implements
TransactionManager<V
connection.getConnectionSession().getTransactionStatus().setInTransaction(false);
connection.getConnectionSession().getTransactionStatus().setRollbackOnly(false);
TransactionHolder.clear();
+ FetchOrderByValueQueuesHolder.remove();
}
}
return null;
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/opengauss/OpenGaussSelectDatabaseExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/opengauss/OpenGaussSelectDatabaseExecutor.java
index 73f4e57822f..dd02cde3d09 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/opengauss/OpenGaussSelectDatabaseExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/opengauss/OpenGaussSelectDatabaseExecutor.java
@@ -29,7 +29,7 @@ import
org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import
org.apache.shardingsphere.proxy.backend.text.admin.executor.DatabaseAdminQueryExecutor;
import
org.apache.shardingsphere.proxy.backend.text.admin.opengauss.schema.OgCatalog;
import
org.apache.shardingsphere.proxy.backend.text.admin.opengauss.schema.OgDatabase;
-import
org.apache.shardingsphere.sharding.merge.dql.iterator.IteratorStreamMergedResult;
+import
org.apache.shardingsphere.sharding.merge.common.IteratorStreamMergedResult;
import org.apache.shardingsphere.sql.parser.sql.common.util.SQLUtil;
import java.sql.DriverManager;
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
index a93bdfa1611..dea51959490 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
@@ -23,6 +23,7 @@ import
org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
import
org.apache.shardingsphere.proxy.backend.text.data.impl.SchemaAssignedDatabaseBackendHandler;
+import
org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueQueuesHolder;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.TCLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.XAStatement;
import org.apache.shardingsphere.transaction.TransactionHolder;
@@ -87,6 +88,7 @@ public final class TransactionXAHandler implements
TextProtocolBackendHandler {
} finally {
connectionSession.getTransactionStatus().setManualXA(false);
TransactionHolder.clear();
+ FetchOrderByValueQueuesHolder.remove();
}
default:
throw new SQLException("unrecognized XA statement " +
tclStatement.getOp());
diff --git
a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/SQLParserTestCases.java
b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/SQLParserTestCases.java
index 0cc2e974376..188ed43f4d8 100644
---
a/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/SQLParserTestCases.java
+++
b/shardingsphere-test/shardingsphere-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/jaxb/cases/domain/SQLParserTestCases.java
@@ -1404,16 +1404,16 @@ public final class SQLParserTestCases {
@XmlElement(name = "drop-rollback-segment")
private final List<DropRollbackSegmentStatementTestCase>
dropRollbackSegmentTestCases = new LinkedList<>();
-
+
@XmlElement(name = "create-lockdown-profile")
private final List<CreateLockdownProfileStatementTestCase>
createLockdownProfileTestCases = new LinkedList<>();
-
+
@XmlElement(name = "drop-lockdown-profile")
private final List<DropLockdownProfileStatementTestCase>
dropLockdownProfileTestCases = new LinkedList<>();
-
+
@XmlElement(name = "create-inmemory-join-group")
private final List<CreateInmemoryJoinGroupStatementTestCase>
createInmemoryJoinGroupTestCases = new LinkedList<>();
-
+
@XmlElement(name = "cursor")
private final List<CursorStatementTestCase> cursorTestCases = new
LinkedList<>();