This is an automated email from the ASF dual-hosted git repository.
tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 54bbc6cd42b Optimize fetch statement merge logic and clear thread
local when close cursor (#18215)
54bbc6cd42b is described below
commit 54bbc6cd42b4e4b3e5f745bb152e942c25106795
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Tue Jun 7 13:58:42 2022 +0800
Optimize fetch statement merge logic and clear thread local when close
cursor (#18215)
---
.../merge/ddl/fetch/FetchStreamMergedResult.java | 17 +-
.../sharding/merge/dql/orderby/OrderByValue.java | 2 +-
.../ddl/fetch/FetchStreamMergedResultTest.java | 190 +++++++++++++++++++++
.../impl/SchemaAssignedDatabaseBackendHandler.java | 5 +
4 files changed, 211 insertions(+), 3 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
index a5fb82256a1..c59422552f0 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
@@ -17,9 +17,13 @@
package org.apache.shardingsphere.sharding.merge.ddl.fetch;
+import
org.apache.shardingsphere.infra.binder.segment.select.orderby.OrderByItem;
import
org.apache.shardingsphere.infra.binder.statement.ddl.FetchStatementContext;
import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.memory.JDBCMemoryQueryResult;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
import
org.apache.shardingsphere.infra.merge.result.impl.stream.StreamMergedResult;
import
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
import org.apache.shardingsphere.sharding.merge.dql.orderby.OrderByValue;
@@ -27,6 +31,7 @@ import
org.apache.shardingsphere.sql.parser.sql.common.constant.DirectionType;
import
org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.cursor.DirectionSegment;
import java.sql.SQLException;
+import java.util.Collection;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
@@ -55,9 +60,10 @@ public final class FetchStreamMergedResult extends
StreamMergedResult {
}
private void orderResultSetsToQueue(final List<QueryResult> queryResults,
final SelectStatementContext selectStatementContext, final ShardingSphereSchema
schema) throws SQLException {
-
+ Collection<OrderByItem> items =
selectStatementContext.getOrderByContext().getItems();
for (QueryResult each : queryResults) {
- OrderByValue orderByValue = new OrderByValue(each,
selectStatementContext.getOrderByContext().getItems(), selectStatementContext,
schema);
+ QueryResult queryResult = decorate(each,
selectStatementContext.getDatabaseType());
+ OrderByValue orderByValue = new OrderByValue(queryResult, items,
selectStatementContext, schema);
if (orderByValue.next()) {
orderByValuesQueue.offer(orderByValue);
}
@@ -65,6 +71,13 @@ public final class FetchStreamMergedResult extends
StreamMergedResult {
setCurrentQueryResult(orderByValuesQueue.isEmpty() ?
queryResults.get(0) : orderByValuesQueue.peek().getQueryResult());
}
+ private QueryResult decorate(final QueryResult queryResult, final
DatabaseType databaseType) throws SQLException {
+ if (!DirectionType.isAllDirectionType(directionType) && queryResult
instanceof JDBCStreamQueryResult) {
+ return new JDBCMemoryQueryResult(((JDBCStreamQueryResult)
queryResult).getResultSet(), databaseType);
+ }
+ return queryResult;
+ }
+
@Override
public boolean next() throws SQLException {
if (orderByValuesQueue.isEmpty()) {
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/orderby/OrderByValue.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/orderby/OrderByValue.java
index de71bcd5630..f1c727e273c 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/orderby/OrderByValue.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/orderby/OrderByValue.java
@@ -92,7 +92,7 @@ public final class OrderByValue implements
Comparable<OrderByValue> {
}
/**
- * iterate next data.
+ * Iterate next data.
*
* @return has next data
* @throws SQLException SQL exception
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResultTest.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResultTest.java
new file mode 100644
index 00000000000..db3b20a5b1a
--- /dev/null
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResultTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sharding.merge.ddl.fetch;
+
+import
org.apache.shardingsphere.infra.binder.statement.ddl.CursorStatementContext;
+import
org.apache.shardingsphere.infra.binder.statement.ddl.FetchStatementContext;
+import org.apache.shardingsphere.infra.database.DefaultDatabase;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.sharding.merge.ddl.ShardingDDLResultMerger;
+import org.apache.shardingsphere.sql.parser.sql.common.constant.DirectionType;
+import
org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.cursor.CursorNameSegment;
+import
org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.cursor.DirectionSegment;
+import
org.apache.shardingsphere.sql.parser.sql.common.segment.dml.item.ProjectionsSegment;
+import
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.SimpleTableSegment;
+import
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.TableNameSegment;
+import
org.apache.shardingsphere.sql.parser.sql.common.value.identifier.IdentifierValue;
+import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.ddl.OpenGaussCursorStatement;
+import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.ddl.OpenGaussFetchStatement;
+import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.dml.OpenGaussSelectStatement;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public final class FetchStreamMergedResultTest {
+
+ private FetchStatementContext fetchCountStatementContext;
+
+ private FetchStatementContext fetchAllStatementContext;
+
+ private ShardingDDLResultMerger resultMerger;
+
+ private ShardingSphereDatabase database;
+
+ @Before
+ public void setUp() {
+ fetchCountStatementContext = new
FetchStatementContext(createFetchStatement(false));
+
fetchCountStatementContext.setUpCursorDefinition(createCursorStatementContext());
+ fetchAllStatementContext = new
FetchStatementContext(createFetchStatement(true));
+
fetchAllStatementContext.setUpCursorDefinition(createCursorStatementContext());
+ resultMerger = new ShardingDDLResultMerger();
+ database = mock(ShardingSphereDatabase.class, RETURNS_DEEP_STUBS);
+ when(database.getName()).thenReturn(DefaultDatabase.LOGIC_NAME);
+ }
+
+ private OpenGaussFetchStatement createFetchStatement(final boolean
containsAllDirectionType) {
+ OpenGaussFetchStatement result = new OpenGaussFetchStatement();
+ result.setCursorName(new CursorNameSegment(0, 0, new
IdentifierValue("t_order_cursor")));
+ if (containsAllDirectionType) {
+ DirectionSegment direction = new DirectionSegment(0, 0,
DirectionType.ALL);
+ result.setDirection(direction);
+ }
+ return result;
+ }
+
+ private CursorStatementContext createCursorStatementContext() {
+ ShardingSphereDatabase database = mock(ShardingSphereDatabase.class,
RETURNS_DEEP_STUBS);
+ when(database.getName()).thenReturn(DefaultDatabase.LOGIC_NAME);
+ OpenGaussCursorStatement cursorStatement = new
OpenGaussCursorStatement();
+ cursorStatement.setSelect(createSelectStatement());
+ return new
CursorStatementContext(Collections.singletonMap(DefaultDatabase.LOGIC_NAME,
database), Collections.emptyList(), cursorStatement,
DefaultDatabase.LOGIC_NAME);
+ }
+
+ private OpenGaussSelectStatement createSelectStatement() {
+ OpenGaussSelectStatement result = new OpenGaussSelectStatement();
+ result.setProjections(new ProjectionsSegment(0, 0));
+ result.setFrom(new SimpleTableSegment(new TableNameSegment(0, 0, new
IdentifierValue("t_order"))));
+ return result;
+ }
+
+ @Test
+ public void assertNextForResultSetsAllEmpty() throws SQLException {
+ List<QueryResult> queryResults = Arrays.asList(mock(QueryResult.class,
RETURNS_DEEP_STUBS), mock(QueryResult.class, RETURNS_DEEP_STUBS),
mock(QueryResult.class, RETURNS_DEEP_STUBS));
+ MergedResult actual = resultMerger.merge(queryResults,
fetchCountStatementContext, database);
+ assertFalse(actual.next());
+ }
+
+ @Test
+ public void assertNextForResultSetsAllEmptyWhenConfigAllDirectionType()
throws SQLException {
+ List<QueryResult> queryResults = Arrays.asList(mock(QueryResult.class,
RETURNS_DEEP_STUBS), mock(QueryResult.class, RETURNS_DEEP_STUBS),
mock(QueryResult.class, RETURNS_DEEP_STUBS));
+ MergedResult actual = resultMerger.merge(queryResults,
fetchAllStatementContext, database);
+ assertFalse(actual.next());
+ }
+
+ @Test
+ public void assertNextForResultSetsAllNotEmpty() throws SQLException {
+ List<QueryResult> queryResults = Arrays.asList(mock(QueryResult.class,
RETURNS_DEEP_STUBS), mock(QueryResult.class, RETURNS_DEEP_STUBS),
mock(QueryResult.class, RETURNS_DEEP_STUBS));
+ for (QueryResult each : queryResults) {
+ when(each.next()).thenReturn(true, false);
+ }
+ MergedResult actual = resultMerger.merge(queryResults,
fetchCountStatementContext, database);
+ assertTrue(actual.next());
+ assertFalse(actual.next());
+ assertFalse(actual.next());
+ assertFalse(actual.next());
+ }
+
+ @Test
+ public void assertNextForResultSetsAllNotEmptyWhenConfigAllDirectionType()
throws SQLException {
+ List<QueryResult> queryResults = Arrays.asList(mock(QueryResult.class,
RETURNS_DEEP_STUBS), mock(QueryResult.class, RETURNS_DEEP_STUBS),
mock(QueryResult.class, RETURNS_DEEP_STUBS));
+ for (QueryResult each : queryResults) {
+ when(each.next()).thenReturn(true, false);
+ }
+ MergedResult actual = resultMerger.merge(queryResults,
fetchAllStatementContext, database);
+ assertTrue(actual.next());
+ assertTrue(actual.next());
+ assertTrue(actual.next());
+ assertFalse(actual.next());
+ }
+
+ @Test
+ public void assertNextForFirstResultSetsNotEmptyOnly() throws SQLException
{
+ List<QueryResult> queryResults = Arrays.asList(mock(QueryResult.class,
RETURNS_DEEP_STUBS), mock(QueryResult.class, RETURNS_DEEP_STUBS),
mock(QueryResult.class, RETURNS_DEEP_STUBS));
+ when(queryResults.get(0).next()).thenReturn(true, false);
+ MergedResult actual = resultMerger.merge(queryResults,
fetchCountStatementContext, database);
+ assertTrue(actual.next());
+ assertFalse(actual.next());
+ }
+
+ @Test
+ public void
assertNextForFirstResultSetsNotEmptyOnlyWhenConfigAllDirectionType() throws
SQLException {
+ List<QueryResult> queryResults = Arrays.asList(mock(QueryResult.class,
RETURNS_DEEP_STUBS), mock(QueryResult.class, RETURNS_DEEP_STUBS),
mock(QueryResult.class, RETURNS_DEEP_STUBS));
+ when(queryResults.get(0).next()).thenReturn(true, false);
+ MergedResult actual = resultMerger.merge(queryResults,
fetchAllStatementContext, database);
+ assertTrue(actual.next());
+ assertFalse(actual.next());
+ }
+
+ @Test
+ public void assertNextForMiddleResultSetsNotEmpty() throws SQLException {
+ List<QueryResult> queryResults = Arrays.asList(mock(QueryResult.class,
RETURNS_DEEP_STUBS), mock(QueryResult.class, RETURNS_DEEP_STUBS),
mock(QueryResult.class, RETURNS_DEEP_STUBS));
+ when(queryResults.get(1).next()).thenReturn(true, false);
+ MergedResult actual = resultMerger.merge(queryResults,
fetchCountStatementContext, database);
+ assertTrue(actual.next());
+ assertFalse(actual.next());
+ }
+
+ @Test
+ public void
assertNextForMiddleResultSetsNotEmptyWhenConfigAllDirectionType() throws
SQLException {
+ List<QueryResult> queryResults = Arrays.asList(mock(QueryResult.class,
RETURNS_DEEP_STUBS), mock(QueryResult.class, RETURNS_DEEP_STUBS),
mock(QueryResult.class, RETURNS_DEEP_STUBS));
+ when(queryResults.get(1).next()).thenReturn(true, false);
+ MergedResult actual = resultMerger.merge(queryResults,
fetchAllStatementContext, database);
+ assertTrue(actual.next());
+ assertFalse(actual.next());
+ }
+
+ @Test
+ public void assertNextForLastResultSetsNotEmptyOnly() throws SQLException {
+ List<QueryResult> queryResults = Arrays.asList(mock(QueryResult.class,
RETURNS_DEEP_STUBS), mock(QueryResult.class, RETURNS_DEEP_STUBS),
mock(QueryResult.class, RETURNS_DEEP_STUBS));
+ when(queryResults.get(2).next()).thenReturn(true, false);
+ MergedResult actual = resultMerger.merge(queryResults,
fetchCountStatementContext, database);
+ assertTrue(actual.next());
+ assertFalse(actual.next());
+ }
+
+ @Test
+ public void
assertNextForLastResultSetsNotEmptyOnlyWhenConfigAllDirectionType() throws
SQLException {
+ List<QueryResult> queryResults = Arrays.asList(mock(QueryResult.class,
RETURNS_DEEP_STUBS), mock(QueryResult.class, RETURNS_DEEP_STUBS),
mock(QueryResult.class, RETURNS_DEEP_STUBS));
+ when(queryResults.get(2).next()).thenReturn(true, false);
+ MergedResult actual = resultMerger.merge(queryResults,
fetchAllStatementContext, database);
+ assertTrue(actual.next());
+ assertFalse(actual.next());
+ }
+}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
index 9ae610bf44a..e57ee177f3e 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
@@ -22,6 +22,7 @@ import io.vertx.core.Future;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.binder.aware.CursorDefinitionAware;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import
org.apache.shardingsphere.infra.binder.statement.ddl.CloseStatementContext;
import
org.apache.shardingsphere.infra.binder.statement.ddl.CursorStatementContext;
import org.apache.shardingsphere.infra.binder.type.CursorAvailable;
import
org.apache.shardingsphere.infra.distsql.exception.resource.RequiredResourceMissedException;
@@ -35,6 +36,7 @@ import
org.apache.shardingsphere.proxy.backend.exception.RuleNotExistedException
import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import
org.apache.shardingsphere.proxy.backend.text.data.DatabaseBackendHandler;
+import
org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueQueuesHolder;
import java.sql.SQLException;
import java.util.Collection;
@@ -93,6 +95,9 @@ public final class SchemaAssignedDatabaseBackendHandler
implements DatabaseBacke
if (statementContext instanceof CursorStatementContext) {
connectionSession.getCursorDefinitions().put(cursorName,
(CursorStatementContext) statementContext);
}
+ if (statementContext instanceof CloseStatementContext) {
+ FetchOrderByValueQueuesHolder.get().remove(cursorName);
+ }
if (statementContext instanceof CursorDefinitionAware) {
CursorStatementContext cursorStatementContext =
connectionSession.getCursorDefinitions().get(cursorName);
Preconditions.checkArgument(null != cursorStatementContext,
"Cursor %s does not exist.", cursorName);