This is an automated email from the ASF dual-hosted git repository.

tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 54bbc6cd42b Optimize fetch statement merge logic and clear thread 
local when close cursor (#18215)
54bbc6cd42b is described below

commit 54bbc6cd42b4e4b3e5f745bb152e942c25106795
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Tue Jun 7 13:58:42 2022 +0800

    Optimize fetch statement merge logic and clear thread local when close 
cursor (#18215)
---
 .../merge/ddl/fetch/FetchStreamMergedResult.java   |  17 +-
 .../sharding/merge/dql/orderby/OrderByValue.java   |   2 +-
 .../ddl/fetch/FetchStreamMergedResultTest.java     | 190 +++++++++++++++++++++
 .../impl/SchemaAssignedDatabaseBackendHandler.java |   5 +
 4 files changed, 211 insertions(+), 3 deletions(-)

diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
index a5fb82256a1..c59422552f0 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
@@ -17,9 +17,13 @@
 
 package org.apache.shardingsphere.sharding.merge.ddl.fetch;
 
+import 
org.apache.shardingsphere.infra.binder.segment.select.orderby.OrderByItem;
 import 
org.apache.shardingsphere.infra.binder.statement.ddl.FetchStatementContext;
 import 
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.memory.JDBCMemoryQueryResult;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
 import 
org.apache.shardingsphere.infra.merge.result.impl.stream.StreamMergedResult;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 import org.apache.shardingsphere.sharding.merge.dql.orderby.OrderByValue;
@@ -27,6 +31,7 @@ import 
org.apache.shardingsphere.sql.parser.sql.common.constant.DirectionType;
 import 
org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.cursor.DirectionSegment;
 
 import java.sql.SQLException;
+import java.util.Collection;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Queue;
@@ -55,9 +60,10 @@ public final class FetchStreamMergedResult extends 
StreamMergedResult {
     }
     
     private void orderResultSetsToQueue(final List<QueryResult> queryResults, 
final SelectStatementContext selectStatementContext, final ShardingSphereSchema 
schema) throws SQLException {
-        
+        Collection<OrderByItem> items = 
selectStatementContext.getOrderByContext().getItems();
         for (QueryResult each : queryResults) {
-            OrderByValue orderByValue = new OrderByValue(each, 
selectStatementContext.getOrderByContext().getItems(), selectStatementContext, 
schema);
+            QueryResult queryResult = decorate(each, 
selectStatementContext.getDatabaseType());
+            OrderByValue orderByValue = new OrderByValue(queryResult, items, 
selectStatementContext, schema);
             if (orderByValue.next()) {
                 orderByValuesQueue.offer(orderByValue);
             }
@@ -65,6 +71,13 @@ public final class FetchStreamMergedResult extends 
StreamMergedResult {
         setCurrentQueryResult(orderByValuesQueue.isEmpty() ? 
queryResults.get(0) : orderByValuesQueue.peek().getQueryResult());
     }
     
+    private QueryResult decorate(final QueryResult queryResult, final 
DatabaseType databaseType) throws SQLException {
+        if (!DirectionType.isAllDirectionType(directionType) && queryResult 
instanceof JDBCStreamQueryResult) {
+            return new JDBCMemoryQueryResult(((JDBCStreamQueryResult) 
queryResult).getResultSet(), databaseType);
+        }
+        return queryResult;
+    }
+    
     @Override
     public boolean next() throws SQLException {
         if (orderByValuesQueue.isEmpty()) {
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/orderby/OrderByValue.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/orderby/OrderByValue.java
index de71bcd5630..f1c727e273c 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/orderby/OrderByValue.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/orderby/OrderByValue.java
@@ -92,7 +92,7 @@ public final class OrderByValue implements 
Comparable<OrderByValue> {
     }
     
     /**
-     * iterate next data.
+     * Iterate next data.
      *
      * @return has next data
      * @throws SQLException SQL exception
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResultTest.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResultTest.java
new file mode 100644
index 00000000000..db3b20a5b1a
--- /dev/null
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResultTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sharding.merge.ddl.fetch;
+
+import 
org.apache.shardingsphere.infra.binder.statement.ddl.CursorStatementContext;
+import 
org.apache.shardingsphere.infra.binder.statement.ddl.FetchStatementContext;
+import org.apache.shardingsphere.infra.database.DefaultDatabase;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.sharding.merge.ddl.ShardingDDLResultMerger;
+import org.apache.shardingsphere.sql.parser.sql.common.constant.DirectionType;
+import 
org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.cursor.CursorNameSegment;
+import 
org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.cursor.DirectionSegment;
+import 
org.apache.shardingsphere.sql.parser.sql.common.segment.dml.item.ProjectionsSegment;
+import 
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.SimpleTableSegment;
+import 
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.TableNameSegment;
+import 
org.apache.shardingsphere.sql.parser.sql.common.value.identifier.IdentifierValue;
+import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.ddl.OpenGaussCursorStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.ddl.OpenGaussFetchStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.dml.OpenGaussSelectStatement;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public final class FetchStreamMergedResultTest {
+    
+    private FetchStatementContext fetchCountStatementContext;
+    
+    private FetchStatementContext fetchAllStatementContext;
+    
+    private ShardingDDLResultMerger resultMerger;
+    
+    private ShardingSphereDatabase database;
+    
+    @Before
+    public void setUp() {
+        fetchCountStatementContext = new 
FetchStatementContext(createFetchStatement(false));
+        
fetchCountStatementContext.setUpCursorDefinition(createCursorStatementContext());
+        fetchAllStatementContext = new 
FetchStatementContext(createFetchStatement(true));
+        
fetchAllStatementContext.setUpCursorDefinition(createCursorStatementContext());
+        resultMerger = new ShardingDDLResultMerger();
+        database = mock(ShardingSphereDatabase.class, RETURNS_DEEP_STUBS);
+        when(database.getName()).thenReturn(DefaultDatabase.LOGIC_NAME);
+    }
+    
+    private OpenGaussFetchStatement createFetchStatement(final boolean 
containsAllDirectionType) {
+        OpenGaussFetchStatement result = new OpenGaussFetchStatement();
+        result.setCursorName(new CursorNameSegment(0, 0, new 
IdentifierValue("t_order_cursor")));
+        if (containsAllDirectionType) {
+            DirectionSegment direction = new DirectionSegment(0, 0, 
DirectionType.ALL);
+            result.setDirection(direction);
+        }
+        return result;
+    }
+    
+    private CursorStatementContext createCursorStatementContext() {
+        ShardingSphereDatabase database = mock(ShardingSphereDatabase.class, 
RETURNS_DEEP_STUBS);
+        when(database.getName()).thenReturn(DefaultDatabase.LOGIC_NAME);
+        OpenGaussCursorStatement cursorStatement = new 
OpenGaussCursorStatement();
+        cursorStatement.setSelect(createSelectStatement());
+        return new 
CursorStatementContext(Collections.singletonMap(DefaultDatabase.LOGIC_NAME, 
database), Collections.emptyList(), cursorStatement, 
DefaultDatabase.LOGIC_NAME);
+    }
+    
+    private OpenGaussSelectStatement createSelectStatement() {
+        OpenGaussSelectStatement result = new OpenGaussSelectStatement();
+        result.setProjections(new ProjectionsSegment(0, 0));
+        result.setFrom(new SimpleTableSegment(new TableNameSegment(0, 0, new 
IdentifierValue("t_order"))));
+        return result;
+    }
+    
+    @Test
+    public void assertNextForResultSetsAllEmpty() throws SQLException {
+        List<QueryResult> queryResults = Arrays.asList(mock(QueryResult.class, 
RETURNS_DEEP_STUBS), mock(QueryResult.class, RETURNS_DEEP_STUBS), 
mock(QueryResult.class, RETURNS_DEEP_STUBS));
+        MergedResult actual = resultMerger.merge(queryResults, 
fetchCountStatementContext, database);
+        assertFalse(actual.next());
+    }
+    
+    @Test
+    public void assertNextForResultSetsAllEmptyWhenConfigAllDirectionType() 
throws SQLException {
+        List<QueryResult> queryResults = Arrays.asList(mock(QueryResult.class, 
RETURNS_DEEP_STUBS), mock(QueryResult.class, RETURNS_DEEP_STUBS), 
mock(QueryResult.class, RETURNS_DEEP_STUBS));
+        MergedResult actual = resultMerger.merge(queryResults, 
fetchAllStatementContext, database);
+        assertFalse(actual.next());
+    }
+    
+    @Test
+    public void assertNextForResultSetsAllNotEmpty() throws SQLException {
+        List<QueryResult> queryResults = Arrays.asList(mock(QueryResult.class, 
RETURNS_DEEP_STUBS), mock(QueryResult.class, RETURNS_DEEP_STUBS), 
mock(QueryResult.class, RETURNS_DEEP_STUBS));
+        for (QueryResult each : queryResults) {
+            when(each.next()).thenReturn(true, false);
+        }
+        MergedResult actual = resultMerger.merge(queryResults, 
fetchCountStatementContext, database);
+        assertTrue(actual.next());
+        assertFalse(actual.next());
+        assertFalse(actual.next());
+        assertFalse(actual.next());
+    }
+    
+    @Test
+    public void assertNextForResultSetsAllNotEmptyWhenConfigAllDirectionType() 
throws SQLException {
+        List<QueryResult> queryResults = Arrays.asList(mock(QueryResult.class, 
RETURNS_DEEP_STUBS), mock(QueryResult.class, RETURNS_DEEP_STUBS), 
mock(QueryResult.class, RETURNS_DEEP_STUBS));
+        for (QueryResult each : queryResults) {
+            when(each.next()).thenReturn(true, false);
+        }
+        MergedResult actual = resultMerger.merge(queryResults, 
fetchAllStatementContext, database);
+        assertTrue(actual.next());
+        assertTrue(actual.next());
+        assertTrue(actual.next());
+        assertFalse(actual.next());
+    }
+    
+    @Test
+    public void assertNextForFirstResultSetsNotEmptyOnly() throws SQLException 
{
+        List<QueryResult> queryResults = Arrays.asList(mock(QueryResult.class, 
RETURNS_DEEP_STUBS), mock(QueryResult.class, RETURNS_DEEP_STUBS), 
mock(QueryResult.class, RETURNS_DEEP_STUBS));
+        when(queryResults.get(0).next()).thenReturn(true, false);
+        MergedResult actual = resultMerger.merge(queryResults, 
fetchCountStatementContext, database);
+        assertTrue(actual.next());
+        assertFalse(actual.next());
+    }
+    
+    @Test
+    public void 
assertNextForFirstResultSetsNotEmptyOnlyWhenConfigAllDirectionType() throws 
SQLException {
+        List<QueryResult> queryResults = Arrays.asList(mock(QueryResult.class, 
RETURNS_DEEP_STUBS), mock(QueryResult.class, RETURNS_DEEP_STUBS), 
mock(QueryResult.class, RETURNS_DEEP_STUBS));
+        when(queryResults.get(0).next()).thenReturn(true, false);
+        MergedResult actual = resultMerger.merge(queryResults, 
fetchAllStatementContext, database);
+        assertTrue(actual.next());
+        assertFalse(actual.next());
+    }
+    
+    @Test
+    public void assertNextForMiddleResultSetsNotEmpty() throws SQLException {
+        List<QueryResult> queryResults = Arrays.asList(mock(QueryResult.class, 
RETURNS_DEEP_STUBS), mock(QueryResult.class, RETURNS_DEEP_STUBS), 
mock(QueryResult.class, RETURNS_DEEP_STUBS));
+        when(queryResults.get(1).next()).thenReturn(true, false);
+        MergedResult actual = resultMerger.merge(queryResults, 
fetchCountStatementContext, database);
+        assertTrue(actual.next());
+        assertFalse(actual.next());
+    }
+    
+    @Test
+    public void 
assertNextForMiddleResultSetsNotEmptyWhenConfigAllDirectionType() throws 
SQLException {
+        List<QueryResult> queryResults = Arrays.asList(mock(QueryResult.class, 
RETURNS_DEEP_STUBS), mock(QueryResult.class, RETURNS_DEEP_STUBS), 
mock(QueryResult.class, RETURNS_DEEP_STUBS));
+        when(queryResults.get(1).next()).thenReturn(true, false);
+        MergedResult actual = resultMerger.merge(queryResults, 
fetchAllStatementContext, database);
+        assertTrue(actual.next());
+        assertFalse(actual.next());
+    }
+    
+    @Test
+    public void assertNextForLastResultSetsNotEmptyOnly() throws SQLException {
+        List<QueryResult> queryResults = Arrays.asList(mock(QueryResult.class, 
RETURNS_DEEP_STUBS), mock(QueryResult.class, RETURNS_DEEP_STUBS), 
mock(QueryResult.class, RETURNS_DEEP_STUBS));
+        when(queryResults.get(2).next()).thenReturn(true, false);
+        MergedResult actual = resultMerger.merge(queryResults, 
fetchCountStatementContext, database);
+        assertTrue(actual.next());
+        assertFalse(actual.next());
+    }
+    
+    @Test
+    public void 
assertNextForLastResultSetsNotEmptyOnlyWhenConfigAllDirectionType() throws 
SQLException {
+        List<QueryResult> queryResults = Arrays.asList(mock(QueryResult.class, 
RETURNS_DEEP_STUBS), mock(QueryResult.class, RETURNS_DEEP_STUBS), 
mock(QueryResult.class, RETURNS_DEEP_STUBS));
+        when(queryResults.get(2).next()).thenReturn(true, false);
+        MergedResult actual = resultMerger.merge(queryResults, 
fetchAllStatementContext, database);
+        assertTrue(actual.next());
+        assertFalse(actual.next());
+    }
+}
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
index 9ae610bf44a..e57ee177f3e 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
@@ -22,6 +22,7 @@ import io.vertx.core.Future;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.binder.aware.CursorDefinitionAware;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import 
org.apache.shardingsphere.infra.binder.statement.ddl.CloseStatementContext;
 import 
org.apache.shardingsphere.infra.binder.statement.ddl.CursorStatementContext;
 import org.apache.shardingsphere.infra.binder.type.CursorAvailable;
 import 
org.apache.shardingsphere.infra.distsql.exception.resource.RequiredResourceMissedException;
@@ -35,6 +36,7 @@ import 
org.apache.shardingsphere.proxy.backend.exception.RuleNotExistedException
 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import 
org.apache.shardingsphere.proxy.backend.text.data.DatabaseBackendHandler;
+import 
org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueQueuesHolder;
 
 import java.sql.SQLException;
 import java.util.Collection;
@@ -93,6 +95,9 @@ public final class SchemaAssignedDatabaseBackendHandler 
implements DatabaseBacke
         if (statementContext instanceof CursorStatementContext) {
             connectionSession.getCursorDefinitions().put(cursorName, 
(CursorStatementContext) statementContext);
         }
+        if (statementContext instanceof CloseStatementContext) {
+            FetchOrderByValueQueuesHolder.get().remove(cursorName);
+        }
         if (statementContext instanceof CursorDefinitionAware) {
             CursorStatementContext cursorStatementContext = 
connectionSession.getCursorDefinitions().get(cursorName);
             Preconditions.checkArgument(null != cursorStatementContext, 
"Cursor %s does not exist.", cursorName);

Reply via email to