This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c04b722 Migrate type of execute results to List in proxy backend (#15045)
c04b722 is described below
commit c04b722b3873922ae6f917d3d9bd86f397c0ce04
Author: 吴伟杰 <[email protected]>
AuthorDate: Wed Jan 26 18:57:12 2022 +0800
Migrate type of execute results to List in proxy backend (#15045)
* Migrate type of execute results to List
* Remove redundant type cast in stream
---
.../executor/sql/execute/engine/raw/RawExecutor.java | 11 ++++-------
.../proxy/backend/communication/ProxySQLExecutor.java | 17 +++++++++--------
.../jdbc/JDBCDatabaseCommunicationEngine.java | 15 +++++++--------
.../communication/jdbc/executor/ProxyJDBCExecutor.java | 8 ++++----
.../vertx/VertxDatabaseCommunicationEngine.java | 8 ++++----
5 files changed, 28 insertions(+), 31 deletions(-)
diff --git
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
index a4d4e13..cd46fe2 100644
---
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
+++
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
@@ -27,13 +27,12 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorEx
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
+import
org.apache.shardingsphere.infra.executor.sql.process.ExecuteProcessEngine;
import java.sql.SQLException;
-import java.util.Collection;
import java.util.Collections;
-import java.util.Objects;
import java.util.List;
-import
org.apache.shardingsphere.infra.executor.sql.process.ExecuteProcessEngine;
+import java.util.Objects;
/**
* Raw executor.
@@ -56,16 +55,14 @@ public final class RawExecutor {
* @return execute results
* @throws SQLException SQL exception
*/
- public Collection<ExecuteResult> execute(final
ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext,
- final LogicSQL logicSQL,
- final RawSQLExecutorCallback
callback) throws SQLException {
+ public List<ExecuteResult> execute(final
ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext, final
LogicSQL logicSQL, final RawSQLExecutorCallback callback) throws SQLException {
try {
ExecuteProcessEngine.initialize(logicSQL, executionGroupContext,
props);
// TODO Load query header for first query
List<ExecuteResult> results = execute(executionGroupContext,
(RawSQLExecutorCallback) null, callback);
ExecuteProcessEngine.finish(executionGroupContext.getExecutionID());
return CollectionUtils.isEmpty(results) ||
Objects.isNull(results.get(0)) ? Collections
- .singleton(new UpdateResult(0, 0L)) : results;
+ .singletonList(new UpdateResult(0, 0L)) : results;
} finally {
ExecuteProcessEngine.clean();
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index cd31231..24c93be 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -54,6 +54,7 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Optional;
/**
@@ -109,7 +110,7 @@ public final class ProxySQLExecutor {
* @return execute results
* @throws SQLException SQL exception
*/
- public Collection<ExecuteResult> execute(final ExecutionContext
executionContext) throws SQLException {
+ public List<ExecuteResult> execute(final ExecutionContext
executionContext) throws SQLException {
String schemaName =
backendConnection.getConnectionSession().getSchemaName();
Collection<ShardingSphereRule> rules =
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData(schemaName).getRuleMetaData().getRules();
int maxConnectionsSizePerQuery =
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
@@ -117,15 +118,15 @@ public final class ProxySQLExecutor {
return execute(executionContext, rules, maxConnectionsSizePerQuery,
isReturnGeneratedKeys);
}
- private Collection<ExecuteResult> execute(final ExecutionContext
executionContext, final Collection<ShardingSphereRule> rules,
- final int
maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys) throws
SQLException {
+ private List<ExecuteResult> execute(final ExecutionContext
executionContext, final Collection<ShardingSphereRule> rules,
+ final int maxConnectionsSizePerQuery,
final boolean isReturnGeneratedKeys) throws SQLException {
if (rules.stream().anyMatch(each -> each instanceof RawExecutionRule))
{
return rawExecute(executionContext, rules,
maxConnectionsSizePerQuery);
}
return useDriverToExecute(executionContext, rules,
maxConnectionsSizePerQuery, isReturnGeneratedKeys,
SQLExecutorExceptionHandler.isExceptionThrown());
}
- private Collection<ExecuteResult> rawExecute(final ExecutionContext
executionContext, final Collection<ShardingSphereRule> rules, final int
maxConnectionsSizePerQuery) throws SQLException {
+ private List<ExecuteResult> rawExecute(final ExecutionContext
executionContext, final Collection<ShardingSphereRule> rules, final int
maxConnectionsSizePerQuery) throws SQLException {
RawExecutionPrepareEngine prepareEngine = new
RawExecutionPrepareEngine(maxConnectionsSizePerQuery, rules);
ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext;
try {
@@ -139,8 +140,8 @@ public final class ProxySQLExecutor {
return rawExecutor.execute(executionGroupContext,
executionContext.getLogicSQL(), new RawSQLExecutorCallback());
}
- private Collection<ExecuteResult> useDriverToExecute(final
ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,
- final int
maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean
isExceptionThrown) throws SQLException {
+ private List<ExecuteResult> useDriverToExecute(final ExecutionContext
executionContext, final Collection<ShardingSphereRule> rules,
+ final int
maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean
isExceptionThrown) throws SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = new DriverExecutionPrepareEngine<>(
type, maxConnectionsSizePerQuery, backendConnection, new
StatementOption(isReturnGeneratedKeys), rules);
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext;
@@ -154,11 +155,11 @@ public final class ProxySQLExecutor {
return jdbcExecutor.execute(executionContext.getLogicSQL(),
executionGroupContext, isReturnGeneratedKeys, isExceptionThrown);
}
- private Collection<ExecuteResult> getSaneExecuteResults(final
ExecutionContext executionContext, final SQLException originalException) throws
SQLException {
+ private List<ExecuteResult> getSaneExecuteResults(final ExecutionContext
executionContext, final SQLException originalException) throws SQLException {
DatabaseType databaseType =
ProxyContext.getInstance().getMetaData(backendConnection.getConnectionSession().getSchemaName()).getResource().getDatabaseType();
Optional<ExecuteResult> executeResult =
JDBCSaneQueryResultEngineFactory.newInstance(databaseType).getSaneQueryResult(executionContext.getSqlStatementContext().getSqlStatement());
if (executeResult.isPresent()) {
- return Collections.singleton(executeResult.get());
+ return Collections.singletonList(executeResult.get());
}
throw originalException;
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
index b876663..97c4e1f 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
@@ -25,15 +25,13 @@ import
org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.metadata.JDBCQueryResultMetaData;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
-import org.apache.shardingsphere.infra.federation.executor.FederationExecutor;
import org.apache.shardingsphere.infra.federation.executor.FederationContext;
+import org.apache.shardingsphere.infra.federation.executor.FederationExecutor;
import
org.apache.shardingsphere.infra.federation.executor.FederationExecutorFactory;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
@@ -60,8 +58,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.stream.Collectors;
/**
* JDBC database communication engine.
@@ -111,6 +109,7 @@ public final class JDBCDatabaseCommunicationEngine extends
DatabaseCommunication
*
* @return backend response
*/
+ @SuppressWarnings({"unchecked", "rawtypes"})
@SneakyThrows(SQLException.class)
public ResponseHeader execute() {
ExecutionContext executionContext = getKernelProcessor()
@@ -125,12 +124,12 @@ public final class JDBCDatabaseCommunicationEngine
extends DatabaseCommunication
return new
UpdateResponseHeader(executionContext.getSqlStatementContext().getSqlStatement());
}
proxySQLExecutor.checkExecutePrerequisites(executionContext);
- Collection<ExecuteResult> result =
proxySQLExecutor.execute(executionContext);
+ List result = proxySQLExecutor.execute(executionContext);
refreshMetaData(executionContext);
- ExecuteResult executeResultSample = result.iterator().next();
+ Object executeResultSample = result.iterator().next();
return executeResultSample instanceof QueryResult
- ? processExecuteQuery(executionContext,
result.stream().map(each -> (QueryResult) each).collect(Collectors.toList()),
(QueryResult) executeResultSample)
- : processExecuteUpdate(executionContext,
result.stream().map(each -> (UpdateResult) each).collect(Collectors.toList()));
+ ? processExecuteQuery(executionContext, result, (QueryResult)
executeResultSample)
+ : processExecuteUpdate(executionContext, result);
}
private ResultSet doExecuteFederation(final LogicSQL logicSQL, final
MetaDataContexts metaDataContexts) throws SQLException {
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
index e3dd5cc..f216dd3 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
@@ -34,7 +34,7 @@ import
org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import java.sql.SQLException;
-import java.util.Collection;
+import java.util.List;
/**
* Proxy JDBC executor.
@@ -61,14 +61,14 @@ public final class ProxyJDBCExecutor {
* @return execute results
* @throws SQLException SQL exception
*/
- public Collection<ExecuteResult> execute(final LogicSQL logicSQL, final
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
- final boolean
isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
+ public List<ExecuteResult> execute(final LogicSQL logicSQL, final
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
+ final boolean isReturnGeneratedKeys,
final boolean isExceptionThrown) throws SQLException {
try {
MetaDataContexts metaDataContexts =
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
DatabaseType databaseType =
metaDataContexts.getMetaData(connectionSession.getSchemaName()).getResource().getDatabaseType();
ExecuteProcessEngine.initialize(logicSQL, executionGroupContext,
metaDataContexts.getProps());
SQLStatementContext<?> context = logicSQL.getSqlStatementContext();
- Collection<ExecuteResult> result =
jdbcExecutor.execute(executionGroupContext,
+ List<ExecuteResult> result =
jdbcExecutor.execute(executionGroupContext,
ProxyJDBCExecutorCallbackFactory.newInstance(type,
databaseType, context.getSqlStatement(), databaseCommunicationEngine,
isReturnGeneratedKeys, isExceptionThrown, true),
ProxyJDBCExecutorCallbackFactory.newInstance(type,
databaseType, context.getSqlStatement(), databaseCommunicationEngine,
isReturnGeneratedKeys, isExceptionThrown, false));
ExecuteProcessEngine.finish(executionGroupContext.getExecutionID());
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxDatabaseCommunicationEngine.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxDatabaseCommunicationEngine.java
index fbcd3e9..c8da323 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxDatabaseCommunicationEngine.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxDatabaseCommunicationEngine.java
@@ -22,7 +22,6 @@ import org.apache.shardingsphere.infra.binder.LogicSQL;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import
org.apache.shardingsphere.proxy.backend.communication.ReactiveProxySQLExecutor;
@@ -31,7 +30,7 @@ import
org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
import java.sql.SQLException;
-import java.util.stream.Collectors;
+import java.util.List;
/**
* Vert.x database communication engine.
@@ -50,6 +49,7 @@ public final class VertxDatabaseCommunicationEngine extends
DatabaseCommunicatio
*
* @return Future of response
*/
+ @SuppressWarnings({"unchecked", "rawtypes"})
@Override
public Future<ResponseHeader> execute() {
try {
@@ -67,8 +67,8 @@ public final class VertxDatabaseCommunicationEngine extends
DatabaseCommunicatio
refreshMetaData(executionContext);
ExecuteResult executeResultSample =
result.iterator().next();
return Future.succeededFuture(executeResultSample
instanceof QueryResult
- ? processExecuteQuery(executionContext,
result.stream().map(each -> (QueryResult) each).collect(Collectors.toList()),
(QueryResult) executeResultSample)
- : processExecuteUpdate(executionContext,
result.stream().map(each -> (UpdateResult) each).collect(Collectors.toList())));
+ ? processExecuteQuery(executionContext, (List)
result, (QueryResult) executeResultSample)
+ : processExecuteUpdate(executionContext, (List)
result));
} catch (final SQLException ex) {
return Future.failedFuture(ex);
}