This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ff61094 Refactor ProxyRawExecutor (#8410)
ff61094 is described below
commit ff6109453c06fe8d9e01a5d150243936931bf540
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Nov 29 22:01:50 2020 +0800
Refactor ProxyRawExecutor (#8410)
* Add processExecuteQuery and processExecuteUpdate for ProxySQLExecutor
* Refactor RawSQLExecutorCallback
* Refactor ProxySQLExecutor
* Move ProxyRawExecutor to raw package
---
.../raw/callback/RawSQLExecutorCallback.java | 14 ++--
.../backend/communication/ProxySQLExecutor.java | 95 ++++++++++++----------
.../communication/{jdbc/executor/RawProxyExecutor.java => raw/ProxyRawExecutor.java} | 14 ++--
3 files changed, 64 insertions(+), 59 deletions(-)
diff --git
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
index 6979441..b916303 100644
---
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
+++
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
@@ -17,8 +17,8 @@
package
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback;
+import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.infra.exception.ShardingSphereException;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
@@ -38,17 +38,17 @@ public final class RawSQLExecutorCallback implements
ExecutorCallback<RawSQLExec
ShardingSphereServiceLoader.register(RawExecutorCallback.class);
}
- private final Collection<RawExecutorCallback> rawExecutorCallbacks;
+ @SuppressWarnings("rawtypes")
+ private final Collection<RawExecutorCallback> callbacks;
public RawSQLExecutorCallback() {
- rawExecutorCallbacks =
ShardingSphereServiceLoader.newServiceInstances(RawExecutorCallback.class);
- if (null == rawExecutorCallbacks || rawExecutorCallbacks.isEmpty()) {
- throw new ShardingSphereException("not found raw executor callback
impl");
- }
+ callbacks =
ShardingSphereServiceLoader.newServiceInstances(RawExecutorCallback.class);
+ Preconditions.checkState(!callbacks.isEmpty(), "No raw executor
callback implementation found.");
}
+ @SuppressWarnings("unchecked")
@Override
public Collection<ExecuteResult> execute(final
Collection<RawSQLExecutionUnit> inputs, final boolean isTrunkThread, final
Map<String, Object> dataMap) throws SQLException {
- return rawExecutorCallbacks.iterator().next().execute(inputs,
isTrunkThread, dataMap);
+ return callbacks.iterator().next().execute(inputs, isTrunkThread,
dataMap);
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index 58f2721..90e0dfc 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -42,7 +42,7 @@ import
org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.type.RawExecutionRule;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.ProxyJDBCExecutorCallback;
-import
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.RawProxyExecutor;
+import
org.apache.shardingsphere.proxy.backend.communication.raw.ProxyRawExecutor;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.accessor.JDBCAccessor;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.TransactionStatus;
import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
@@ -77,7 +77,7 @@ public final class ProxySQLExecutor {
private final JDBCExecutor jdbcExecutor;
- private final RawProxyExecutor rawExecutor;
+ private final ProxyRawExecutor rawExecutor;
public ProxySQLExecutor(final BackendConnection backendConnection, final
JDBCAccessor accessor) {
this.backendConnection = backendConnection;
@@ -85,7 +85,7 @@ public final class ProxySQLExecutor {
ExecutorEngine executorEngine =
BackendExecutorContext.getInstance().getExecutorEngine();
boolean isSerialExecute = backendConnection.isSerialExecute();
jdbcExecutor = new JDBCExecutor(executorEngine, isSerialExecute);
- rawExecutor = new RawProxyExecutor(executorEngine, isSerialExecute);
+ rawExecutor = new ProxyRawExecutor(executorEngine, isSerialExecute);
}
/**
@@ -121,66 +121,59 @@ public final class ProxySQLExecutor {
public BackendResponse execute(final ExecutionContext executionContext)
throws SQLException {
Collection<ExecuteResult> executeResults = execute(executionContext,
executionContext.getSqlStatementContext().getSqlStatement()
instanceof InsertStatement, SQLExecutorExceptionHandler.isExceptionThrown());
- ExecuteResult executeResult = executeResults.iterator().next();
- if (executeResult instanceof QueryResult) {
- ShardingSphereMetaData metaData =
ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName());
- int columnCount = ((QueryResult)
executeResult).getMetaData().getColumnCount();
- List<QueryHeader> queryHeaders = new ArrayList<>(columnCount);
- for (int columnIndex = 1; columnIndex <= columnCount;
columnIndex++) {
- if
(hasSelectExpandProjections(executionContext.getSqlStatementContext())) {
- queryHeaders.add(QueryHeaderBuilder.build(
- ((SelectStatementContext)
executionContext.getSqlStatementContext()).getProjectionsContext(),
(QueryResult) executeResult, metaData, columnIndex));
- } else {
- queryHeaders.add(QueryHeaderBuilder.build((QueryResult)
executeResult, metaData, columnIndex));
- }
- }
- return getExecuteQueryResponse(queryHeaders, executeResults);
- } else {
- UpdateResponse result = new UpdateResponse(executeResults);
- if (executionContext.getSqlStatementContext().getSqlStatement()
instanceof InsertStatement) {
- result.setType("INSERT");
- } else if
(executionContext.getSqlStatementContext().getSqlStatement() instanceof
DeleteStatement) {
- result.setType("DELETE");
- } else if
(executionContext.getSqlStatementContext().getSqlStatement() instanceof
UpdateStatement) {
- result.setType("UPDATE");
- }
- return result;
- }
+ ExecuteResult executeResultSample = executeResults.iterator().next();
+ return executeResultSample instanceof QueryResult
+ ? processExecuteQuery(executionContext, executeResults,
(QueryResult) executeResultSample) : processExecuteUpdate(executionContext,
executeResults);
}
private Collection<ExecuteResult> execute(final ExecutionContext
executionContext, final boolean isReturnGeneratedKeys, final boolean
isExceptionThrown) throws SQLException {
int maxConnectionsSizePerQuery =
ProxyContext.getInstance().getMetaDataContexts().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
- ShardingSphereMetaData metaData =
ProxyContext.getInstance().getMetaDataContexts().getMetaDataMap().get(backendConnection.getSchemaName());
- return metaData.getRuleMetaData().getRules().stream().anyMatch(each ->
each instanceof RawExecutionRule)
- ? executeWithRaw(executionContext, maxConnectionsSizePerQuery)
- : executeWithDriver(executionContext,
maxConnectionsSizePerQuery, isReturnGeneratedKeys, isExceptionThrown);
+ Collection<ShardingSphereRule> rules =
ProxyContext.getInstance().getMetaDataContexts().getMetaDataMap().get(backendConnection.getSchemaName()).getRuleMetaData().getRules();
+ return rules.stream().anyMatch(each -> each instanceof
RawExecutionRule)
+ ? rawExecute(executionContext, rules,
maxConnectionsSizePerQuery) : useDriverToExecute(executionContext, rules,
maxConnectionsSizePerQuery, isReturnGeneratedKeys, isExceptionThrown);
+ }
+
+ private Collection<ExecuteResult> rawExecute(final ExecutionContext
executionContext, final Collection<ShardingSphereRule> rules, final int
maxConnectionsSizePerQuery) throws SQLException {
+ RawExecutionPrepareEngine prepareEngine = new
RawExecutionPrepareEngine(maxConnectionsSizePerQuery, rules);
+ Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups =
prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits());
+ // TODO handle query header
+ return rawExecutor.execute(executionGroups, new
RawSQLExecutorCallback());
}
- private Collection<ExecuteResult> executeWithDriver(final ExecutionContext
executionContext,
- final int
maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean
isExceptionThrown) throws SQLException {
+ private Collection<ExecuteResult> useDriverToExecute(final
ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,
+ final int
maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean
isExceptionThrown) throws SQLException {
DatabaseType databaseType =
ProxyContext.getInstance().getMetaDataContexts().getDatabaseType();
- return
jdbcExecutor.execute(createExecutionGroups(executionContext.getExecutionUnits(),
maxConnectionsSizePerQuery, isReturnGeneratedKeys,
executionContext.getRouteContext()),
+ Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups =
createExecutionGroups(
+ executionContext.getExecutionUnits(), rules,
maxConnectionsSizePerQuery, isReturnGeneratedKeys,
executionContext.getRouteContext());
+ return jdbcExecutor.execute(executionGroups,
new ProxyJDBCExecutorCallback(databaseType, backendConnection,
accessor, isExceptionThrown, isReturnGeneratedKeys, true),
new ProxyJDBCExecutorCallback(databaseType, backendConnection,
accessor, isExceptionThrown, isReturnGeneratedKeys, false));
}
- private Collection<ExecutionGroup<JDBCExecutionUnit>>
createExecutionGroups(final Collection<ExecutionUnit> executionUnits, final int
maxConnectionsSizePerQuery,
-
final boolean isReturnGeneratedKeys, final RouteContext routeContext) throws
SQLException {
- Collection<ShardingSphereRule> rules =
ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName()).getRuleMetaData().getRules();
+ private Collection<ExecutionGroup<JDBCExecutionUnit>>
createExecutionGroups(final Collection<ExecutionUnit> executionUnits, final
Collection<ShardingSphereRule> rules,
+
final int maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys,
+
final RouteContext routeContext) throws SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = accessor.getExecutionPrepareEngine(
backendConnection, maxConnectionsSizePerQuery, new
StatementOption(isReturnGeneratedKeys), rules);
return prepareEngine.prepare(routeContext, executionUnits);
}
- private Collection<ExecuteResult> executeWithRaw(final ExecutionContext
executionContext, final int maxConnectionsSizePerQuery) throws SQLException {
- Collection<ShardingSphereRule> rules =
ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName()).getRuleMetaData().getRules();
- Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups = new
RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
rules).prepare(executionContext.getRouteContext(),
- executionContext.getExecutionUnits());
- // TODO handle query header
- return rawExecutor.execute(executionGroups, new
RawSQLExecutorCallback());
+ private BackendResponse processExecuteQuery(final ExecutionContext
executionContext, final Collection<ExecuteResult> executeResults, final
QueryResult executeResultSample) throws SQLException {
+ ShardingSphereMetaData metaData =
ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName());
+ int columnCount = executeResultSample.getMetaData().getColumnCount();
+ List<QueryHeader> queryHeaders = new ArrayList<>(columnCount);
+ for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
+ if
(hasSelectExpandProjections(executionContext.getSqlStatementContext())) {
+ queryHeaders.add(QueryHeaderBuilder.build(
+ ((SelectStatementContext)
executionContext.getSqlStatementContext()).getProjectionsContext(),
executeResultSample, metaData, columnIndex));
+ } else {
+ queryHeaders.add(QueryHeaderBuilder.build(executeResultSample,
metaData, columnIndex));
+ }
+ }
+ return getQueryResponses(queryHeaders, executeResults);
}
- private BackendResponse getExecuteQueryResponse(final List<QueryHeader>
queryHeaders, final Collection<ExecuteResult> executeResults) {
+ private BackendResponse getQueryResponses(final List<QueryHeader>
queryHeaders, final Collection<ExecuteResult> executeResults) {
QueryResponse result = new QueryResponse(queryHeaders);
for (ExecuteResult each : executeResults) {
result.getQueryResults().add((QueryResult) each);
@@ -191,4 +184,16 @@ public final class ProxySQLExecutor {
private boolean hasSelectExpandProjections(final SQLStatementContext<?>
sqlStatementContext) {
return sqlStatementContext instanceof SelectStatementContext &&
!((SelectStatementContext)
sqlStatementContext).getProjectionsContext().getExpandProjections().isEmpty();
}
+
+ private UpdateResponse processExecuteUpdate(final ExecutionContext
executionContext, final Collection<ExecuteResult> executeResults) {
+ UpdateResponse result = new UpdateResponse(executeResults);
+ if (executionContext.getSqlStatementContext().getSqlStatement()
instanceof InsertStatement) {
+ result.setType("INSERT");
+ } else if (executionContext.getSqlStatementContext().getSqlStatement()
instanceof DeleteStatement) {
+ result.setType("DELETE");
+ } else if (executionContext.getSqlStatementContext().getSqlStatement()
instanceof UpdateStatement) {
+ result.setType("UPDATE");
+ }
+ return result;
+ }
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/RawProxyExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/raw/ProxyRawExecutor.java
similarity index 84%
rename from
shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/RawProxyExecutor.java
rename to
shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/raw/ProxyRawExecutor.java
index c1dcb98..b4e152e 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/RawProxyExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/raw/ProxyRawExecutor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.proxy.backend.communication.jdbc.executor;
+package org.apache.shardingsphere.proxy.backend.communication.raw;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
@@ -32,10 +32,10 @@ import java.util.Collections;
import java.util.List;
/**
- * Raw Proxy executor.
+ * Proxy raw executor.
*/
@RequiredArgsConstructor
-public final class RawProxyExecutor {
+public final class ProxyRawExecutor {
private final ExecutorEngine executorEngine;
@@ -46,12 +46,12 @@ public final class RawProxyExecutor {
*
* @param executionGroups execution groups
* @param callback raw SQL execute callback
- * @return return true if is DQL, false if is DML
+ * @return execute results
* @throws SQLException SQL exception
*/
public Collection<ExecuteResult> execute(final
Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups, final
RawSQLExecutorCallback callback) throws SQLException {
// TODO Load query header for first query
- List<ExecuteResult> results = doExecute(executionGroups, null,
callback);
+ List<ExecuteResult> results = execute(executionGroups, null, callback);
// TODO refresh metadata
if (null == results || results.isEmpty() || null == results.get(0)) {
return Collections.singleton(new UpdateResult(0, 0L));
@@ -65,8 +65,8 @@ public final class RawProxyExecutor {
}
@SuppressWarnings("unchecked")
- private <T> List<T> doExecute(final
Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups,
- final RawSQLExecutorCallback firstCallback,
final RawSQLExecutorCallback callback) throws SQLException {
+ private <T> List<T> execute(final
Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups,
+ final RawSQLExecutorCallback firstCallback,
final RawSQLExecutorCallback callback) throws SQLException {
try {
return executorEngine.execute((Collection) executionGroups,
firstCallback, callback, serial);
} catch (final SQLException ex) {