This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5dd0e470915 Add ShardingSphereConnection.getTrafficInstanceId() 
(#31414)
5dd0e470915 is described below

commit 5dd0e470915766a2ec6edd387ca8598620689668
Author: Liang Zhang <[email protected]>
AuthorDate: Mon May 27 17:07:56 2024 +0800

    Add ShardingSphereConnection.getTrafficInstanceId() (#31414)
---
 .../core/connection/ShardingSphereConnection.java  | 29 ++++++++++++++-
 .../statement/ShardingSpherePreparedStatement.java | 42 ++++++----------------
 .../core/statement/ShardingSphereStatement.java    | 38 +++++---------------
 .../traffic/executor/TrafficExecutor.java          | 17 ---------
 4 files changed, 48 insertions(+), 78 deletions(-)

diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
index 084d1ac29ff..c44cc8a2456 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
@@ -18,17 +18,20 @@
 package org.apache.shardingsphere.driver.jdbc.core.connection;
 
 import lombok.Getter;
+import org.apache.shardingsphere.driver.exception.ConnectionClosedException;
 import org.apache.shardingsphere.driver.jdbc.adapter.AbstractConnectionAdapter;
 import 
org.apache.shardingsphere.driver.jdbc.core.datasource.metadata.ShardingSphereDatabaseMetaData;
 import 
org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement;
 import 
org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSphereStatement;
-import org.apache.shardingsphere.driver.exception.ConnectionClosedException;
 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.traffic.engine.TrafficEngine;
+import org.apache.shardingsphere.traffic.rule.TrafficRule;
 import org.apache.shardingsphere.transaction.api.TransactionType;
 import org.apache.shardingsphere.transaction.rule.TransactionRule;
 
@@ -40,6 +43,7 @@ import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Savepoint;
 import java.sql.Statement;
+import java.util.Optional;
 
 /**
  * ShardingSphere connection.
@@ -212,6 +216,29 @@ public final class ShardingSphereConnection extends 
AbstractConnectionAdapter {
         }
     }
     
+    /**
+     * Get traffic instance ID.
+     *
+     * @param trafficRule traffic rule
+     * @param queryContext query context
+     * @return traffic instance ID
+     */
+    public Optional<String> getTrafficInstanceId(final TrafficRule 
trafficRule, final QueryContext queryContext) {
+        if (null == trafficRule || trafficRule.getStrategyRules().isEmpty()) {
+            return Optional.empty();
+        }
+        Optional<String> existedTrafficInstanceId = 
databaseConnectionManager.getConnectionContext().getTrafficInstanceId();
+        if (existedTrafficInstanceId.isPresent()) {
+            return existedTrafficInstanceId;
+        }
+        boolean isHoldTransaction = isHoldTransaction();
+        Optional<String> result = new TrafficEngine(trafficRule, 
contextManager.getComputeNodeInstanceContext()).dispatch(queryContext, 
isHoldTransaction);
+        if (isHoldTransaction && result.isPresent()) {
+            
databaseConnectionManager.getConnectionContext().setTrafficInstanceId(result.get());
+        }
+        return result;
+    }
+    
     @Override
     public void commit() throws SQLException {
         try {
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index b7dc4c334e5..fecffc9e8e1 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -68,7 +68,6 @@ import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrep
 import org.apache.shardingsphere.infra.hint.HintManager;
 import org.apache.shardingsphere.infra.hint.HintValueContext;
 import org.apache.shardingsphere.infra.hint.SQLHintUtils;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
 import org.apache.shardingsphere.infra.merge.MergeEngine;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
@@ -85,7 +84,6 @@ import org.apache.shardingsphere.parser.rule.SQLParserRule;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
 import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
-import org.apache.shardingsphere.traffic.engine.TrafficEngine;
 import 
org.apache.shardingsphere.traffic.exception.EmptyTrafficExecutionUnitException;
 import org.apache.shardingsphere.traffic.rule.TrafficRule;
 import org.apache.shardingsphere.transaction.util.AutoCommitUtils;
@@ -228,10 +226,10 @@ public final class ShardingSpherePreparedStatement 
extends AbstractPreparedState
             QueryContext queryContext = createQueryContext();
             
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
             ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
-            String trafficInstanceId = 
getInstanceIdAndSet(queryContext).orElse(null);
-            if (null != trafficInstanceId) {
+            Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(trafficRule, queryContext);
+            if (trafficInstanceId.isPresent()) {
                 currentResultSet = 
executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName,
-                        trafficInstanceId, queryContext, 
createDriverExecutionPrepareEngine(database), (statement, sql) -> 
((PreparedStatement) statement).executeQuery());
+                        trafficInstanceId.get(), queryContext, 
createDriverExecutionPrepareEngine(database), (statement, sql) -> 
((PreparedStatement) statement).executeQuery());
                 return currentResultSet;
             }
             if (decide(queryContext, database, 
metaDataContexts.getMetaData().getGlobalRuleMetaData())) {
@@ -281,24 +279,6 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         return context.getInputGroups().stream().flatMap(each -> 
each.getInputs().stream()).findFirst().orElseThrow(EmptyTrafficExecutionUnitException::new);
     }
     
-    private Optional<String> getInstanceIdAndSet(final QueryContext 
queryContext) {
-        Optional<String> result = 
connection.getDatabaseConnectionManager().getConnectionContext().getTrafficInstanceId();
-        if (!result.isPresent()) {
-            result = getInstanceId(queryContext);
-        }
-        if (connection.isHoldTransaction() && result.isPresent()) {
-            
connection.getDatabaseConnectionManager().getConnectionContext().setTrafficInstanceId(result.get());
-        }
-        return result;
-    }
-    
-    private Optional<String> getInstanceId(final QueryContext queryContext) {
-        ComputeNodeInstanceContext computeNodeInstanceContext = 
connection.getContextManager().getComputeNodeInstanceContext();
-        return null != trafficRule && !trafficRule.getStrategyRules().isEmpty()
-                ? new TrafficEngine(trafficRule, 
computeNodeInstanceContext).dispatch(queryContext, 
connection.isHoldTransaction())
-                : Optional.empty();
-    }
-    
     private void resetParameters() throws SQLException {
         parameterSets.clear();
         parameterSets.add(getParameters());
@@ -342,11 +322,11 @@ public final class ShardingSpherePreparedStatement 
extends AbstractPreparedState
             clearPrevious();
             QueryContext queryContext = createQueryContext();
             
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
-            String trafficInstanceId = 
getInstanceIdAndSet(queryContext).orElse(null);
-            if (null != trafficInstanceId) {
+            Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(trafficRule, queryContext);
+            if (trafficInstanceId.isPresent()) {
                 ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
                 return 
executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName,
-                        trafficInstanceId, queryContext, 
createDriverExecutionPrepareEngine(database), (statement, sql) -> 
((PreparedStatement) statement).executeUpdate());
+                        trafficInstanceId.get(), queryContext, 
createDriverExecutionPrepareEngine(database), (statement, sql) -> 
((PreparedStatement) statement).executeUpdate());
             }
             executionContext = createExecutionContext(queryContext);
             if (hasRawExecutionRule()) {
@@ -407,11 +387,11 @@ public final class ShardingSpherePreparedStatement 
extends AbstractPreparedState
             clearPrevious();
             QueryContext queryContext = createQueryContext();
             
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
-            String trafficInstanceId = 
getInstanceIdAndSet(queryContext).orElse(null);
-            if (null != trafficInstanceId) {
+            Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(trafficRule, queryContext);
+            if (trafficInstanceId.isPresent()) {
                 ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
                 boolean result = 
executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName,
-                        trafficInstanceId, queryContext, 
createDriverExecutionPrepareEngine(database), (statement, sql) -> 
((PreparedStatement) statement).execute());
+                        trafficInstanceId.get(), queryContext, 
createDriverExecutionPrepareEngine(database), (statement, sql) -> 
((PreparedStatement) statement).execute());
                 currentResultSet = 
executor.getTrafficExecutor().getResultSet();
                 return result;
             }
@@ -624,8 +604,8 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     public void addBatch() {
         try {
             QueryContext queryContext = createQueryContext();
-            String trafficInstanceId = 
getInstanceIdAndSet(queryContext).orElse(null);
-            executionContext = null == trafficInstanceId ? 
createExecutionContext(queryContext) : createExecutionContext(queryContext, 
trafficInstanceId);
+            executionContext = connection.getTrafficInstanceId(trafficRule, 
queryContext)
+                    .map(optional -> createExecutionContext(queryContext, 
optional)).orElseGet(() -> createExecutionContext(queryContext));
             
batchPreparedStatementExecutor.addBatchForExecutionUnits(executionContext.getExecutionUnits());
         } finally {
             currentResultSet = null;
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index d2b2e12dfe2..9becb70f884 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -62,7 +62,6 @@ import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.Statemen
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.hint.HintValueContext;
 import org.apache.shardingsphere.infra.hint.SQLHintUtils;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
 import org.apache.shardingsphere.infra.merge.MergeEngine;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
@@ -76,7 +75,6 @@ import org.apache.shardingsphere.parser.rule.SQLParserRule;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
 import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
-import org.apache.shardingsphere.traffic.engine.TrafficEngine;
 import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
 import org.apache.shardingsphere.traffic.rule.TrafficRule;
 import org.apache.shardingsphere.transaction.util.AutoCommitUtils;
@@ -158,10 +156,10 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
             databaseName = 
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
             
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
             ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
-            String trafficInstanceId = 
getInstanceIdAndSet(queryContext).orElse(null);
-            if (null != trafficInstanceId) {
+            Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(trafficRule, queryContext);
+            if (trafficInstanceId.isPresent()) {
                 currentResultSet = executor.getTrafficExecutor().execute(
-                        connection.getProcessId(), databaseName, 
trafficInstanceId, queryContext, createDriverExecutionPrepareEngine(database), 
Statement::executeQuery);
+                        connection.getProcessId(), databaseName, 
trafficInstanceId.get(), queryContext, 
createDriverExecutionPrepareEngine(database), Statement::executeQuery);
                 return currentResultSet;
             }
             if (decide(queryContext, database, 
metaDataContexts.getMetaData().getGlobalRuleMetaData())) {
@@ -192,24 +190,6 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         return 
executor.getSqlFederationEngine().decide(queryContext.getSqlStatementContext(), 
queryContext.getParameters(), database, globalRuleMetaData);
     }
     
-    private Optional<String> getInstanceIdAndSet(final QueryContext 
queryContext) {
-        Optional<String> result = 
connection.getDatabaseConnectionManager().getConnectionContext().getTrafficInstanceId();
-        if (!result.isPresent()) {
-            result = getInstanceId(queryContext);
-        }
-        if (connection.isHoldTransaction() && result.isPresent()) {
-            
connection.getDatabaseConnectionManager().getConnectionContext().setTrafficInstanceId(result.get());
-        }
-        return result;
-    }
-    
-    private Optional<String> getInstanceId(final QueryContext queryContext) {
-        ComputeNodeInstanceContext computeNodeInstanceContext = 
connection.getContextManager().getComputeNodeInstanceContext();
-        return null != trafficRule && !trafficRule.getStrategyRules().isEmpty()
-                ? new TrafficEngine(trafficRule, 
computeNodeInstanceContext).dispatch(queryContext, 
connection.isHoldTransaction())
-                : Optional.empty();
-    }
-    
     private List<QueryResult> executeQuery0(final ExecutionContext 
executionContext) throws SQLException {
         if 
(!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty())
 {
             return executor.getRawExecutor().execute(
@@ -311,11 +291,11 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
         databaseName = 
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
         
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
-        String trafficInstanceId = 
getInstanceIdAndSet(queryContext).orElse(null);
-        if (null != trafficInstanceId) {
+        Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(trafficRule, queryContext);
+        if (trafficInstanceId.isPresent()) {
             ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
             return executor.getTrafficExecutor().execute(
-                    connection.getProcessId(), databaseName, 
trafficInstanceId, queryContext, createDriverExecutionPrepareEngine(database), 
trafficCallback);
+                    connection.getProcessId(), databaseName, 
trafficInstanceId.get(), queryContext, 
createDriverExecutionPrepareEngine(database), trafficCallback);
         }
         executionContext = createExecutionContext(queryContext);
         if 
(!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty())
 {
@@ -417,11 +397,11 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
         databaseName = 
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
         
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
-        String trafficInstanceId = 
getInstanceIdAndSet(queryContext).orElse(null);
-        if (null != trafficInstanceId) {
+        Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(trafficRule, queryContext);
+        if (trafficInstanceId.isPresent()) {
             ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
             boolean result = executor.getTrafficExecutor().execute(
-                    connection.getProcessId(), databaseName, 
trafficInstanceId, queryContext, createDriverExecutionPrepareEngine(database), 
trafficCallback);
+                    connection.getProcessId(), databaseName, 
trafficInstanceId.get(), queryContext, 
createDriverExecutionPrepareEngine(database), trafficCallback);
             currentResultSet = executor.getTrafficExecutor().getResultSet();
             return result;
         }
diff --git 
a/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
 
b/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
index fddefd4740f..7706ac23650 100644
--- 
a/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
+++ 
b/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
@@ -47,23 +47,6 @@ public final class TrafficExecutor implements AutoCloseable {
     @Getter
     private ResultSet resultSet;
     
-    /**
-     * Execute.
-     * 
-     * @param executionUnit execution unit
-     * @param callback traffic executor callback
-     * @param <T> return type
-     * @return execute result
-     * @throws SQLException SQL exception
-     */
-    public <T> T execute(final JDBCExecutionUnit executionUnit, final 
TrafficExecutorCallback<T> callback) throws SQLException {
-        SQLUnit sqlUnit = executionUnit.getExecutionUnit().getSqlUnit();
-        cacheStatement(sqlUnit.getParameters(), 
executionUnit.getStorageResource());
-        T result = callback.execute(statement, sqlUnit.getSql());
-        resultSet = statement.getResultSet();
-        return result;
-    }
-    
     /**
      * Execute.
      *

Reply via email to