This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5259295fb8a Refactor DatabaseConnector (#32442)
5259295fb8a is described below
commit 5259295fb8a1310ed0c7f9ee61dbf5c6dcf96b07
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Aug 9 17:41:03 2024 +0800
Refactor DatabaseConnector (#32442)
* Refactor DatabaseConnector
* Refactor DatabaseConnector
---
.../proxy/backend/connector/DatabaseConnector.java | 114 ++++++++++-----------
1 file changed, 56 insertions(+), 58 deletions(-)
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
index 296ae5d9510..3f0fcb87f9a 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.proxy.backend.connector;
-import com.google.common.base.Preconditions;
import org.apache.shardingsphere.infra.binder.context.aware.CursorAware;
import
org.apache.shardingsphere.infra.binder.context.segment.insert.keygen.GeneratedKeyContext;
import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
@@ -50,6 +49,7 @@ import
org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.metadata.database.schema.util.SystemSchemaUtils;
import
org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute;
+import
org.apache.shardingsphere.infra.session.connection.cursor.CursorConnectionContext;
import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -67,7 +67,6 @@ import
org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeader
import
org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeaderBuilderEngine;
import
org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
import
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
-import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import
org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
import org.apache.shardingsphere.proxy.backend.util.TransactionUtils;
import
org.apache.shardingsphere.sharding.merge.common.IteratorStreamMergedResult;
@@ -97,43 +96,72 @@ import java.util.stream.Collectors;
*/
public final class DatabaseConnector implements DatabaseBackendHandler {
- private final ProxySQLExecutor proxySQLExecutor;
+ private final String driverType;
- private final Collection<Statement> cachedStatements =
Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private final QueryContext queryContext;
- private final Collection<ResultSet> cachedResultSets =
Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private final ProxyDatabaseConnectionManager databaseConnectionManager;
- private final String driverType;
+ private final ContextManager contextManager;
+
+ private final ShardingSphereDatabase database;
private final boolean containsDerivedProjections;
- private final QueryContext queryContext;
+ private final ProxySQLExecutor proxySQLExecutor;
- private final ProxyDatabaseConnectionManager databaseConnectionManager;
+ private final Collection<Statement> cachedStatements =
Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+ private final Collection<ResultSet> cachedResultSets =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private List<QueryHeader> queryHeaders;
private MergedResult mergedResult;
public DatabaseConnector(final String driverType, final QueryContext
queryContext, final ProxyDatabaseConnectionManager databaseConnectionManager) {
- SQLStatementContext sqlStatementContext =
queryContext.getSqlStatementContext();
-
failedIfBackendNotReady(databaseConnectionManager.getConnectionSession(),
sqlStatementContext);
this.driverType = driverType;
this.queryContext = queryContext;
- containsDerivedProjections = sqlStatementContext instanceof
SelectStatementContext && ((SelectStatementContext)
sqlStatementContext).containsDerivedProjections();
this.databaseConnectionManager = databaseConnectionManager;
+ contextManager = ProxyContext.getInstance().getContextManager();
+ database =
contextManager.getDatabase(databaseConnectionManager.getConnectionSession().getUsedDatabaseName());
+ SQLStatementContext sqlStatementContext =
queryContext.getSqlStatementContext();
+ checkBackendReady(sqlStatementContext);
+ containsDerivedProjections = sqlStatementContext instanceof
SelectStatementContext && ((SelectStatementContext)
sqlStatementContext).containsDerivedProjections();
if (sqlStatementContext instanceof CursorAvailable) {
- prepareCursorStatementContext((CursorAvailable)
sqlStatementContext, databaseConnectionManager.getConnectionSession());
+ prepareCursorStatementContext((CursorAvailable)
sqlStatementContext);
}
proxySQLExecutor = new ProxySQLExecutor(driverType,
databaseConnectionManager, this, queryContext);
}
- private void failedIfBackendNotReady(final ConnectionSession
connectionSession, final SQLStatementContext sqlStatementContext) {
- ShardingSphereDatabase database =
ProxyContext.getInstance().getContextManager().getDatabase(connectionSession.getUsedDatabaseName());
+ private void checkBackendReady(final SQLStatementContext
sqlStatementContext) {
boolean isSystemSchema =
SystemSchemaUtils.containsSystemSchema(sqlStatementContext.getDatabaseType(),
sqlStatementContext instanceof TableAvailable ?
((TableAvailable) sqlStatementContext).getTablesContext().getSchemaNames() :
Collections.emptyList(), database);
- ShardingSpherePreconditions.checkState(isSystemSchema ||
database.containsDataSource(), () -> new
EmptyStorageUnitException(connectionSession.getUsedDatabaseName()));
- ShardingSpherePreconditions.checkState(isSystemSchema ||
database.isComplete(), () -> new
EmptyRuleException(connectionSession.getUsedDatabaseName()));
+ ShardingSpherePreconditions.checkState(isSystemSchema ||
database.containsDataSource(), () -> new
EmptyStorageUnitException(database.getName()));
+ ShardingSpherePreconditions.checkState(isSystemSchema ||
database.isComplete(), () -> new EmptyRuleException(database.getName()));
+ }
+
+ private void prepareCursorStatementContext(final CursorAvailable
statementContext) {
+ if (statementContext.getCursorName().isPresent()) {
+ prepareCursorStatementContext(statementContext,
statementContext.getCursorName().get().getIdentifier().getValue().toLowerCase());
+ }
+ if (statementContext instanceof CloseStatementContext &&
((CloseStatementContext) statementContext).getSqlStatement().isCloseAll()) {
+
databaseConnectionManager.getConnectionSession().getConnectionContext().clearCursorContext();
+ }
+ }
+
+ private void prepareCursorStatementContext(final CursorAvailable
statementContext, final String cursorName) {
+ CursorConnectionContext cursorContext =
databaseConnectionManager.getConnectionSession().getConnectionContext().getCursorContext();
+ if (statementContext instanceof CursorStatementContext) {
+ cursorContext.getCursorStatementContexts().put(cursorName,
(CursorStatementContext) statementContext);
+ }
+ if (statementContext instanceof CursorAware) {
+ ShardingSpherePreconditions.checkContainsKey(
+ cursorContext.getCursorStatementContexts(), cursorName, ()
-> new IllegalArgumentException(String.format("Cursor %s does not exist.",
cursorName)));
+ ((CursorAware)
statementContext).setCursorStatementContext(cursorContext.getCursorStatementContexts().get(cursorName));
+ }
+ if (statementContext instanceof CloseStatementContext) {
+ cursorContext.removeCursor(cursorName);
+ }
}
/**
@@ -156,9 +184,8 @@ public final class DatabaseConnector implements
DatabaseBackendHandler {
@Override
public ResponseHeader execute() throws SQLException {
- MetaDataContexts metaDataContexts =
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
- if (proxySQLExecutor.getSqlFederationEngine().decide(queryContext,
metaDataContexts.getMetaData().getGlobalRuleMetaData())) {
- return
processExecuteFederation(doExecuteFederation(metaDataContexts),
metaDataContexts);
+ if (proxySQLExecutor.getSqlFederationEngine().decide(queryContext,
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData())) {
+ return processExecuteFederation(doExecuteFederation());
}
MetaDataRefreshEngine metaDataRefreshEngine =
getMetaDataRefreshEngine();
if (proxySQLExecutor.getSqlFederationEngine().enabled() &&
metaDataRefreshEngine.isFederation(queryContext.getSqlStatementContext())) {
@@ -172,7 +199,7 @@ public final class DatabaseConnector implements
DatabaseBackendHandler {
}
private ExecutionContext generateExecutionContext() {
- ShardingSphereMetaData metaData =
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData();
+ ShardingSphereMetaData metaData =
contextManager.getMetaDataContexts().getMetaData();
return new KernelProcessor().generateExecutionContext(
queryContext, metaData.getGlobalRuleMetaData(),
metaData.getProps(),
databaseConnectionManager.getConnectionSession().getConnectionContext());
}
@@ -201,8 +228,7 @@ public final class DatabaseConnector implements
DatabaseBackendHandler {
} catch (final Exception ex) {
// CHECKSTYLE:ON
transactionManager.rollback();
- String databaseName =
databaseConnectionManager.getConnectionSession().getUsedDatabaseName();
- throw SQLExceptionTransformEngine.toSQLException(ex,
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName).getProtocolType());
+ throw SQLExceptionTransformEngine.toSQLException(ex,
database.getProtocolType());
}
return result;
}
@@ -220,14 +246,14 @@ public final class DatabaseConnector implements
DatabaseBackendHandler {
: processExecuteUpdate(result.stream().map(each ->
(UpdateResult) each).collect(Collectors.toList()));
}
- private ResultSet doExecuteFederation(final MetaDataContexts
metaDataContexts) {
+ private ResultSet doExecuteFederation() {
boolean isReturnGeneratedKeys =
queryContext.getSqlStatementContext().getSqlStatement() instanceof
MySQLInsertStatement;
- ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseConnectionManager.getConnectionSession().getUsedDatabaseName());
DatabaseType protocolType = database.getProtocolType();
ProxyJDBCExecutorCallback callback =
ProxyJDBCExecutorCallbackFactory.newInstance(driverType, protocolType,
database.getResourceMetaData(),
queryContext.getSqlStatementContext().getSqlStatement(), this,
isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown(), true);
- DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys,
metaDataContexts);
- SQLFederationContext context = new SQLFederationContext(false,
queryContext, metaDataContexts.getMetaData(),
databaseConnectionManager.getConnectionSession().getProcessId());
+ DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys,
contextManager.getMetaDataContexts());
+ SQLFederationContext context = new SQLFederationContext(
+ false, queryContext,
contextManager.getMetaDataContexts().getMetaData(),
databaseConnectionManager.getConnectionSession().getProcessId());
return
proxySQLExecutor.getSqlFederationEngine().executeQuery(prepareEngine, callback,
context);
}
@@ -235,14 +261,12 @@ public final class DatabaseConnector implements
DatabaseBackendHandler {
int maxConnectionsSizePerQuery =
metaData.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
JDBCBackendStatement statementManager = (JDBCBackendStatement)
databaseConnectionManager.getConnectionSession().getStatementManager();
return new DriverExecutionPrepareEngine<>(driverType,
maxConnectionsSizePerQuery, databaseConnectionManager, statementManager,
- new StatementOption(isReturnGeneratedKeys),
metaData.getMetaData().getDatabase(databaseConnectionManager.getConnectionSession().getUsedDatabaseName()).getRuleMetaData().getRules(),
-
metaData.getMetaData().getDatabase(databaseConnectionManager.getConnectionSession().getUsedDatabaseName()).getResourceMetaData().getStorageUnits());
+ new StatementOption(isReturnGeneratedKeys),
database.getRuleMetaData().getRules(),
database.getResourceMetaData().getStorageUnits());
}
- private ResponseHeader processExecuteFederation(final ResultSet resultSet,
final MetaDataContexts metaDataContexts) throws SQLException {
+ private ResponseHeader processExecuteFederation(final ResultSet resultSet)
throws SQLException {
int columnCount = resultSet.getMetaData().getColumnCount();
queryHeaders = new ArrayList<>(columnCount);
- ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseConnectionManager.getConnectionSession().getUsedDatabaseName());
QueryHeaderBuilderEngine queryHeaderBuilderEngine = new
QueryHeaderBuilderEngine(null == database ? null : database.getProtocolType());
for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
queryHeaders.add(queryHeaderBuilderEngine.build(new
JDBCQueryResultMetaData(resultSet.getMetaData()), database, columnIndex));
@@ -251,36 +275,11 @@ public final class DatabaseConnector implements
DatabaseBackendHandler {
return new QueryResponseHeader(queryHeaders);
}
- private void prepareCursorStatementContext(final CursorAvailable
statementContext, final ConnectionSession connectionSession) {
- if (statementContext.getCursorName().isPresent()) {
- String cursorName =
statementContext.getCursorName().get().getIdentifier().getValue().toLowerCase();
- prepareCursorStatementContext(statementContext, connectionSession,
cursorName);
- }
- if (statementContext instanceof CloseStatementContext &&
((CloseStatementContext) statementContext).getSqlStatement().isCloseAll()) {
- connectionSession.getConnectionContext().clearCursorContext();
- }
- }
-
- private void prepareCursorStatementContext(final CursorAvailable
statementContext, final ConnectionSession connectionSession, final String
cursorName) {
- if (statementContext instanceof CursorStatementContext) {
-
connectionSession.getConnectionContext().getCursorContext().getCursorStatementContexts().put(cursorName,
(CursorStatementContext) statementContext);
- }
- if (statementContext instanceof CursorAware) {
- CursorStatementContext cursorStatementContext =
connectionSession.getConnectionContext().getCursorContext().getCursorStatementContexts().get(cursorName);
- Preconditions.checkArgument(null != cursorStatementContext,
"Cursor %s does not exist.", cursorName);
- ((CursorAware)
statementContext).setCursorStatementContext(cursorStatementContext);
- }
- if (statementContext instanceof CloseStatementContext) {
-
connectionSession.getConnectionContext().getCursorContext().removeCursor(cursorName);
- }
- }
-
private void refreshMetaData(final ExecutionContext executionContext)
throws SQLException {
getMetaDataRefreshEngine().refresh(queryContext.getSqlStatementContext(),
executionContext.getRouteContext().getRouteUnits());
}
private MetaDataRefreshEngine getMetaDataRefreshEngine() {
- ContextManager contextManager =
ProxyContext.getInstance().getContextManager();
return new MetaDataRefreshEngine(
contextManager.getPersistServiceFacade().getMetaDataManagerPersistService(),
queryContext.getUsedDatabase(),
contextManager.getMetaDataContexts().getMetaData().getProps());
}
@@ -316,9 +315,8 @@ public final class DatabaseConnector implements
DatabaseBackendHandler {
}
private MergedResult mergeQuery(final SQLStatementContext
sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
- ShardingSphereMetaData metaData =
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData();
- MergeEngine mergeEngine = new MergeEngine(
- metaData.getGlobalRuleMetaData(),
queryContext.getUsedDatabase(), metaData.getProps(),
databaseConnectionManager.getConnectionSession().getConnectionContext());
+ MergeEngine mergeEngine = new
MergeEngine(contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(),
+ queryContext.getUsedDatabase(),
contextManager.getMetaDataContexts().getMetaData().getProps(),
databaseConnectionManager.getConnectionSession().getConnectionContext());
return mergeEngine.merge(queryResults, sqlStatementContext);
}