This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a78e7c08539 Refactor PreviewExecutor (#29606)
a78e7c08539 is described below
commit a78e7c085399e510bbe7a14c06c2d51d6d985bd8
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 31 17:06:59 2023 +0800
Refactor PreviewExecutor (#29606)
---
.../sqlfederation/engine/SQLFederationEngine.java | 3 +-
.../handler/distsql/rul/sql/PreviewExecutor.java | 97 +++++++++-------------
2 files changed, 40 insertions(+), 60 deletions(-)
diff --git a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
index 1e0cdfd114b..77e6073c33f 100644
--- a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
+++ b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
@@ -114,8 +114,7 @@ public final class SQLFederationEngine implements AutoCloseable {
* @return use SQL federation or not
*/
@SuppressWarnings({"unchecked", "rawtypes"})
- public boolean decide(final SQLStatementContext sqlStatementContext, final
List<Object> parameters,
- final ShardingSphereDatabase database, final
RuleMetaData globalRuleMetaData) {
+ public boolean decide(final SQLStatementContext sqlStatementContext, final
List<Object> parameters, final ShardingSphereDatabase database, final
RuleMetaData globalRuleMetaData) {
// TODO BEGIN: move this logic to SQLFederationDecider implement class
when we remove sql federation type
if (isQuerySystemSchema(sqlStatementContext, database)) {
return true;
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/sql/PreviewExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/sql/PreviewExecutor.java
index 8d403e5da85..b73ccf3cea3 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/sql/PreviewExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/sql/PreviewExecutor.java
@@ -50,9 +50,7 @@ import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryRes
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
-import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import org.apache.shardingsphere.infra.session.query.QueryContext;
-import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import
org.apache.shardingsphere.proxy.backend.connector.jdbc.statement.JDBCBackendStatement;
import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
@@ -88,66 +86,57 @@ public final class PreviewExecutor implements ConnectionSessionRequiredRULExecut
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShardingSphereMetaData metaData, final ConnectionSession connectionSession,
final PreviewStatement sqlStatement) throws SQLException {
- MetaDataContexts metaDataContexts =
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
- String databaseName = getDatabaseName(connectionSession);
- RuleMetaData globalRuleMetaData =
metaDataContexts.getMetaData().getGlobalRuleMetaData();
- SQLParserRule sqlParserRule =
globalRuleMetaData.getSingleRule(SQLParserRule.class);
- String sql = SQLHintUtils.removeHint(sqlStatement.getSql());
+ ShardingSphereDatabase database =
ProxyContext.getInstance().getDatabase(getDatabaseName(connectionSession));
+ String toBePreviewedSQL =
SQLHintUtils.removeHint(sqlStatement.getSql());
HintValueContext hintValueContext =
SQLHintUtils.extractHint(sqlStatement.getSql()).orElseGet(HintValueContext::new);
- DatabaseType databaseType =
metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType();
- SQLStatement previewedStatement =
sqlParserRule.getSQLParserEngine(databaseType).parse(sql, false);
- SQLStatementContext sqlStatementContext = new
SQLBindEngine(metaDataContexts.getMetaData(), databaseName,
hintValueContext).bind(previewedStatement, Collections.emptyList());
- QueryContext queryContext = new QueryContext(sqlStatementContext, sql,
Collections.emptyList(), hintValueContext);
+ SQLStatement toBePreviewedStatement =
metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(database.getProtocolType()).parse(toBePreviewedSQL,
false);
+ SQLStatementContext toBePreviewedStatementContext = new
SQLBindEngine(metaData, database.getName(),
hintValueContext).bind(toBePreviewedStatement, Collections.emptyList());
+ QueryContext queryContext = new
QueryContext(toBePreviewedStatementContext, toBePreviewedSQL,
Collections.emptyList(), hintValueContext);
connectionSession.setQueryContext(queryContext);
- if (sqlStatementContext instanceof CursorAvailable &&
sqlStatementContext instanceof CursorDefinitionAware) {
- setUpCursorDefinition(sqlStatementContext, connectionSession);
+ if (toBePreviewedStatementContext instanceof CursorAvailable &&
toBePreviewedStatementContext instanceof CursorDefinitionAware) {
+ setUpCursorDefinition(connectionSession,
toBePreviewedStatementContext);
}
- ShardingSphereDatabase database =
ProxyContext.getInstance().getDatabase(connectionSession.getDatabaseName());
ShardingSpherePreconditions.checkState(database.isComplete(), () ->
new RuleNotExistedException(connectionSession.getDatabaseName()));
- ConfigurationProperties props =
metaDataContexts.getMetaData().getProps();
String schemaName =
queryContext.getSqlStatementContext().getTablesContext().getSchemaName()
- .orElseGet(() -> new
DatabaseTypeRegistry(database.getProtocolType()).getDefaultSchemaName(databaseName));
- SQLFederationEngine sqlFederationEngine = new
SQLFederationEngine(databaseName, schemaName, metaDataContexts.getMetaData(),
metaDataContexts.getStatistics(),
+ .orElseGet(() -> new
DatabaseTypeRegistry(database.getProtocolType()).getDefaultSchemaName(database.getName()));
+ SQLFederationEngine federationEngine = new
SQLFederationEngine(database.getName(), schemaName, metaData,
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getStatistics(),
new
JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(),
connectionSession.getConnectionContext()));
- Collection<ExecutionUnit> executionUnits =
isUseFederation(queryContext, metaDataContexts, connectionSession,
sqlFederationEngine)
- ? getFederationExecutionUnits(queryContext, metaDataContexts,
connectionSession, sqlFederationEngine)
- : kernelProcessor.generateExecutionContext(queryContext,
database, globalRuleMetaData, props,
connectionSession.getConnectionContext()).getExecutionUnits();
- return
executionUnits.stream().map(this::buildRow).collect(Collectors.toList());
+ Collection<ExecutionUnit> executionUnits =
federationEngine.decide(queryContext.getSqlStatementContext(),
queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())
+ ? getFederationExecutionUnits(queryContext, metaData,
database, connectionSession, federationEngine)
+ : kernelProcessor.generateExecutionContext(
+ queryContext, database,
metaData.getGlobalRuleMetaData(), metaData.getProps(),
connectionSession.getConnectionContext()).getExecutionUnits();
+ return executionUnits.stream().map(each -> new
LocalDataQueryResultRow(each.getDataSourceName(),
each.getSqlUnit().getSql())).collect(Collectors.toList());
}
- private void setUpCursorDefinition(final SQLStatementContext
sqlStatementContext, final ConnectionSession connectionSession) {
- if (!((CursorAvailable)
sqlStatementContext).getCursorName().isPresent()) {
+ private String getDatabaseName(final ConnectionSession connectionSession) {
+ String result =
Strings.isNullOrEmpty(connectionSession.getDatabaseName()) ?
connectionSession.getDefaultDatabaseName() :
connectionSession.getDatabaseName();
+ ShardingSpherePreconditions.checkState(!Strings.isNullOrEmpty(result),
NoDatabaseSelectedException::new);
+
ShardingSpherePreconditions.checkState(ProxyContext.getInstance().databaseExists(result),
() -> new UnknownDatabaseException(result));
+ return result;
+ }
+
+ private void setUpCursorDefinition(final ConnectionSession
connectionSession, final SQLStatementContext toBePreviewedStatementContext) {
+ if (!((CursorAvailable)
toBePreviewedStatementContext).getCursorName().isPresent()) {
return;
}
- String cursorName = ((CursorAvailable)
sqlStatementContext).getCursorName().get().getIdentifier().getValue().toLowerCase();
+ String cursorName = ((CursorAvailable)
toBePreviewedStatementContext).getCursorName().get().getIdentifier().getValue().toLowerCase();
CursorStatementContext cursorStatementContext =
(CursorStatementContext)
connectionSession.getConnectionContext().getCursorContext().getCursorDefinitions().get(cursorName);
- Preconditions.checkArgument(null != cursorStatementContext, "Cursor %s
does not exist.", cursorName);
- ((CursorDefinitionAware)
sqlStatementContext).setUpCursorDefinition(cursorStatementContext);
- }
-
- private boolean isUseFederation(final QueryContext queryContext, final
MetaDataContexts metaDataContexts, final ConnectionSession connectionSession,
- final SQLFederationEngine
sqlFederationEngine) {
- return
sqlFederationEngine.decide(queryContext.getSqlStatementContext(),
queryContext.getParameters(),
-
metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName()),
metaDataContexts.getMetaData().getGlobalRuleMetaData());
+ Preconditions.checkNotNull(cursorStatementContext, "Cursor %s does not
exist.", cursorName);
+ ((CursorDefinitionAware)
toBePreviewedStatementContext).setUpCursorDefinition(cursorStatementContext);
}
- private LocalDataQueryResultRow buildRow(final ExecutionUnit unit) {
- return new LocalDataQueryResultRow(unit.getDataSourceName(),
unit.getSqlUnit().getSql());
- }
-
- private Collection<ExecutionUnit> getFederationExecutionUnits(final
QueryContext queryContext, final MetaDataContexts metaDataContexts,
- final
ConnectionSession connectionSession, final SQLFederationEngine
sqlFederationEngine) {
+ private Collection<ExecutionUnit> getFederationExecutionUnits(final
QueryContext queryContext, final ShardingSphereMetaData metaData, final
ShardingSphereDatabase database,
+ final
ConnectionSession connectionSession, final SQLFederationEngine
federationEngine) {
SQLStatement sqlStatement =
queryContext.getSqlStatementContext().getSqlStatement();
+ // TODO move dialect MySQLInsertStatement into database type module
@zhangliang
boolean isReturnGeneratedKeys = sqlStatement instanceof
MySQLInsertStatement;
- DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys,
metaDataContexts, connectionSession);
- SQLFederationExecutorContext context = new
SQLFederationExecutorContext(true, queryContext,
metaDataContexts.getMetaData());
- ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(getDatabaseName(connectionSession));
- sqlFederationEngine.executeQuery(prepareEngine,
createPreviewFederationCallback(database.getProtocolType(),
database.getResourceMetaData(), sqlStatement), context);
+ DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys,
metaData.getProps(), database, connectionSession);
+ SQLFederationExecutorContext context = new
SQLFederationExecutorContext(true, queryContext, metaData);
+ federationEngine.executeQuery(prepareEngine,
createPreviewFederationCallback(database.getProtocolType(),
database.getResourceMetaData(), sqlStatement), context);
return context.getExecutionUnits();
}
- private JDBCExecutorCallback<ExecuteResult>
createPreviewFederationCallback(final DatabaseType protocolType, final
ResourceMetaData resourceMetaData,
-
final SQLStatement sqlStatement) {
+ private JDBCExecutorCallback<ExecuteResult>
createPreviewFederationCallback(final DatabaseType protocolType, final
ResourceMetaData resourceMetaData, final SQLStatement sqlStatement) {
return new JDBCExecutorCallback<ExecuteResult>(protocolType,
resourceMetaData, sqlStatement,
SQLExecutorExceptionHandler.isExceptionThrown()) {
@Override
@@ -162,20 +151,12 @@ public final class PreviewExecutor implements ConnectionSessionRequiredRULExecut
};
}
- private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
createDriverExecutionPrepareEngine(final boolean isReturnGeneratedKeys, final
MetaDataContexts metaDataContexts,
-
final ConnectionSession connectionSession) {
- int maxConnectionsSizePerQuery =
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
- return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT,
maxConnectionsSizePerQuery, connectionSession.getDatabaseConnectionManager(),
- (JDBCBackendStatement)
connectionSession.getStatementManager(), new
StatementOption(isReturnGeneratedKeys),
-
metaDataContexts.getMetaData().getDatabase(getDatabaseName(connectionSession)).getRuleMetaData().getRules(),
-
metaDataContexts.getMetaData().getDatabase(getDatabaseName(connectionSession)).getResourceMetaData().getStorageUnits());
- }
-
- private String getDatabaseName(final ConnectionSession connectionSession) {
- String result =
Strings.isNullOrEmpty(connectionSession.getDatabaseName()) ?
connectionSession.getDefaultDatabaseName() :
connectionSession.getDatabaseName();
- ShardingSpherePreconditions.checkState(!Strings.isNullOrEmpty(result),
NoDatabaseSelectedException::new);
-
ShardingSpherePreconditions.checkState(ProxyContext.getInstance().databaseExists(result),
() -> new UnknownDatabaseException(result));
- return result;
+ private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
createDriverExecutionPrepareEngine(final boolean isReturnGeneratedKeys, final
ConfigurationProperties props,
+
final ShardingSphereDatabase database, final
ConnectionSession connectionSession) {
+ int maxConnectionsSizePerQuery =
props.<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+ return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT,
maxConnectionsSizePerQuery,
+ connectionSession.getDatabaseConnectionManager(),
(JDBCBackendStatement) connectionSession.getStatementManager(),
+ new StatementOption(isReturnGeneratedKeys),
database.getRuleMetaData().getRules(),
database.getResourceMetaData().getStorageUnits());
}
@Override