This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5dd0e470915 Add ShardingSphereConnection.getTrafficInstanceId()
(#31414)
5dd0e470915 is described below
commit 5dd0e470915766a2ec6edd387ca8598620689668
Author: Liang Zhang <[email protected]>
AuthorDate: Mon May 27 17:07:56 2024 +0800
Add ShardingSphereConnection.getTrafficInstanceId() (#31414)
---
.../core/connection/ShardingSphereConnection.java | 29 ++++++++++++++-
.../statement/ShardingSpherePreparedStatement.java | 42 ++++++----------------
.../core/statement/ShardingSphereStatement.java | 38 +++++---------------
.../traffic/executor/TrafficExecutor.java | 17 ---------
4 files changed, 48 insertions(+), 78 deletions(-)
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
index 084d1ac29ff..c44cc8a2456 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
@@ -18,17 +18,20 @@
package org.apache.shardingsphere.driver.jdbc.core.connection;
import lombok.Getter;
+import org.apache.shardingsphere.driver.exception.ConnectionClosedException;
import org.apache.shardingsphere.driver.jdbc.adapter.AbstractConnectionAdapter;
import
org.apache.shardingsphere.driver.jdbc.core.datasource.metadata.ShardingSphereDatabaseMetaData;
import
org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement;
import
org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSphereStatement;
-import org.apache.shardingsphere.driver.exception.ConnectionClosedException;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.traffic.engine.TrafficEngine;
+import org.apache.shardingsphere.traffic.rule.TrafficRule;
import org.apache.shardingsphere.transaction.api.TransactionType;
import org.apache.shardingsphere.transaction.rule.TransactionRule;
@@ -40,6 +43,7 @@ import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Savepoint;
import java.sql.Statement;
+import java.util.Optional;
/**
* ShardingSphere connection.
@@ -212,6 +216,29 @@ public final class ShardingSphereConnection extends
AbstractConnectionAdapter {
}
}
+ /**
+ * Get traffic instance ID.
+ *
+ * @param trafficRule traffic rule
+ * @param queryContext query context
+ * @return traffic instance ID
+ */
+ public Optional<String> getTrafficInstanceId(final TrafficRule
trafficRule, final QueryContext queryContext) {
+ if (null == trafficRule || trafficRule.getStrategyRules().isEmpty()) {
+ return Optional.empty();
+ }
+ Optional<String> existedTrafficInstanceId =
databaseConnectionManager.getConnectionContext().getTrafficInstanceId();
+ if (existedTrafficInstanceId.isPresent()) {
+ return existedTrafficInstanceId;
+ }
+ boolean isHoldTransaction = isHoldTransaction();
+ Optional<String> result = new TrafficEngine(trafficRule,
contextManager.getComputeNodeInstanceContext()).dispatch(queryContext,
isHoldTransaction);
+ if (isHoldTransaction && result.isPresent()) {
+
databaseConnectionManager.getConnectionContext().setTrafficInstanceId(result.get());
+ }
+ return result;
+ }
+
@Override
public void commit() throws SQLException {
try {
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index b7dc4c334e5..fecffc9e8e1 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -68,7 +68,6 @@ import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrep
import org.apache.shardingsphere.infra.hint.HintManager;
import org.apache.shardingsphere.infra.hint.HintValueContext;
import org.apache.shardingsphere.infra.hint.SQLHintUtils;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
@@ -85,7 +84,6 @@ import org.apache.shardingsphere.parser.rule.SQLParserRule;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
-import org.apache.shardingsphere.traffic.engine.TrafficEngine;
import
org.apache.shardingsphere.traffic.exception.EmptyTrafficExecutionUnitException;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
import org.apache.shardingsphere.transaction.util.AutoCommitUtils;
@@ -228,10 +226,10 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
- String trafficInstanceId =
getInstanceIdAndSet(queryContext).orElse(null);
- if (null != trafficInstanceId) {
+ Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(trafficRule, queryContext);
+ if (trafficInstanceId.isPresent()) {
currentResultSet =
executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName,
- trafficInstanceId, queryContext,
createDriverExecutionPrepareEngine(database), (statement, sql) ->
((PreparedStatement) statement).executeQuery());
+ trafficInstanceId.get(), queryContext,
createDriverExecutionPrepareEngine(database), (statement, sql) ->
((PreparedStatement) statement).executeQuery());
return currentResultSet;
}
if (decide(queryContext, database,
metaDataContexts.getMetaData().getGlobalRuleMetaData())) {
@@ -281,24 +279,6 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
return context.getInputGroups().stream().flatMap(each ->
each.getInputs().stream()).findFirst().orElseThrow(EmptyTrafficExecutionUnitException::new);
}
- private Optional<String> getInstanceIdAndSet(final QueryContext
queryContext) {
- Optional<String> result =
connection.getDatabaseConnectionManager().getConnectionContext().getTrafficInstanceId();
- if (!result.isPresent()) {
- result = getInstanceId(queryContext);
- }
- if (connection.isHoldTransaction() && result.isPresent()) {
-
connection.getDatabaseConnectionManager().getConnectionContext().setTrafficInstanceId(result.get());
- }
- return result;
- }
-
- private Optional<String> getInstanceId(final QueryContext queryContext) {
- ComputeNodeInstanceContext computeNodeInstanceContext =
connection.getContextManager().getComputeNodeInstanceContext();
- return null != trafficRule && !trafficRule.getStrategyRules().isEmpty()
- ? new TrafficEngine(trafficRule,
computeNodeInstanceContext).dispatch(queryContext,
connection.isHoldTransaction())
- : Optional.empty();
- }
-
private void resetParameters() throws SQLException {
parameterSets.clear();
parameterSets.add(getParameters());
@@ -342,11 +322,11 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
clearPrevious();
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
- String trafficInstanceId =
getInstanceIdAndSet(queryContext).orElse(null);
- if (null != trafficInstanceId) {
+ Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(trafficRule, queryContext);
+ if (trafficInstanceId.isPresent()) {
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
return
executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName,
- trafficInstanceId, queryContext,
createDriverExecutionPrepareEngine(database), (statement, sql) ->
((PreparedStatement) statement).executeUpdate());
+ trafficInstanceId.get(), queryContext,
createDriverExecutionPrepareEngine(database), (statement, sql) ->
((PreparedStatement) statement).executeUpdate());
}
executionContext = createExecutionContext(queryContext);
if (hasRawExecutionRule()) {
@@ -407,11 +387,11 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
clearPrevious();
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
- String trafficInstanceId =
getInstanceIdAndSet(queryContext).orElse(null);
- if (null != trafficInstanceId) {
+ Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(trafficRule, queryContext);
+ if (trafficInstanceId.isPresent()) {
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
boolean result =
executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName,
- trafficInstanceId, queryContext,
createDriverExecutionPrepareEngine(database), (statement, sql) ->
((PreparedStatement) statement).execute());
+ trafficInstanceId.get(), queryContext,
createDriverExecutionPrepareEngine(database), (statement, sql) ->
((PreparedStatement) statement).execute());
currentResultSet =
executor.getTrafficExecutor().getResultSet();
return result;
}
@@ -624,8 +604,8 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
public void addBatch() {
try {
QueryContext queryContext = createQueryContext();
- String trafficInstanceId =
getInstanceIdAndSet(queryContext).orElse(null);
- executionContext = null == trafficInstanceId ?
createExecutionContext(queryContext) : createExecutionContext(queryContext,
trafficInstanceId);
+ executionContext = connection.getTrafficInstanceId(trafficRule,
queryContext)
+ .map(optional -> createExecutionContext(queryContext,
optional)).orElseGet(() -> createExecutionContext(queryContext));
batchPreparedStatementExecutor.addBatchForExecutionUnits(executionContext.getExecutionUnits());
} finally {
currentResultSet = null;
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index d2b2e12dfe2..9becb70f884 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -62,7 +62,6 @@ import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.Statemen
import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
import org.apache.shardingsphere.infra.hint.HintValueContext;
import org.apache.shardingsphere.infra.hint.SQLHintUtils;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
@@ -76,7 +75,6 @@ import org.apache.shardingsphere.parser.rule.SQLParserRule;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
-import org.apache.shardingsphere.traffic.engine.TrafficEngine;
import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
import org.apache.shardingsphere.transaction.util.AutoCommitUtils;
@@ -158,10 +156,10 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
databaseName =
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
- String trafficInstanceId =
getInstanceIdAndSet(queryContext).orElse(null);
- if (null != trafficInstanceId) {
+ Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(trafficRule, queryContext);
+ if (trafficInstanceId.isPresent()) {
currentResultSet = executor.getTrafficExecutor().execute(
- connection.getProcessId(), databaseName,
trafficInstanceId, queryContext, createDriverExecutionPrepareEngine(database),
Statement::executeQuery);
+ connection.getProcessId(), databaseName,
trafficInstanceId.get(), queryContext,
createDriverExecutionPrepareEngine(database), Statement::executeQuery);
return currentResultSet;
}
if (decide(queryContext, database,
metaDataContexts.getMetaData().getGlobalRuleMetaData())) {
@@ -192,24 +190,6 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
return
executor.getSqlFederationEngine().decide(queryContext.getSqlStatementContext(),
queryContext.getParameters(), database, globalRuleMetaData);
}
- private Optional<String> getInstanceIdAndSet(final QueryContext
queryContext) {
- Optional<String> result =
connection.getDatabaseConnectionManager().getConnectionContext().getTrafficInstanceId();
- if (!result.isPresent()) {
- result = getInstanceId(queryContext);
- }
- if (connection.isHoldTransaction() && result.isPresent()) {
-
connection.getDatabaseConnectionManager().getConnectionContext().setTrafficInstanceId(result.get());
- }
- return result;
- }
-
- private Optional<String> getInstanceId(final QueryContext queryContext) {
- ComputeNodeInstanceContext computeNodeInstanceContext =
connection.getContextManager().getComputeNodeInstanceContext();
- return null != trafficRule && !trafficRule.getStrategyRules().isEmpty()
- ? new TrafficEngine(trafficRule,
computeNodeInstanceContext).dispatch(queryContext,
connection.isHoldTransaction())
- : Optional.empty();
- }
-
private List<QueryResult> executeQuery0(final ExecutionContext
executionContext) throws SQLException {
if
(!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty())
{
return executor.getRawExecutor().execute(
@@ -311,11 +291,11 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
databaseName =
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
- String trafficInstanceId =
getInstanceIdAndSet(queryContext).orElse(null);
- if (null != trafficInstanceId) {
+ Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(trafficRule, queryContext);
+ if (trafficInstanceId.isPresent()) {
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
return executor.getTrafficExecutor().execute(
- connection.getProcessId(), databaseName,
trafficInstanceId, queryContext, createDriverExecutionPrepareEngine(database),
trafficCallback);
+ connection.getProcessId(), databaseName,
trafficInstanceId.get(), queryContext,
createDriverExecutionPrepareEngine(database), trafficCallback);
}
executionContext = createExecutionContext(queryContext);
if
(!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty())
{
@@ -417,11 +397,11 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
databaseName =
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
- String trafficInstanceId =
getInstanceIdAndSet(queryContext).orElse(null);
- if (null != trafficInstanceId) {
+ Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(trafficRule, queryContext);
+ if (trafficInstanceId.isPresent()) {
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
boolean result = executor.getTrafficExecutor().execute(
- connection.getProcessId(), databaseName,
trafficInstanceId, queryContext, createDriverExecutionPrepareEngine(database),
trafficCallback);
+ connection.getProcessId(), databaseName,
trafficInstanceId.get(), queryContext,
createDriverExecutionPrepareEngine(database), trafficCallback);
currentResultSet = executor.getTrafficExecutor().getResultSet();
return result;
}
diff --git
a/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
b/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
index fddefd4740f..7706ac23650 100644
---
a/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
+++
b/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
@@ -47,23 +47,6 @@ public final class TrafficExecutor implements AutoCloseable {
@Getter
private ResultSet resultSet;
- /**
- * Execute.
- *
- * @param executionUnit execution unit
- * @param callback traffic executor callback
- * @param <T> return type
- * @return execute result
- * @throws SQLException SQL exception
- */
- public <T> T execute(final JDBCExecutionUnit executionUnit, final
TrafficExecutorCallback<T> callback) throws SQLException {
- SQLUnit sqlUnit = executionUnit.getExecutionUnit().getSqlUnit();
- cacheStatement(sqlUnit.getParameters(),
executionUnit.getStorageResource());
- T result = callback.execute(statement, sqlUnit.getSql());
- resultSet = statement.getResultSet();
- return result;
- }
-
/**
* Execute.
*