This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5259295fb8a Refactor DatabaseConnector (#32442)
5259295fb8a is described below

commit 5259295fb8a1310ed0c7f9ee61dbf5c6dcf96b07
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Aug 9 17:41:03 2024 +0800

    Refactor DatabaseConnector (#32442)
    
    * Refactor DatabaseConnector
    
    * Refactor DatabaseConnector
---
 .../proxy/backend/connector/DatabaseConnector.java | 114 ++++++++++-----------
 1 file changed, 56 insertions(+), 58 deletions(-)

diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
index 296ae5d9510..3f0fcb87f9a 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.proxy.backend.connector;
 
-import com.google.common.base.Preconditions;
 import org.apache.shardingsphere.infra.binder.context.aware.CursorAware;
 import 
org.apache.shardingsphere.infra.binder.context.segment.insert.keygen.GeneratedKeyContext;
 import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
@@ -50,6 +49,7 @@ import 
org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.util.SystemSchemaUtils;
 import 
org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute;
+import 
org.apache.shardingsphere.infra.session.connection.cursor.CursorConnectionContext;
 import org.apache.shardingsphere.infra.session.query.QueryContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -67,7 +67,6 @@ import 
org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeader
 import 
org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeaderBuilderEngine;
 import 
org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
 import 
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
-import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import 
org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
 import org.apache.shardingsphere.proxy.backend.util.TransactionUtils;
 import 
org.apache.shardingsphere.sharding.merge.common.IteratorStreamMergedResult;
@@ -97,43 +96,72 @@ import java.util.stream.Collectors;
  */
 public final class DatabaseConnector implements DatabaseBackendHandler {
     
-    private final ProxySQLExecutor proxySQLExecutor;
+    private final String driverType;
     
-    private final Collection<Statement> cachedStatements = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
+    private final QueryContext queryContext;
     
-    private final Collection<ResultSet> cachedResultSets = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
+    private final ProxyDatabaseConnectionManager databaseConnectionManager;
     
-    private final String driverType;
+    private final ContextManager contextManager;
+    
+    private final ShardingSphereDatabase database;
     
     private final boolean containsDerivedProjections;
     
-    private final QueryContext queryContext;
+    private final ProxySQLExecutor proxySQLExecutor;
     
-    private final ProxyDatabaseConnectionManager databaseConnectionManager;
+    private final Collection<Statement> cachedStatements = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
+    
+    private final Collection<ResultSet> cachedResultSets = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
     
     private List<QueryHeader> queryHeaders;
     
     private MergedResult mergedResult;
     
     public DatabaseConnector(final String driverType, final QueryContext 
queryContext, final ProxyDatabaseConnectionManager databaseConnectionManager) {
-        SQLStatementContext sqlStatementContext = 
queryContext.getSqlStatementContext();
-        
failedIfBackendNotReady(databaseConnectionManager.getConnectionSession(), 
sqlStatementContext);
         this.driverType = driverType;
         this.queryContext = queryContext;
-        containsDerivedProjections = sqlStatementContext instanceof 
SelectStatementContext && ((SelectStatementContext) 
sqlStatementContext).containsDerivedProjections();
         this.databaseConnectionManager = databaseConnectionManager;
+        contextManager = ProxyContext.getInstance().getContextManager();
+        database = 
contextManager.getDatabase(databaseConnectionManager.getConnectionSession().getUsedDatabaseName());
+        SQLStatementContext sqlStatementContext = 
queryContext.getSqlStatementContext();
+        checkBackendReady(sqlStatementContext);
+        containsDerivedProjections = sqlStatementContext instanceof 
SelectStatementContext && ((SelectStatementContext) 
sqlStatementContext).containsDerivedProjections();
         if (sqlStatementContext instanceof CursorAvailable) {
-            prepareCursorStatementContext((CursorAvailable) 
sqlStatementContext, databaseConnectionManager.getConnectionSession());
+            prepareCursorStatementContext((CursorAvailable) 
sqlStatementContext);
         }
         proxySQLExecutor = new ProxySQLExecutor(driverType, 
databaseConnectionManager, this, queryContext);
     }
     
-    private void failedIfBackendNotReady(final ConnectionSession 
connectionSession, final SQLStatementContext sqlStatementContext) {
-        ShardingSphereDatabase database = 
ProxyContext.getInstance().getContextManager().getDatabase(connectionSession.getUsedDatabaseName());
+    private void checkBackendReady(final SQLStatementContext 
sqlStatementContext) {
         boolean isSystemSchema = 
SystemSchemaUtils.containsSystemSchema(sqlStatementContext.getDatabaseType(),
                 sqlStatementContext instanceof TableAvailable ? 
((TableAvailable) sqlStatementContext).getTablesContext().getSchemaNames() : 
Collections.emptyList(), database);
-        ShardingSpherePreconditions.checkState(isSystemSchema || 
database.containsDataSource(), () -> new 
EmptyStorageUnitException(connectionSession.getUsedDatabaseName()));
-        ShardingSpherePreconditions.checkState(isSystemSchema || 
database.isComplete(), () -> new 
EmptyRuleException(connectionSession.getUsedDatabaseName()));
+        ShardingSpherePreconditions.checkState(isSystemSchema || 
database.containsDataSource(), () -> new 
EmptyStorageUnitException(database.getName()));
+        ShardingSpherePreconditions.checkState(isSystemSchema || 
database.isComplete(), () -> new EmptyRuleException(database.getName()));
+    }
+    
+    private void prepareCursorStatementContext(final CursorAvailable 
statementContext) {
+        if (statementContext.getCursorName().isPresent()) {
+            prepareCursorStatementContext(statementContext, 
statementContext.getCursorName().get().getIdentifier().getValue().toLowerCase());
+        }
+        if (statementContext instanceof CloseStatementContext && 
((CloseStatementContext) statementContext).getSqlStatement().isCloseAll()) {
+            
databaseConnectionManager.getConnectionSession().getConnectionContext().clearCursorContext();
+        }
+    }
+    
+    private void prepareCursorStatementContext(final CursorAvailable 
statementContext, final String cursorName) {
+        CursorConnectionContext cursorContext = 
databaseConnectionManager.getConnectionSession().getConnectionContext().getCursorContext();
+        if (statementContext instanceof CursorStatementContext) {
+            cursorContext.getCursorStatementContexts().put(cursorName, 
(CursorStatementContext) statementContext);
+        }
+        if (statementContext instanceof CursorAware) {
+            ShardingSpherePreconditions.checkContainsKey(
+                    cursorContext.getCursorStatementContexts(), cursorName, () 
-> new IllegalArgumentException(String.format("Cursor %s does not exist.", 
cursorName)));
+            ((CursorAware) 
statementContext).setCursorStatementContext(cursorContext.getCursorStatementContexts().get(cursorName));
+        }
+        if (statementContext instanceof CloseStatementContext) {
+            cursorContext.removeCursor(cursorName);
+        }
     }
     
     /**
@@ -156,9 +184,8 @@ public final class DatabaseConnector implements 
DatabaseBackendHandler {
     
     @Override
     public ResponseHeader execute() throws SQLException {
-        MetaDataContexts metaDataContexts = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
-        if (proxySQLExecutor.getSqlFederationEngine().decide(queryContext, 
metaDataContexts.getMetaData().getGlobalRuleMetaData())) {
-            return 
processExecuteFederation(doExecuteFederation(metaDataContexts), 
metaDataContexts);
+        if (proxySQLExecutor.getSqlFederationEngine().decide(queryContext, 
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData())) {
+            return processExecuteFederation(doExecuteFederation());
         }
         MetaDataRefreshEngine metaDataRefreshEngine = 
getMetaDataRefreshEngine();
         if (proxySQLExecutor.getSqlFederationEngine().enabled() && 
metaDataRefreshEngine.isFederation(queryContext.getSqlStatementContext())) {
@@ -172,7 +199,7 @@ public final class DatabaseConnector implements 
DatabaseBackendHandler {
     }
     
     private ExecutionContext generateExecutionContext() {
-        ShardingSphereMetaData metaData = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData();
+        ShardingSphereMetaData metaData = 
contextManager.getMetaDataContexts().getMetaData();
         return new KernelProcessor().generateExecutionContext(
                 queryContext, metaData.getGlobalRuleMetaData(), 
metaData.getProps(), 
databaseConnectionManager.getConnectionSession().getConnectionContext());
     }
@@ -201,8 +228,7 @@ public final class DatabaseConnector implements 
DatabaseBackendHandler {
         } catch (final Exception ex) {
             // CHECKSTYLE:ON
             transactionManager.rollback();
-            String databaseName = 
databaseConnectionManager.getConnectionSession().getUsedDatabaseName();
-            throw SQLExceptionTransformEngine.toSQLException(ex, 
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName).getProtocolType());
+            throw SQLExceptionTransformEngine.toSQLException(ex, 
database.getProtocolType());
         }
         return result;
     }
@@ -220,14 +246,14 @@ public final class DatabaseConnector implements 
DatabaseBackendHandler {
                 : processExecuteUpdate(result.stream().map(each -> 
(UpdateResult) each).collect(Collectors.toList()));
     }
     
-    private ResultSet doExecuteFederation(final MetaDataContexts 
metaDataContexts) {
+    private ResultSet doExecuteFederation() {
         boolean isReturnGeneratedKeys = 
queryContext.getSqlStatementContext().getSqlStatement() instanceof 
MySQLInsertStatement;
-        ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseConnectionManager.getConnectionSession().getUsedDatabaseName());
         DatabaseType protocolType = database.getProtocolType();
         ProxyJDBCExecutorCallback callback = 
ProxyJDBCExecutorCallbackFactory.newInstance(driverType, protocolType, 
database.getResourceMetaData(),
                 queryContext.getSqlStatementContext().getSqlStatement(), this, 
isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown(), true);
-        DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys, 
metaDataContexts);
-        SQLFederationContext context = new SQLFederationContext(false, 
queryContext, metaDataContexts.getMetaData(), 
databaseConnectionManager.getConnectionSession().getProcessId());
+        DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys, 
contextManager.getMetaDataContexts());
+        SQLFederationContext context = new SQLFederationContext(
+                false, queryContext, 
contextManager.getMetaDataContexts().getMetaData(), 
databaseConnectionManager.getConnectionSession().getProcessId());
         return 
proxySQLExecutor.getSqlFederationEngine().executeQuery(prepareEngine, callback, 
context);
     }
     
@@ -235,14 +261,12 @@ public final class DatabaseConnector implements 
DatabaseBackendHandler {
         int maxConnectionsSizePerQuery = 
metaData.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
         JDBCBackendStatement statementManager = (JDBCBackendStatement) 
databaseConnectionManager.getConnectionSession().getStatementManager();
         return new DriverExecutionPrepareEngine<>(driverType, 
maxConnectionsSizePerQuery, databaseConnectionManager, statementManager,
-                new StatementOption(isReturnGeneratedKeys), 
metaData.getMetaData().getDatabase(databaseConnectionManager.getConnectionSession().getUsedDatabaseName()).getRuleMetaData().getRules(),
-                
metaData.getMetaData().getDatabase(databaseConnectionManager.getConnectionSession().getUsedDatabaseName()).getResourceMetaData().getStorageUnits());
+                new StatementOption(isReturnGeneratedKeys), 
database.getRuleMetaData().getRules(), 
database.getResourceMetaData().getStorageUnits());
     }
     
-    private ResponseHeader processExecuteFederation(final ResultSet resultSet, 
final MetaDataContexts metaDataContexts) throws SQLException {
+    private ResponseHeader processExecuteFederation(final ResultSet resultSet) 
throws SQLException {
         int columnCount = resultSet.getMetaData().getColumnCount();
         queryHeaders = new ArrayList<>(columnCount);
-        ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseConnectionManager.getConnectionSession().getUsedDatabaseName());
         QueryHeaderBuilderEngine queryHeaderBuilderEngine = new 
QueryHeaderBuilderEngine(null == database ? null : database.getProtocolType());
         for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
             queryHeaders.add(queryHeaderBuilderEngine.build(new 
JDBCQueryResultMetaData(resultSet.getMetaData()), database, columnIndex));
@@ -251,36 +275,11 @@ public final class DatabaseConnector implements 
DatabaseBackendHandler {
         return new QueryResponseHeader(queryHeaders);
     }
     
-    private void prepareCursorStatementContext(final CursorAvailable 
statementContext, final ConnectionSession connectionSession) {
-        if (statementContext.getCursorName().isPresent()) {
-            String cursorName = 
statementContext.getCursorName().get().getIdentifier().getValue().toLowerCase();
-            prepareCursorStatementContext(statementContext, connectionSession, 
cursorName);
-        }
-        if (statementContext instanceof CloseStatementContext && 
((CloseStatementContext) statementContext).getSqlStatement().isCloseAll()) {
-            connectionSession.getConnectionContext().clearCursorContext();
-        }
-    }
-    
-    private void prepareCursorStatementContext(final CursorAvailable 
statementContext, final ConnectionSession connectionSession, final String 
cursorName) {
-        if (statementContext instanceof CursorStatementContext) {
-            
connectionSession.getConnectionContext().getCursorContext().getCursorStatementContexts().put(cursorName,
 (CursorStatementContext) statementContext);
-        }
-        if (statementContext instanceof CursorAware) {
-            CursorStatementContext cursorStatementContext = 
connectionSession.getConnectionContext().getCursorContext().getCursorStatementContexts().get(cursorName);
-            Preconditions.checkArgument(null != cursorStatementContext, 
"Cursor %s does not exist.", cursorName);
-            ((CursorAware) 
statementContext).setCursorStatementContext(cursorStatementContext);
-        }
-        if (statementContext instanceof CloseStatementContext) {
-            
connectionSession.getConnectionContext().getCursorContext().removeCursor(cursorName);
-        }
-    }
-    
     private void refreshMetaData(final ExecutionContext executionContext) 
throws SQLException {
         
getMetaDataRefreshEngine().refresh(queryContext.getSqlStatementContext(), 
executionContext.getRouteContext().getRouteUnits());
     }
     
     private MetaDataRefreshEngine getMetaDataRefreshEngine() {
-        ContextManager contextManager = 
ProxyContext.getInstance().getContextManager();
         return new MetaDataRefreshEngine(
                 
contextManager.getPersistServiceFacade().getMetaDataManagerPersistService(), 
queryContext.getUsedDatabase(), 
contextManager.getMetaDataContexts().getMetaData().getProps());
     }
@@ -316,9 +315,8 @@ public final class DatabaseConnector implements 
DatabaseBackendHandler {
     }
     
     private MergedResult mergeQuery(final SQLStatementContext 
sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
-        ShardingSphereMetaData metaData = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData();
-        MergeEngine mergeEngine = new MergeEngine(
-                metaData.getGlobalRuleMetaData(), 
queryContext.getUsedDatabase(), metaData.getProps(), 
databaseConnectionManager.getConnectionSession().getConnectionContext());
+        MergeEngine mergeEngine = new 
MergeEngine(contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(),
+                queryContext.getUsedDatabase(), 
contextManager.getMetaDataContexts().getMetaData().getProps(), 
databaseConnectionManager.getConnectionSession().getConnectionContext());
         return mergeEngine.merge(queryResults, sqlStatementContext);
     }
     

Reply via email to