This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new bcde6f374c4 Fix fetch forward all error in openGauss(#21421) (#21471)
bcde6f374c4 is described below

commit bcde6f374c4a3a025173fbc9f6d0e66ed686a042
Author: ZhangCheng <[email protected]>
AuthorDate: Tue Oct 11 13:40:15 2022 +0800

    Fix fetch forward all error in openGauss(#21421) (#21471)
    
    * Fix fetch forward all error in openGauss(#21421)
    
    * Fix fetch all
    
    * Fix
---
 .../merge/ddl/fetch/FetchStreamMergedResult.java      | 19 +++++++++++++++++++
 .../infra/context/cursor/CursorConnectionContext.java |  4 ++++
 2 files changed, 23 insertions(+)

diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
index 4c9db958ace..2edf9fe5a62 100644
--- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
@@ -55,6 +55,8 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
     
     private boolean isFirstNext;
     
+    private boolean isExecutedAllDirection;
+    
     public FetchStreamMergedResult(final List<QueryResult> queryResults, final 
FetchStatementContext fetchStatementContext,
                                    final ShardingSphereSchema schema, final 
ConnectionContext connectionContext) throws SQLException {
         orderByValuesQueue = new PriorityQueue<>(queryResults.size());
@@ -65,11 +67,15 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
         List<FetchOrderByValueGroup> fetchOrderByValueGroups = 
getFetchOrderByValueGroups(queryResults, selectStatementContext, schema, 
cursorName, connectionContext);
         addOrderedResultSetsToQueue(fetchOrderByValueGroups, queryResults);
         setMinResultSetRowCount(cursorName, connectionContext);
+        handleExecutedAllDirections(connectionContext, cursorName);
         isFirstNext = true;
     }
     
     @Override
     public boolean next() throws SQLException {
+        if (isExecutedAllDirection) {
+            return false;
+        }
         if (orderByValuesQueue.isEmpty()) {
             return false;
         }
@@ -101,6 +107,10 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
         if (actualFetchCount <= 0 && 
!DirectionType.isAllDirectionType(directionType)) {
             return result;
         }
+        if 
(connectionContext.getCursorConnectionContext().getExecutedAllDirections().containsKey(cursorName))
 {
+            result.forEach(each -> each.getOrderByValues().clear());
+            return result;
+        }
         Collection<OrderByItem> items = 
selectStatementContext.getOrderByContext().getItems();
         int index = 0;
         for (QueryResult each : queryResults) {
@@ -153,6 +163,15 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
         
connectionContext.getCursorConnectionContext().getMinGroupRowCounts().put(cursorName,
 Math.max(minResultSetRowCount, 0L));
     }
     
+    private void handleExecutedAllDirections(final ConnectionContext 
connectionContext, final String cursorName) {
+        if 
(connectionContext.getCursorConnectionContext().getExecutedAllDirections().containsKey(cursorName))
 {
+            isExecutedAllDirection = true;
+        }
+        if (DirectionType.isAllDirectionType(directionType)) {
+            
connectionContext.getCursorConnectionContext().getExecutedAllDirections().put(cursorName,
 true);
+        }
+    }
+    
     private long getGroupRowCount(final FetchOrderByValueGroup 
fetchOrderByValueGroup) {
         long result = 0;
         for (OrderByValue each : fetchOrderByValueGroup.getOrderByValues()) {
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/context/cursor/CursorConnectionContext.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/context/cursor/CursorConnectionContext.java
index d4204937008..e3a0870d46f 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/context/cursor/CursorConnectionContext.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/context/cursor/CursorConnectionContext.java
@@ -35,11 +35,14 @@ public final class CursorConnectionContext implements AutoCloseable {
     
     private final Map<String, CursorDefinition> cursorDefinitions = new 
ConcurrentHashMap<>();
     
+    private final Map<String, Boolean> executedAllDirections = new 
ConcurrentHashMap<>();
+    
     @Override
     public void close() {
         orderByValueGroups.clear();
         minGroupRowCounts.clear();
         cursorDefinitions.clear();
+        executedAllDirections.clear();
     }
     
     /**
@@ -51,5 +54,6 @@ public final class CursorConnectionContext implements AutoCloseable {
         orderByValueGroups.remove(name);
         minGroupRowCounts.remove(name);
         cursorDefinitions.remove(name);
+        executedAllDirections.remove(name);
     }
 }

Reply via email to