This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0b458b8fc29 Refactor MergeEngine (#17964)
0b458b8fc29 is described below
commit 0b458b8fc2938c9bf648e4ad95861b6ac97dd888
Author: Liang Zhang <[email protected]>
AuthorDate: Thu May 26 14:56:46 2022 +0800
Refactor MergeEngine (#17964)
* Refactor MergeEngine
* Refactor MergeEngine
---
.../table/FilterableTableScanExecutor.java | 7 ++---
.../shardingsphere/infra/merge/MergeEngine.java | 22 +++++---------
.../infra/merge/MergeEngineTest.java | 34 ++++++++++++----------
.../statement/ShardingSpherePreparedStatement.java | 5 +---
.../core/statement/ShardingSphereStatement.java | 6 +---
.../mode/manager/ContextManager.java | 2 +-
.../communication/DatabaseCommunicationEngine.java | 6 +---
7 files changed, 33 insertions(+), 49 deletions(-)
diff --git
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/table/FilterableTableScanExecutor.java
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/table/FilterableTableScanExecutor.java
index e0a749131d8..f1e7232a570 100644
---
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/table/FilterableTableScanExecutor.java
+++
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/table/FilterableTableScanExecutor.java
@@ -144,18 +144,17 @@ public final class FilterableTableScanExecutor {
federationContext.getExecutionUnits().addAll(context.getExecutionUnits());
return createEmptyEnumerable();
}
- return execute(schemaName, databaseType, logicSQL, database, context);
+ return execute(databaseType, logicSQL, database, context);
}
- private AbstractEnumerable<Object[]> execute(final String schemaName,
final DatabaseType databaseType, final LogicSQL logicSQL,
- final ShardingSphereDatabase
database, final ExecutionContext context) {
+ private AbstractEnumerable<Object[]> execute(final DatabaseType
databaseType, final LogicSQL logicSQL, final ShardingSphereDatabase database,
final ExecutionContext context) {
try {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits());
setParameters(executionGroupContext.getInputGroups());
ExecuteProcessEngine.initialize(context.getLogicSQL(),
executionGroupContext, executorContext.getProps());
List<QueryResult> queryResults = execute(executionGroupContext,
databaseType);
ExecuteProcessEngine.finish(executionGroupContext.getExecutionID());
- MergeEngine mergeEngine = new MergeEngine(schemaName,
databaseType, database, executorContext.getProps(),
database.getRuleMetaData().getRules());
+ MergeEngine mergeEngine = new MergeEngine(database,
executorContext.getProps());
MergedResult mergedResult = mergeEngine.merge(queryResults,
logicSQL.getSqlStatementContext());
Collection<Statement> statements =
getStatements(executionGroupContext.getInputGroups());
return createEnumerable(mergedResult,
queryResults.get(0).getMetaData(), statements);
diff --git
a/shardingsphere-infra/shardingsphere-infra-merge/src/main/java/org/apache/shardingsphere/infra/merge/MergeEngine.java
b/shardingsphere-infra/shardingsphere-infra-merge/src/main/java/org/apache/shardingsphere/infra/merge/MergeEngine.java
index d753cbc2fa2..900fbad925b 100644
---
a/shardingsphere-infra/shardingsphere-infra-merge/src/main/java/org/apache/shardingsphere/infra/merge/MergeEngine.java
+++
b/shardingsphere-infra/shardingsphere-infra-merge/src/main/java/org/apache/shardingsphere/infra/merge/MergeEngine.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.infra.merge;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import org.apache.shardingsphere.infra.merge.engine.ResultProcessEngine;
import org.apache.shardingsphere.infra.merge.engine.ResultProcessEngineFactory;
@@ -33,7 +32,6 @@ import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import java.sql.SQLException;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -44,10 +42,6 @@ import java.util.Optional;
*/
public final class MergeEngine {
- private final String databaseName;
-
- private final DatabaseType databaseType;
-
private final ShardingSphereDatabase database;
private final ConfigurationProperties props;
@@ -55,13 +49,10 @@ public final class MergeEngine {
@SuppressWarnings("rawtypes")
private final Map<ShardingSphereRule, ResultProcessEngine> engines;
- public MergeEngine(final String databaseName, final DatabaseType
databaseType, final ShardingSphereDatabase database,
- final ConfigurationProperties props, final
Collection<ShardingSphereRule> rules) {
- this.databaseName = databaseName;
- this.databaseType = databaseType;
+ public MergeEngine(final ShardingSphereDatabase database, final
ConfigurationProperties props) {
this.database = database;
this.props = props;
- engines = ResultProcessEngineFactory.getInstances(rules);
+ engines =
ResultProcessEngineFactory.getInstances(database.getRuleMetaData().getRules());
}
/**
@@ -82,7 +73,8 @@ public final class MergeEngine {
private Optional<MergedResult> executeMerge(final List<QueryResult>
queryResults, final SQLStatementContext<?> sqlStatementContext) throws
SQLException {
for (Entry<ShardingSphereRule, ResultProcessEngine> entry :
engines.entrySet()) {
if (entry.getValue() instanceof ResultMergerEngine) {
- ResultMerger resultMerger = ((ResultMergerEngine)
entry.getValue()).newInstance(databaseName, databaseType, entry.getKey(),
props, sqlStatementContext);
+ ResultMerger resultMerger = ((ResultMergerEngine)
entry.getValue()).newInstance(
+ database.getName(),
database.getResource().getDatabaseType(), entry.getKey(), props,
sqlStatementContext);
return Optional.of(resultMerger.merge(queryResults,
sqlStatementContext, database));
}
}
@@ -94,7 +86,8 @@ public final class MergeEngine {
MergedResult result = null;
for (Entry<ShardingSphereRule, ResultProcessEngine> entry :
engines.entrySet()) {
if (entry.getValue() instanceof ResultDecoratorEngine) {
- ResultDecorator resultDecorator = ((ResultDecoratorEngine)
entry.getValue()).newInstance(databaseType, databaseName, database,
entry.getKey(), props, sqlStatementContext);
+ ResultDecorator resultDecorator = ((ResultDecoratorEngine)
entry.getValue()).newInstance(
+ database.getResource().getDatabaseType(),
database.getName(), database, entry.getKey(), props, sqlStatementContext);
result = null == result ?
resultDecorator.decorate(mergedResult, sqlStatementContext, entry.getKey()) :
resultDecorator.decorate(result, sqlStatementContext, entry.getKey());
}
}
@@ -106,7 +99,8 @@ public final class MergeEngine {
MergedResult result = null;
for (Entry<ShardingSphereRule, ResultProcessEngine> entry :
engines.entrySet()) {
if (entry.getValue() instanceof ResultDecoratorEngine) {
- ResultDecorator resultDecorator = ((ResultDecoratorEngine)
entry.getValue()).newInstance(databaseType, databaseName, database,
entry.getKey(), props, sqlStatementContext);
+ ResultDecorator resultDecorator = ((ResultDecoratorEngine)
entry.getValue()).newInstance(
+ database.getResource().getDatabaseType(),
database.getName(), database, entry.getKey(), props, sqlStatementContext);
result = null == result ?
resultDecorator.decorate(queryResult, sqlStatementContext, entry.getKey()) :
resultDecorator.decorate(result, sqlStatementContext, entry.getKey());
}
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-merge/src/test/java/org/apache/shardingsphere/infra/merge/MergeEngineTest.java
b/shardingsphere-infra/shardingsphere-infra-merge/src/test/java/org/apache/shardingsphere/infra/merge/MergeEngineTest.java
index 97823f80ff4..6c1a69778ee 100644
---
a/shardingsphere-infra/shardingsphere-infra-merge/src/test/java/org/apache/shardingsphere/infra/merge/MergeEngineTest.java
+++
b/shardingsphere-infra/shardingsphere-infra-merge/src/test/java/org/apache/shardingsphere/infra/merge/MergeEngineTest.java
@@ -27,8 +27,10 @@ import
org.apache.shardingsphere.infra.merge.fixture.rule.IndependentRuleFixture
import org.apache.shardingsphere.infra.merge.fixture.rule.MergerRuleFixture;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@@ -39,15 +41,13 @@ import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public final class MergeEngineTest {
- @Mock
- private DatabaseType databaseType;
-
- @Mock
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ShardingSphereDatabase database;
@Mock
@@ -56,36 +56,38 @@ public final class MergeEngineTest {
@Mock
private SQLStatementContext<?> sqlStatementContext;
+ @Before
+ public void setUp() {
+ when(database.getName()).thenReturn(DefaultDatabase.LOGIC_NAME);
+
when(database.getResource().getDatabaseType()).thenReturn(mock(DatabaseType.class));
+ }
+
@Test
public void assertMergeWithIndependentRule() throws SQLException {
+
when(database.getRuleMetaData().getRules()).thenReturn(Collections.singleton(new
IndependentRuleFixture()));
when(queryResult.getValue(1, String.class)).thenReturn("test");
- MergeEngine mergeEngine = new MergeEngine(
- DefaultDatabase.LOGIC_NAME, databaseType, database, new
ConfigurationProperties(new Properties()), Collections.singletonList(new
IndependentRuleFixture()));
- MergedResult actual =
mergeEngine.merge(Collections.singletonList(queryResult), sqlStatementContext);
+ MergedResult actual = new MergeEngine(database, new
ConfigurationProperties(new
Properties())).merge(Collections.singletonList(queryResult),
sqlStatementContext);
assertThat(actual.getValue(1, String.class), is("test"));
}
@Test
public void assertMergeWithMergerRuleOnly() throws SQLException {
- MergeEngine mergeEngine = new MergeEngine(
- DefaultDatabase.LOGIC_NAME, databaseType, database, new
ConfigurationProperties(new Properties()), Collections.singletonList(new
MergerRuleFixture()));
- MergedResult actual =
mergeEngine.merge(Collections.singletonList(queryResult), sqlStatementContext);
+
when(database.getRuleMetaData().getRules()).thenReturn(Collections.singleton(new
MergerRuleFixture()));
+ MergedResult actual = new MergeEngine(database, new
ConfigurationProperties(new
Properties())).merge(Collections.singletonList(queryResult),
sqlStatementContext);
assertThat(actual.getValue(1, String.class), is("merged_value"));
}
@Test
public void assertMergeWithDecoratorRuleOnly() throws SQLException {
- MergeEngine mergeEngine = new MergeEngine(
- DefaultDatabase.LOGIC_NAME, databaseType, database, new
ConfigurationProperties(new Properties()), Collections.singletonList(new
DecoratorRuleFixture()));
- MergedResult actual =
mergeEngine.merge(Collections.singletonList(queryResult), sqlStatementContext);
+
when(database.getRuleMetaData().getRules()).thenReturn(Collections.singleton(new
DecoratorRuleFixture()));
+ MergedResult actual = new MergeEngine(database, new
ConfigurationProperties(new
Properties())).merge(Collections.singletonList(queryResult),
sqlStatementContext);
assertThat(actual.getValue(1, String.class), is("decorated_value"));
}
@Test
public void assertMergeWithMergerRuleAndDecoratorRuleTogether() throws
SQLException {
- MergeEngine mergeEngine = new MergeEngine(
- DefaultDatabase.LOGIC_NAME, databaseType, database, new
ConfigurationProperties(new Properties()), Arrays.asList(new
MergerRuleFixture(), new DecoratorRuleFixture()));
- MergedResult actual =
mergeEngine.merge(Collections.singletonList(queryResult), sqlStatementContext);
+
when(database.getRuleMetaData().getRules()).thenReturn(Arrays.asList(new
MergerRuleFixture(), new DecoratorRuleFixture()));
+ MergedResult actual = new MergeEngine(database, new
ConfigurationProperties(new
Properties())).merge(Collections.singletonList(queryResult),
sqlStatementContext);
assertThat(actual.getValue(1, String.class),
is("decorated_merged_value"));
}
}
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 8f02146d6fe..faeab00fe8a 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -69,7 +69,6 @@ import org.apache.shardingsphere.infra.hint.HintManager;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
-import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import
org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
@@ -482,9 +481,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
}
private MergedResult mergeQuery(final List<QueryResult> queryResults)
throws SQLException {
- ShardingSphereDatabase database =
metaDataContexts.getDatabase(connection.getDatabaseName());
- MergeEngine mergeEngine = new MergeEngine(connection.getDatabaseName(),
- database.getResource().getDatabaseType(), database,
metaDataContexts.getMetaData().getProps(),
database.getRuleMetaData().getRules());
+ MergeEngine mergeEngine = new
MergeEngine(metaDataContexts.getDatabase(connection.getDatabaseName()),
metaDataContexts.getMetaData().getProps());
return mergeEngine.merge(queryResults,
executionContext.getSqlStatementContext());
}
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 15f42235a1e..81792bf3cdd 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -38,7 +38,6 @@ import
org.apache.shardingsphere.infra.binder.statement.dml.InsertStatementConte
import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
-import org.apache.shardingsphere.infra.database.DefaultDatabase;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
import org.apache.shardingsphere.infra.exception.ShardingSphereException;
import org.apache.shardingsphere.infra.executor.check.SQLCheckEngine;
@@ -65,7 +64,6 @@ import
org.apache.shardingsphere.infra.federation.executor.FederationContext;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
-import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import org.apache.shardingsphere.infra.route.context.RouteUnit;
import
org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
@@ -547,9 +545,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
}
private MergedResult mergeQuery(final List<QueryResult> queryResults)
throws SQLException {
- ShardingSphereDatabase database =
metaDataContexts.getDatabase(connection.getDatabaseName());
- MergeEngine mergeEngine = new MergeEngine(DefaultDatabase.LOGIC_NAME,
database.getResource().getDatabaseType(), database,
- metaDataContexts.getMetaData().getProps(),
database.getRuleMetaData().getRules());
+ MergeEngine mergeEngine = new
MergeEngine(metaDataContexts.getDatabase(connection.getDatabaseName()),
metaDataContexts.getMetaData().getProps());
return mergeEngine.merge(queryResults,
executionContext.getSqlStatementContext());
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index 87ee239423f..75fa2f3fcbf 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -469,7 +469,7 @@ public final class ContextManager implements AutoCloseable {
refreshRules(databaseName, database);
DatabaseType databaseType =
DatabaseTypeEngine.getDatabaseType(dataSourceMap.values());
Map<String, ShardingSphereSchema> result = new ConcurrentHashMap<>();
- result.putAll(GenericSchemaBuilder.build(databaseName,
database.getProtocolType(), databaseType,
+ result.putAll(GenericSchemaBuilder.build(databaseName,
database.getProtocolType(), databaseType,
dataSourceMap, database.getRuleMetaData().getRules(),
metaDataContexts.getMetaData().getProps()));
result.putAll(SystemSchemaBuilder.build(databaseName,
database.getProtocolType()));
return result;
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
index 7016e413da9..4722fd6f5d0 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
@@ -27,8 +27,6 @@ import
org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
import org.apache.shardingsphere.infra.context.refresher.MetaDataRefreshEngine;
-import org.apache.shardingsphere.infra.database.DefaultDatabase;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
@@ -156,9 +154,7 @@ public abstract class DatabaseCommunicationEngine<T> {
}
protected MergedResult mergeQuery(final SQLStatementContext<?>
sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
- DatabaseType databaseType =
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getDatabase(database.getName()).getResource().getDatabaseType();
- MergeEngine mergeEngine = new MergeEngine(DefaultDatabase.LOGIC_NAME,
- databaseType, database,
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getProps(),
database.getRuleMetaData().getRules());
+ MergeEngine mergeEngine = new MergeEngine(database,
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getProps());
return mergeEngine.merge(queryResults, sqlStatementContext);
}