This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a63800ddb45 Optimize fetech statement merge logic (#18200)
a63800ddb45 is described below
commit a63800ddb4513f7f9b79c00830507b58fe2101bc
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Mon Jun 6 20:39:35 2022 +0800
Optimize fetech statement merge logic (#18200)
---
.../merge/ddl/ShardingDDLResultMerger.java | 6 +--
.../merge/ddl/fetch/FetchStreamMergedResult.java | 29 +++++++++-----
.../parser/sql/common/constant/DirectionType.java | 12 ++++--
.../sql/common/constant/DirectionTypeTest.java | 44 ++++++++++++++++++++++
4 files changed, 75 insertions(+), 16 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMerger.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMerger.java
index a929d8f6087..0ec6784dcf1 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMerger.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMerger.java
@@ -50,10 +50,10 @@ public final class ShardingDDLResultMerger implements
ResultMerger {
if (1 == queryResults.size()) {
return new IteratorStreamMergedResult(queryResults);
}
+ FetchStatementContext fetchStatementContext = (FetchStatementContext)
sqlStatementContext;
Map<String, Integer> columnLabelIndexMap =
getColumnLabelIndexMap(queryResults.get(0));
- FetchStatementContext statementContext = (FetchStatementContext)
sqlStatementContext;
-
statementContext.getCursorStatementContext().getSelectStatementContext().setIndexes(columnLabelIndexMap);
- return new FetchStreamMergedResult(queryResults, statementContext,
getSchema(sqlStatementContext, database));
+
fetchStatementContext.getCursorStatementContext().getSelectStatementContext().setIndexes(columnLabelIndexMap);
+ return new FetchStreamMergedResult(queryResults,
fetchStatementContext, getSchema(sqlStatementContext, database));
}
private ShardingSphereSchema getSchema(final SQLStatementContext<?>
sqlStatementContext, final ShardingSphereDatabase database) {
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
index 7c6b7e99298..a5fb82256a1 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
@@ -17,16 +17,16 @@
package org.apache.shardingsphere.sharding.merge.ddl.fetch;
-import
org.apache.shardingsphere.infra.binder.segment.select.orderby.OrderByItem;
import
org.apache.shardingsphere.infra.binder.statement.ddl.FetchStatementContext;
import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.merge.result.impl.stream.StreamMergedResult;
import
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
import org.apache.shardingsphere.sharding.merge.dql.orderby.OrderByValue;
+import org.apache.shardingsphere.sql.parser.sql.common.constant.DirectionType;
+import
org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.cursor.DirectionSegment;
import java.sql.SQLException;
-import java.util.Collection;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
@@ -36,28 +36,33 @@ import java.util.Queue;
*/
public final class FetchStreamMergedResult extends StreamMergedResult {
- private final Collection<OrderByItem> orderByItems;
-
private final Queue<OrderByValue> orderByValuesQueue;
+ private final DirectionType directionType;
+
+ private long fetchCount;
+
private boolean isFirstNext;
public FetchStreamMergedResult(final List<QueryResult> queryResults, final
FetchStatementContext fetchStatementContext, final ShardingSphereSchema schema)
throws SQLException {
String cursorName =
fetchStatementContext.getCursorName().getIdentifier().getValue().toLowerCase();
SelectStatementContext selectStatementContext =
fetchStatementContext.getCursorStatementContext().getSelectStatementContext();
- orderByItems = selectStatementContext.getOrderByContext().getItems();
orderByValuesQueue =
FetchOrderByValueQueuesHolder.get().computeIfAbsent(cursorName, key -> new
PriorityQueue<>(queryResults.size()));
orderResultSetsToQueue(queryResults, selectStatementContext, schema);
+ directionType =
fetchStatementContext.getSqlStatement().getDirection().map(DirectionSegment::getDirectionType).orElse(DirectionType.NEXT);
+ fetchCount =
fetchStatementContext.getSqlStatement().getDirection().flatMap(DirectionSegment::getCount).orElse(1L);
isFirstNext = true;
}
private void orderResultSetsToQueue(final List<QueryResult> queryResults,
final SelectStatementContext selectStatementContext, final ShardingSphereSchema
schema) throws SQLException {
+
for (QueryResult each : queryResults) {
- OrderByValue orderByValue = new OrderByValue(each, orderByItems,
selectStatementContext, schema);
+ OrderByValue orderByValue = new OrderByValue(each,
selectStatementContext.getOrderByContext().getItems(), selectStatementContext,
schema);
if (orderByValue.next()) {
orderByValuesQueue.offer(orderByValue);
}
}
+ setCurrentQueryResult(orderByValuesQueue.isEmpty() ?
queryResults.get(0) : orderByValuesQueue.peek().getQueryResult());
}
@Override
@@ -65,13 +70,19 @@ public final class FetchStreamMergedResult extends
StreamMergedResult {
if (orderByValuesQueue.isEmpty()) {
return false;
}
- // TODO support fetch count and fetch all statement
if (isFirstNext) {
- setCurrentQueryResult(orderByValuesQueue.poll().getQueryResult());
isFirstNext = false;
+ fetchCount--;
return true;
- } else {
+ }
+ OrderByValue firstOrderByValue = orderByValuesQueue.poll();
+ if (firstOrderByValue.next()) {
+ orderByValuesQueue.offer(firstOrderByValue);
+ }
+ if (orderByValuesQueue.isEmpty()) {
return false;
}
+ setCurrentQueryResult(orderByValuesQueue.peek().getQueryResult());
+ return DirectionType.isAllDirectionType(directionType) || fetchCount--
> 0;
}
}
diff --git
a/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionType.java
b/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionType.java
index 20b1374b5ff..c97d418149a 100644
---
a/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionType.java
+++
b/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionType.java
@@ -18,6 +18,8 @@
package org.apache.shardingsphere.sql.parser.sql.common.constant;
import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
/**
* Direction type enum.
@@ -26,13 +28,15 @@ public enum DirectionType {
NEXT, PRIOR, FIRST, LAST, ABSOLUTE_COUNT, RELATIVE_COUNT, COUNT, ALL,
FORWARD, FORWARD_COUNT, FORWARD_ALL, BACKWARD, BACKWARD_COUNT, BACKWARD_ALL;
+ private static final Set<DirectionType> ALL_DIRECTION_TYPES = new
HashSet<>(Arrays.asList(ALL, FORWARD_ALL, BACKWARD_ALL));
+
/**
- * Is direction type.
+ * Is all direction type.
*
* @param directionType direction type
- * @return is direction type or not
+ * @return is all direction type or not
*/
- public static boolean isAggregationType(final String directionType) {
- return Arrays.stream(values()).anyMatch(each ->
directionType.equalsIgnoreCase(each.name()));
+ public static boolean isAllDirectionType(final DirectionType
directionType) {
+ return ALL_DIRECTION_TYPES.contains(directionType);
}
}
diff --git
a/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/test/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionTypeTest.java
b/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/test/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionTypeTest.java
new file mode 100644
index 00000000000..8799347256c
--- /dev/null
+++
b/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/test/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionTypeTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sql.parser.sql.common.constant;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public final class DirectionTypeTest {
+
+ @Test
+ public void assertIsAggregationType() {
+ assertFalse(DirectionType.isAllDirectionType(DirectionType.NEXT));
+ assertFalse(DirectionType.isAllDirectionType(DirectionType.PRIOR));
+ assertFalse(DirectionType.isAllDirectionType(DirectionType.FIRST));
+ assertFalse(DirectionType.isAllDirectionType(DirectionType.LAST));
+
assertFalse(DirectionType.isAllDirectionType(DirectionType.ABSOLUTE_COUNT));
+
assertFalse(DirectionType.isAllDirectionType(DirectionType.RELATIVE_COUNT));
+ assertFalse(DirectionType.isAllDirectionType(DirectionType.COUNT));
+ assertTrue(DirectionType.isAllDirectionType(DirectionType.ALL));
+ assertFalse(DirectionType.isAllDirectionType(DirectionType.FORWARD));
+
assertFalse(DirectionType.isAllDirectionType(DirectionType.FORWARD_COUNT));
+
assertTrue(DirectionType.isAllDirectionType(DirectionType.FORWARD_ALL));
+ assertFalse(DirectionType.isAllDirectionType(DirectionType.BACKWARD));
+
assertFalse(DirectionType.isAllDirectionType(DirectionType.BACKWARD_COUNT));
+
assertTrue(DirectionType.isAllDirectionType(DirectionType.BACKWARD_ALL));
+ }
+}