This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new bcde6f374c4 Fix fetch forward all error in openGauss(#21421) (#21471)
bcde6f374c4 is described below
commit bcde6f374c4a3a025173fbc9f6d0e66ed686a042
Author: ZhangCheng <[email protected]>
AuthorDate: Tue Oct 11 13:40:15 2022 +0800
Fix fetch forward all error in openGauss(#21421) (#21471)
* Fix fetch forward all error in openGauss(#21421)
* Fix fetch all
* Fix
---
.../merge/ddl/fetch/FetchStreamMergedResult.java | 19 +++++++++++++++++++
.../infra/context/cursor/CursorConnectionContext.java | 4 ++++
2 files changed, 23 insertions(+)
diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
index 4c9db958ace..2edf9fe5a62 100644
--- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
@@ -55,6 +55,8 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
private boolean isFirstNext;
+ private boolean isExecutedAllDirection;
+
public FetchStreamMergedResult(final List<QueryResult> queryResults, final FetchStatementContext fetchStatementContext,
final ShardingSphereSchema schema, final ConnectionContext connectionContext) throws SQLException {
orderByValuesQueue = new PriorityQueue<>(queryResults.size());
@@ -65,11 +67,15 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
List<FetchOrderByValueGroup> fetchOrderByValueGroups = getFetchOrderByValueGroups(queryResults, selectStatementContext, schema, cursorName, connectionContext);
addOrderedResultSetsToQueue(fetchOrderByValueGroups, queryResults);
setMinResultSetRowCount(cursorName, connectionContext);
+ handleExecutedAllDirections(connectionContext, cursorName);
isFirstNext = true;
}
@Override
public boolean next() throws SQLException {
+ if (isExecutedAllDirection) {
+ return false;
+ }
if (orderByValuesQueue.isEmpty()) {
return false;
}
@@ -101,6 +107,10 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
if (actualFetchCount <= 0 && !DirectionType.isAllDirectionType(directionType)) {
return result;
}
+ if (connectionContext.getCursorConnectionContext().getExecutedAllDirections().containsKey(cursorName)) {
+ result.forEach(each -> each.getOrderByValues().clear());
+ return result;
+ }
Collection<OrderByItem> items = selectStatementContext.getOrderByContext().getItems();
int index = 0;
for (QueryResult each : queryResults) {
@@ -153,6 +163,15 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
connectionContext.getCursorConnectionContext().getMinGroupRowCounts().put(cursorName, Math.max(minResultSetRowCount, 0L));
}
+ private void handleExecutedAllDirections(final ConnectionContext connectionContext, final String cursorName) {
+ if (connectionContext.getCursorConnectionContext().getExecutedAllDirections().containsKey(cursorName)) {
+ isExecutedAllDirection = true;
+ }
+ if (DirectionType.isAllDirectionType(directionType)) {
+ connectionContext.getCursorConnectionContext().getExecutedAllDirections().put(cursorName, true);
+ }
+ }
+
private long getGroupRowCount(final FetchOrderByValueGroup fetchOrderByValueGroup) {
long result = 0;
for (OrderByValue each : fetchOrderByValueGroup.getOrderByValues()) {
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/context/cursor/CursorConnectionContext.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/context/cursor/CursorConnectionContext.java
index d4204937008..e3a0870d46f 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/context/cursor/CursorConnectionContext.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/context/cursor/CursorConnectionContext.java
@@ -35,11 +35,14 @@ public final class CursorConnectionContext implements AutoCloseable {
private final Map<String, CursorDefinition> cursorDefinitions = new ConcurrentHashMap<>();
+ private final Map<String, Boolean> executedAllDirections = new ConcurrentHashMap<>();
+
@Override
public void close() {
orderByValueGroups.clear();
minGroupRowCounts.clear();
cursorDefinitions.clear();
+ executedAllDirections.clear();
}
/**
@@ -51,5 +54,6 @@ public final class CursorConnectionContext implements AutoCloseable {
orderByValueGroups.remove(name);
minGroupRowCounts.remove(name);
cursorDefinitions.remove(name);
+ executedAllDirections.remove(name);
}
}