This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 61444f8e08d Refactor MetaDataRefreshEngine.refreshFederation() (#34450)
61444f8e08d is described below
commit 61444f8e08d587e2b8435035b054df6414de02cf
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Jan 23 22:03:26 2025 +0800
Refactor MetaDataRefreshEngine.refreshFederation() (#34450)
* Move package of FederationMetaDataRefresher
* Refactor MetaDataRefreshEngine
* Refactor MetaDataRefreshEngine.refreshFederation()
* Refactor MetaDataRefreshEngine.refreshFederation()
---
.../executor/engine/DriverExecuteExecutor.java | 13 ++---
.../jdbc/DriverJDBCPushDownExecuteExecutor.java | 8 +--
.../DriverJDBCPushDownExecuteUpdateExecutor.java | 12 ++--
.../refresher/metadata/MetaDataRefreshEngine.java | 66 +++++++++-------------
.../connector/StandardDatabaseConnector.java | 17 +++---
5 files changed, 47 insertions(+), 69 deletions(-)
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
index 7d7c20cbd36..4c780e46c27 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
@@ -101,23 +101,18 @@ public final class DriverExecuteExecutor {
new
ExecuteQueryCallbackFactory(prepareEngine.getType()).newInstance(database,
queryContext), new SQLFederationContext(false, queryContext, metaData,
connection.getProcessId()));
return null != resultSet;
}
- MetaDataRefreshEngine metaDataRefreshEngine =
getMetaDataRefreshEngine(database);
- if (sqlFederationEngine.enabled() &&
metaDataRefreshEngine.isFederation(queryContext.getSqlStatementContext())) {
-
metaDataRefreshEngine.refresh(queryContext.getSqlStatementContext());
+ if (sqlFederationEngine.enabled()) {
+ new
MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
database, metaData.getProps())
+ .refreshFederation(queryContext.getSqlStatementContext());
return true;
}
if (transactionExecutor.decide(queryContext)) {
return transactionExecutor.execute((TCLStatement)
queryContext.getSqlStatementContext().getSqlStatement());
}
- ExecutionContext executionContext =
- new KernelProcessor().generateExecutionContext(queryContext,
metaData.getGlobalRuleMetaData(), metaData.getProps());
+ ExecutionContext executionContext = new
KernelProcessor().generateExecutionContext(queryContext,
metaData.getGlobalRuleMetaData(), metaData.getProps());
return executePushDown(database, executionContext, prepareEngine,
executeCallback, addCallback, replayCallback);
}
- private MetaDataRefreshEngine getMetaDataRefreshEngine(final
ShardingSphereDatabase database) {
- return new
MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
database, metaData.getProps());
- }
-
@SuppressWarnings("rawtypes")
private boolean executePushDown(final ShardingSphereDatabase database,
final ExecutionContext executionContext, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final StatementExecuteCallback
executeCallback, final StatementAddCallback addCallback, final
StatementReplayCallback replayCallback) throws SQLException {
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteExecutor.java
index 23581b492e2..171ef59380e 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteExecutor.java
@@ -104,13 +104,11 @@ public final class DriverJDBCPushDownExecuteExecutor {
processEngine.executeSQL(executionGroupContext,
executionContext.getQueryContext());
List<Boolean> results = jdbcExecutor.execute(executionGroupContext,
new
ExecuteCallbackFactory(prepareEngine.getType()).newInstance(database,
executeCallback, executionContext.getSqlStatementContext().getSqlStatement()));
- if
(isNeedImplicitCommit(executionContext.getQueryContext().getSqlStatementContext()))
{
+ if
(isNeedImplicitCommit(executionContext.getSqlStatementContext())) {
connection.commit();
}
- if
(MetaDataRefreshEngine.isRefreshMetaDataRequired(executionContext.getSqlStatementContext()))
{
- new
MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
database, metaData.getProps())
-
.refresh(executionContext.getQueryContext().getSqlStatementContext(),
executionContext.getRouteContext().getRouteUnits());
- }
+ new
MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
database, metaData.getProps())
+ .refresh(executionContext.getSqlStatementContext(),
executionContext.getRouteContext().getRouteUnits());
return null != results && !results.isEmpty() && null !=
results.get(0) && results.get(0);
} finally {
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteUpdateExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteUpdateExecutor.java
index cd87cca94ca..db4d9671ed3 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteUpdateExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteUpdateExecutor.java
@@ -106,16 +106,16 @@ public final class
DriverJDBCPushDownExecuteUpdateExecutor {
try {
processEngine.executeSQL(executionGroupContext,
executionContext.getQueryContext());
JDBCExecutorCallback<Integer> callback = new
ExecuteUpdateCallbackFactory(prepareEngine.getType())
- .newInstance(database,
executionContext.getQueryContext().getSqlStatementContext().getSqlStatement(),
updateCallback);
+ .newInstance(database,
executionContext.getSqlStatementContext().getSqlStatement(), updateCallback);
List<Integer> updateCounts =
jdbcExecutor.execute(executionGroupContext, callback);
- if
(MetaDataRefreshEngine.isRefreshMetaDataRequired(executionContext.getQueryContext().getSqlStatementContext()))
{
- if
(isNeedImplicitCommit(executionContext.getQueryContext().getSqlStatementContext()))
{
+ MetaDataRefreshEngine metaDataRefreshEngine = new
MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
database, props);
+ if
(metaDataRefreshEngine.isNeedRefreshMetaData(executionContext.getSqlStatementContext()))
{
+ if
(isNeedImplicitCommit(executionContext.getSqlStatementContext())) {
connection.commit();
}
- new
MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
database, props)
-
.refresh(executionContext.getQueryContext().getSqlStatementContext(),
executionContext.getRouteContext().getRouteUnits());
+
metaDataRefreshEngine.refresh(executionContext.getSqlStatementContext(),
executionContext.getRouteContext().getRouteUnits());
}
- return isNeedAccumulate(database.getRuleMetaData().getRules(),
executionContext.getQueryContext().getSqlStatementContext()) ?
accumulate(updateCounts) : updateCounts.get(0);
+ return isNeedAccumulate(database.getRuleMetaData().getRules(),
executionContext.getSqlStatementContext()) ? accumulate(updateCounts) :
updateCounts.get(0);
} finally {
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/metadata/MetaDataRefreshEngine.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/metadata/MetaDataRefreshEngine.java
index 4a75931c60c..30bde2fee73 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/metadata/MetaDataRefreshEngine.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/metadata/MetaDataRefreshEngine.java
@@ -67,6 +67,17 @@ public final class MetaDataRefreshEngine {
private final ConfigurationProperties props;
+ /**
+ * Whether to need refresh meta data.
+ *
+ * @param sqlStatementContext SQL statement context
+ * @return is need refresh meta data or not
+ */
+ public boolean isNeedRefreshMetaData(final SQLStatementContext
sqlStatementContext) {
+ Class<?> sqlStatementClass =
sqlStatementContext.getSqlStatement().getClass().getSuperclass();
+ return DDL_STATEMENT_CLASSES.contains(sqlStatementClass);
+ }
+
/**
* Refresh meta data.
*
@@ -76,29 +87,19 @@ public final class MetaDataRefreshEngine {
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public void refresh(final SQLStatementContext sqlStatementContext, final
Collection<RouteUnit> routeUnits) throws SQLException {
- Class sqlStatementClass =
sqlStatementContext.getSqlStatement().getClass().getSuperclass();
- if (!DDL_STATEMENT_CLASSES.contains(sqlStatementClass)) {
+ if (!isNeedRefreshMetaData(sqlStatementContext)) {
return;
}
- Optional<MetaDataRefresher> schemaRefresher =
TypedSPILoader.findService(MetaDataRefresher.class, sqlStatementClass);
- if (schemaRefresher.isPresent()) {
- Collection<String> logicDataSourceNames =
routeUnits.stream().map(each ->
each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
- String schemaName = sqlStatementContext instanceof TableAvailable
? getSchemaName(sqlStatementContext) : null;
- DatabaseType databaseType = routeUnits.stream().map(each ->
database.getResourceMetaData().getStorageUnits().get(each.getDataSourceMapper().getActualName()))
-
.filter(Objects::nonNull).findFirst().map(StorageUnit::getStorageType).orElseGet(sqlStatementContext::getDatabaseType);
- schemaRefresher.get().refresh(metaDataManagerPersistService,
database, logicDataSourceNames, schemaName, databaseType,
sqlStatementContext.getSqlStatement(), props);
+ Class<?> sqlStatementClass =
sqlStatementContext.getSqlStatement().getClass().getSuperclass();
+ Optional<MetaDataRefresher> metaDataRefresher =
TypedSPILoader.findService(MetaDataRefresher.class, sqlStatementClass);
+ if (!metaDataRefresher.isPresent()) {
+ return;
}
- }
-
- /**
- * Refresh meta data for federation.
- *
- * @param sqlStatementContext SQL statement context
- */
- @SuppressWarnings("unchecked")
- public void refresh(final SQLStatementContext sqlStatementContext) {
- getFederationMetaDataRefresher(sqlStatementContext).ifPresent(
- optional -> optional.refresh(metaDataManagerPersistService,
database, getSchemaName(sqlStatementContext),
sqlStatementContext.getSqlStatement()));
+ Collection<String> logicDataSourceNames = routeUnits.stream().map(each
-> each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
+ String schemaName = sqlStatementContext instanceof TableAvailable ?
getSchemaName(sqlStatementContext) : null;
+ DatabaseType databaseType = routeUnits.stream().map(each ->
database.getResourceMetaData().getStorageUnits().get(each.getDataSourceMapper().getActualName()))
+
.filter(Objects::nonNull).findFirst().map(StorageUnit::getStorageType).orElseGet(sqlStatementContext::getDatabaseType);
+ metaDataRefresher.get().refresh(metaDataManagerPersistService,
database, logicDataSourceNames, schemaName, databaseType,
sqlStatementContext.getSqlStatement(), props);
}
private String getSchemaName(final SQLStatementContext
sqlStatementContext) {
@@ -107,27 +108,14 @@ public final class MetaDataRefreshEngine {
}
/**
- * SQL statement is federation or not.
- *
- * @param sqlStatementContext SQL statement context
- * @return is federation or not
- */
- public boolean isFederation(final SQLStatementContext sqlStatementContext)
{
- return getFederationMetaDataRefresher(sqlStatementContext).isPresent();
- }
-
- @SuppressWarnings("rawtypes")
- private Optional<FederationMetaDataRefresher>
getFederationMetaDataRefresher(final SQLStatementContext sqlStatementContext) {
- return TypedSPILoader.findService(FederationMetaDataRefresher.class,
sqlStatementContext.getSqlStatement().getClass().getSuperclass());
- }
-
- /**
- * Is refresh meta data required.
+ * Refresh meta data for federation.
*
* @param sqlStatementContext SQL statement context
- * @return is refresh meta data required or not
*/
- public static boolean isRefreshMetaDataRequired(final SQLStatementContext
sqlStatementContext) {
- return
DDL_STATEMENT_CLASSES.contains(sqlStatementContext.getSqlStatement().getClass().getSuperclass());
+ @SuppressWarnings("unchecked")
+ public void refreshFederation(final SQLStatementContext
sqlStatementContext) {
+ Class<?> sqlStatementClass =
sqlStatementContext.getSqlStatement().getClass().getSuperclass();
+ TypedSPILoader.findService(FederationMetaDataRefresher.class,
sqlStatementClass).ifPresent(
+ optional -> optional.refresh(metaDataManagerPersistService,
database, getSchemaName(sqlStatementContext),
sqlStatementContext.getSqlStatement()));
}
}
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseConnector.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseConnector.java
index 1fa6068d1c5..08fa0840757 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseConnector.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseConnector.java
@@ -110,6 +110,8 @@ public final class StandardDatabaseConnector implements
DatabaseConnector {
private final ProxySQLExecutor proxySQLExecutor;
+ private final MetaDataRefreshEngine metaDataRefreshEngine;
+
private final Collection<Statement> cachedStatements =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Collection<ResultSet> cachedResultSets =
Collections.newSetFromMap(new ConcurrentHashMap<>());
@@ -131,6 +133,8 @@ public final class StandardDatabaseConnector implements
DatabaseConnector {
prepareCursorStatementContext((CursorAvailable)
sqlStatementContext);
}
proxySQLExecutor = new ProxySQLExecutor(driverType,
databaseConnectionManager, this, queryContext);
+ metaDataRefreshEngine = new MetaDataRefreshEngine(
+
contextManager.getPersistServiceFacade().getMetaDataManagerPersistService(),
database, contextManager.getMetaDataContexts().getMetaData().getProps());
}
private void checkBackendReady(final SQLStatementContext
sqlStatementContext) {
@@ -179,9 +183,8 @@ public final class StandardDatabaseConnector implements
DatabaseConnector {
if (proxySQLExecutor.getSqlFederationEngine().decide(queryContext,
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData())) {
return processExecuteFederation(doExecuteFederation());
}
- MetaDataRefreshEngine metaDataRefreshEngine =
getMetaDataRefreshEngine();
- if (proxySQLExecutor.getSqlFederationEngine().enabled() &&
metaDataRefreshEngine.isFederation(queryContext.getSqlStatementContext())) {
-
metaDataRefreshEngine.refresh(queryContext.getSqlStatementContext());
+ if (proxySQLExecutor.getSqlFederationEngine().enabled()) {
+
metaDataRefreshEngine.refreshFederation(queryContext.getSqlStatementContext());
return new
UpdateResponseHeader(queryContext.getSqlStatementContext().getSqlStatement());
}
ExecutionContext executionContext = generateExecutionContext();
@@ -233,9 +236,7 @@ public final class StandardDatabaseConnector implements
DatabaseConnector {
List<ExecuteResult> executeResults = advancedExecutors.isEmpty()
? proxySQLExecutor.execute(executionContext)
:
advancedExecutors.iterator().next().execute(executionContext, contextManager,
database, this);
- if
(MetaDataRefreshEngine.isRefreshMetaDataRequired(queryContext.getSqlStatementContext()))
{
-
getMetaDataRefreshEngine().refresh(queryContext.getSqlStatementContext(),
executionContext.getRouteContext().getRouteUnits());
- }
+ metaDataRefreshEngine.refresh(queryContext.getSqlStatementContext(),
executionContext.getRouteContext().getRouteUnits());
Object executeResultSample = executeResults.iterator().next();
return executeResultSample instanceof QueryResult
? processExecuteQuery(queryContext.getSqlStatementContext(),
executeResults.stream().map(QueryResult.class::cast).collect(Collectors.toList()),
(QueryResult) executeResultSample)
@@ -271,10 +272,6 @@ public final class StandardDatabaseConnector implements
DatabaseConnector {
return new QueryResponseHeader(queryHeaders);
}
- private MetaDataRefreshEngine getMetaDataRefreshEngine() {
- return new
MetaDataRefreshEngine(contextManager.getPersistServiceFacade().getMetaDataManagerPersistService(),
database, contextManager.getMetaDataContexts().getMetaData().getProps());
- }
-
private QueryResponseHeader processExecuteQuery(final SQLStatementContext
sqlStatementContext, final List<QueryResult> queryResults, final QueryResult
queryResultSample) throws SQLException {
queryHeaders = createQueryHeaders(sqlStatementContext,
queryResultSample);
mergedResult = mergeQuery(sqlStatementContext, queryResults);