This is an automated email from the ASF dual-hosted git repository.
wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0fbe3dc Add DriverExecutor (#12681)
0fbe3dc is described below
commit 0fbe3dcfb3c963968d7c3f122dcfea2dcedbfb4d
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Sep 24 15:41:38 2021 +0800
Add DriverExecutor (#12681)
* Refactor AbstractStatementAdapter
* Add DriverExecutor
* Refactor ProxySQLExecutor
---
.../sql/federate/execute/FederationExecutor.java | 8 +-
.../driver/executor/DriverExecutor.java | 59 +++++++++
.../jdbc/adapter/AbstractStatementAdapter.java | 140 ++++++++++-----------
.../statement/ShardingSpherePreparedStatement.java | 37 +++---
.../core/statement/ShardingSphereStatement.java | 45 +++----
.../statement/CircuitBreakerPreparedStatement.java | 6 +-
.../backend/communication/ProxySQLExecutor.java | 2 +-
.../jdbc/connection/BackendConnection.java | 6 +-
.../frontend/command/CommandExecutorTask.java | 2 +-
.../netty/FrontendChannelInboundHandler.java | 2 +-
.../frontend/command/CommandExecutorTaskTest.java | 8 +-
11 files changed, 169 insertions(+), 146 deletions(-)
diff --git
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederationExecutor.java
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederationExecutor.java
index 053efd5..6511af8 100644
---
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederationExecutor.java
+++
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederationExecutor.java
@@ -32,7 +32,7 @@ import java.util.List;
/**
* Federation executor.
*/
-public interface FederationExecutor {
+public interface FederationExecutor extends AutoCloseable {
/**
* Execute query.
@@ -54,10 +54,6 @@ public interface FederationExecutor {
*/
ResultSet getResultSet() throws SQLException;
- /**
- * Close.
- *
- * @throws SQLException SQL exception
- */
+ @Override
void close() throws SQLException;
}
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
new file mode 100644
index 0000000..67e88a9
--- /dev/null
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.executor;
+
+import lombok.Getter;
+import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
+import
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederationExecutor;
+import
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederationExecutorFactory;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+
+import java.sql.SQLException;
+
+/**
+ * Driver executor.
+ */
+@Getter
+public final class DriverExecutor implements AutoCloseable {
+
+ private final DriverJDBCExecutor regularExecutor;
+
+ private final RawExecutor rawExecutor;
+
+ private final FederationExecutor federationExecutor;
+
+ public DriverExecutor(final ShardingSphereConnection connection) {
+ MetaDataContexts metaDataContexts =
connection.getContextManager().getMetaDataContexts();
+ JDBCExecutor jdbcExecutor = new
JDBCExecutor(metaDataContexts.getExecutorEngine(),
connection.isHoldTransaction());
+ regularExecutor = new DriverJDBCExecutor(connection.getSchemaName(),
metaDataContexts, jdbcExecutor);
+ rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(),
connection.isHoldTransaction(), metaDataContexts.getProps());
+ federationExecutor =
FederationExecutorFactory.newInstance(connection.getSchemaName(),
metaDataContexts.getOptimizerContext(), metaDataContexts.getProps(),
jdbcExecutor);
+ }
+
+ /**
+ * Close.
+ *
+ * @throws SQLException SQL exception
+ */
+ @Override
+ public void close() throws SQLException {
+ federationExecutor.close();
+ }
+}
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
index b50c171..14a0090 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
@@ -17,10 +17,11 @@
package org.apache.shardingsphere.driver.jdbc.adapter;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.driver.executor.DriverExecutor;
import
org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
import
org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOperationStatement;
-import
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederationExecutor;
import java.sql.SQLException;
import java.sql.SQLWarning;
@@ -35,82 +36,86 @@ public abstract class AbstractStatementAdapter extends
AbstractUnsupportedOperat
private final Class<? extends Statement> targetClass;
- private boolean closed;
+ private final ForceExecuteTemplate<Statement> forceExecuteTemplate = new
ForceExecuteTemplate<>();
+ @Getter
private boolean poolable;
+ @Getter
private int fetchSize;
+ @Getter
private int fetchDirection;
- private final ForceExecuteTemplate<Statement> forceExecuteTemplate = new
ForceExecuteTemplate<>();
+ @Getter
+ private boolean closed;
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "rawtypes"})
@Override
- public final void close() throws SQLException {
- closed = true;
- try {
- forceExecuteTemplate.execute((Collection) getRoutedStatements(),
Statement::close);
- getFederationExecutor().close();
- } finally {
- getRoutedStatements().clear();
- }
+ public final void setPoolable(final boolean poolable) throws SQLException {
+ this.poolable = poolable;
+ recordMethodInvocation(targetClass, "setPoolable", new Class[]
{boolean.class}, new Object[] {poolable});
+ forceExecuteTemplate.execute((Collection) getRoutedStatements(),
statement -> statement.setPoolable(poolable));
}
+ @SuppressWarnings({"unchecked", "rawtypes"})
@Override
- public final boolean isClosed() {
- return closed;
+ public final void setFetchSize(final int rows) throws SQLException {
+ fetchSize = rows;
+ recordMethodInvocation(targetClass, "setFetchSize", new Class[]
{int.class}, new Object[] {rows});
+ forceExecuteTemplate.execute((Collection) getRoutedStatements(),
statement -> statement.setFetchSize(rows));
}
+ @SuppressWarnings({"unchecked", "rawtypes"})
@Override
- public final boolean isPoolable() {
- return poolable;
+ public final void setFetchDirection(final int direction) throws
SQLException {
+ fetchDirection = direction;
+ recordMethodInvocation(targetClass, "setFetchDirection", new Class[]
{int.class}, new Object[] {direction});
+ forceExecuteTemplate.execute((Collection) getRoutedStatements(),
statement -> statement.setFetchDirection(direction));
}
- @SuppressWarnings("unchecked")
@Override
- public final void setPoolable(final boolean poolable) throws SQLException {
- this.poolable = poolable;
- recordMethodInvocation(targetClass, "setPoolable", new Class[]
{boolean.class}, new Object[] {poolable});
- forceExecuteTemplate.execute((Collection) getRoutedStatements(),
statement -> statement.setPoolable(poolable));
+ public final int getMaxFieldSize() throws SQLException {
+ return getRoutedStatements().isEmpty() ? 0 :
getRoutedStatements().iterator().next().getMaxFieldSize();
}
+ @SuppressWarnings({"unchecked", "rawtypes"})
@Override
- public final int getFetchSize() {
- return fetchSize;
+ public final void setMaxFieldSize(final int max) throws SQLException {
+ recordMethodInvocation(targetClass, "setMaxFieldSize", new Class[]
{int.class}, new Object[] {max});
+ forceExecuteTemplate.execute((Collection) getRoutedStatements(),
statement -> statement.setMaxFieldSize(max));
}
- @SuppressWarnings("unchecked")
+ // TODO Confirm MaxRows for multiple databases is need special handle. eg:
10 statements maybe MaxRows / 10
@Override
- public final void setFetchSize(final int rows) throws SQLException {
- fetchSize = rows;
- recordMethodInvocation(targetClass, "setFetchSize", new Class[]
{int.class}, new Object[] {rows});
- forceExecuteTemplate.execute((Collection) getRoutedStatements(),
statement -> statement.setFetchSize(rows));
+ public final int getMaxRows() throws SQLException {
+ return getRoutedStatements().isEmpty() ? -1 :
getRoutedStatements().iterator().next().getMaxRows();
}
+ @SuppressWarnings({"unchecked", "rawtypes"})
@Override
- public int getFetchDirection() {
- return fetchDirection;
+ public final void setMaxRows(final int max) throws SQLException {
+ recordMethodInvocation(targetClass, "setMaxRows", new Class[]
{int.class}, new Object[] {max});
+ forceExecuteTemplate.execute((Collection) getRoutedStatements(),
statement -> statement.setMaxRows(max));
}
@Override
- public void setFetchDirection(final int direction) throws SQLException {
- fetchDirection = direction;
- recordMethodInvocation(targetClass, "setFetchDirection", new Class[]
{int.class}, new Object[] {direction});
- forceExecuteTemplate.execute((Collection) getRoutedStatements(),
statement -> statement.setFetchDirection(direction));
+ public final int getQueryTimeout() throws SQLException {
+ return getRoutedStatements().isEmpty() ? 0 :
getRoutedStatements().iterator().next().getQueryTimeout();
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "rawtypes"})
@Override
- public final void setEscapeProcessing(final boolean enable) throws
SQLException {
- recordMethodInvocation(targetClass, "setEscapeProcessing", new Class[]
{boolean.class}, new Object[] {enable});
- forceExecuteTemplate.execute((Collection) getRoutedStatements(),
statement -> statement.setEscapeProcessing(enable));
+ public final void setQueryTimeout(final int seconds) throws SQLException {
+ recordMethodInvocation(targetClass, "setQueryTimeout", new Class[]
{int.class}, new Object[] {seconds});
+ forceExecuteTemplate.execute((Collection) getRoutedStatements(),
statement -> statement.setQueryTimeout(seconds));
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "rawtypes"})
@Override
- public final void cancel() throws SQLException {
- forceExecuteTemplate.execute((Collection) getRoutedStatements(),
Statement::cancel);
+ public final void setEscapeProcessing(final boolean enable) throws
SQLException {
+ recordMethodInvocation(targetClass, "setEscapeProcessing", new Class[]
{boolean.class}, new Object[] {enable});
+ forceExecuteTemplate.execute((Collection) getRoutedStatements(),
statement -> statement.setEscapeProcessing(enable));
}
@Override
@@ -142,15 +147,6 @@ public abstract class AbstractStatementAdapter extends
AbstractUnsupportedOperat
}
@Override
- public final SQLWarning getWarnings() {
- return null;
- }
-
- @Override
- public final void clearWarnings() {
- }
-
- @Override
public final boolean getMoreResults() throws SQLException {
boolean result = false;
for (Statement each : getRoutedStatements()) {
@@ -165,45 +161,37 @@ public abstract class AbstractStatementAdapter extends
AbstractUnsupportedOperat
}
@Override
- public final int getMaxFieldSize() throws SQLException {
- return getRoutedStatements().isEmpty() ? 0 :
getRoutedStatements().iterator().next().getMaxFieldSize();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public final void setMaxFieldSize(final int max) throws SQLException {
- recordMethodInvocation(targetClass, "setMaxFieldSize", new Class[]
{int.class}, new Object[] {max});
- forceExecuteTemplate.execute((Collection) getRoutedStatements(),
statement -> statement.setMaxFieldSize(max));
- }
-
- // TODO Confirm MaxRows for multiple databases is need special handle. eg:
10 statements maybe MaxRows / 10
- @Override
- public final int getMaxRows() throws SQLException {
- return getRoutedStatements().isEmpty() ? -1 :
getRoutedStatements().iterator().next().getMaxRows();
+ public final SQLWarning getWarnings() {
+ return null;
}
- @SuppressWarnings("unchecked")
@Override
- public final void setMaxRows(final int max) throws SQLException {
- recordMethodInvocation(targetClass, "setMaxRows", new Class[]
{int.class}, new Object[] {max});
- forceExecuteTemplate.execute((Collection) getRoutedStatements(),
statement -> statement.setMaxRows(max));
+ public final void clearWarnings() {
}
+ @SuppressWarnings({"unchecked", "rawtypes"})
@Override
- public final int getQueryTimeout() throws SQLException {
- return getRoutedStatements().isEmpty() ? 0 :
getRoutedStatements().iterator().next().getQueryTimeout();
+ public final void cancel() throws SQLException {
+ forceExecuteTemplate.execute((Collection) getRoutedStatements(),
Statement::cancel);
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "rawtypes"})
@Override
- public final void setQueryTimeout(final int seconds) throws SQLException {
- recordMethodInvocation(targetClass, "setQueryTimeout", new Class[]
{int.class}, new Object[] {seconds});
- forceExecuteTemplate.execute((Collection) getRoutedStatements(),
statement -> statement.setQueryTimeout(seconds));
+ public final void close() throws SQLException {
+ closed = true;
+ try {
+ forceExecuteTemplate.execute((Collection) getRoutedStatements(),
Statement::close);
+ if (null != getExecutor()) {
+ getExecutor().close();
+ }
+ } finally {
+ getRoutedStatements().clear();
+ }
}
protected abstract boolean isAccumulate();
protected abstract Collection<? extends Statement> getRoutedStatements();
- protected abstract FederationExecutor getFederationExecutor();
+ protected abstract DriverExecutor getExecutor();
}
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index c468d86..195b60b 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.driver.jdbc.core.statement;
import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.Getter;
-import org.apache.shardingsphere.driver.executor.DriverJDBCExecutor;
+import org.apache.shardingsphere.driver.executor.DriverExecutor;
import org.apache.shardingsphere.driver.executor.batch.BatchExecutionUnit;
import
org.apache.shardingsphere.driver.executor.batch.BatchPreparedStatementExecutor;
import
org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
@@ -50,15 +50,12 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorEx
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
-import
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederationExecutor;
-import
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederationExecutorFactory;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
@@ -112,12 +109,8 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
@Getter
private final ParameterMetaData parameterMetaData;
- private final DriverJDBCExecutor driverJDBCExecutor;
-
- private final RawExecutor rawExecutor;
-
@Getter(AccessLevel.PROTECTED)
- private final FederationExecutor federationExecutor;
+ private final DriverExecutor executor;
private final BatchPreparedStatementExecutor
batchPreparedStatementExecutor;
@@ -164,10 +157,8 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
sqlStatement = sqlParserEngine.parse(sql, true);
parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);
statementOption = returnGeneratedKeys ? new StatementOption(true) :
new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
+ executor = new DriverExecutor(connection);
JDBCExecutor jdbcExecutor = new
JDBCExecutor(metaDataContexts.getExecutorEngine(),
connection.isHoldTransaction());
- driverJDBCExecutor = new
DriverJDBCExecutor(connection.getSchemaName(), metaDataContexts, jdbcExecutor);
- rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(),
connection.isHoldTransaction(), metaDataContexts.getProps());
- federationExecutor =
FederationExecutorFactory.newInstance(connection.getSchemaName(),
metaDataContexts.getOptimizerContext(), metaDataContexts.getProps(),
jdbcExecutor);
batchPreparedStatementExecutor = new
BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor,
connection.getSchemaName());
kernelProcessor = new KernelProcessor();
statementsCacheable =
isStatementsCacheable(metaDataContexts.getMetaData(connection.getSchemaName()).getRuleMetaData().getConfigurations());
@@ -189,7 +180,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
executionContext = createExecutionContext();
List<QueryResult> queryResults = executeQuery0();
MergedResult mergedResult = mergeQuery(queryResults);
- return new
ShardingSphereResultSet(getResultSetsForShardingSphereResultSet(),
mergedResult, this, executionContext);
+ return new ShardingSphereResultSet(getShardingSphereResultSet(),
mergedResult, this, executionContext);
} finally {
clearBatch();
}
@@ -201,16 +192,16 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
replaySetParameter();
}
- private List<ResultSet> getResultSetsForShardingSphereResultSet() throws
SQLException {
+ private List<ResultSet> getShardingSphereResultSet() throws SQLException {
if (executionContext.getRouteContext().isFederated()) {
- return
Collections.singletonList(federationExecutor.getResultSet());
+ return
Collections.singletonList(executor.getFederationExecutor().getResultSet());
}
return
statements.stream().map(this::getResultSet).collect(Collectors.toList());
}
private List<QueryResult> executeQuery0() throws SQLException {
if
(metaDataContexts.getMetaData(connection.getSchemaName()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
- return rawExecutor.execute(createRawExecutionGroupContext(),
executionContext.getLogicSQL(),
+ return
executor.getRawExecutor().execute(createRawExecutionGroupContext(),
executionContext.getLogicSQL(),
new RawSQLExecutorCallback()).stream().map(each ->
(QueryResult) each).collect(Collectors.toList());
}
if (executionContext.getRouteContext().isFederated()) {
@@ -218,7 +209,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
}
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext();
cacheStatements(executionGroupContext.getInputGroups());
- return driverJDBCExecutor.executeQuery(executionGroupContext,
executionContext.getLogicSQL(),
+ return
executor.getRegularExecutor().executeQuery(executionGroupContext,
executionContext.getLogicSQL(),
new
PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData(connection.getSchemaName()).getResource().getDatabaseType(),
sqlStatement,
SQLExecutorExceptionHandler.isExceptionThrown()));
}
@@ -229,7 +220,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
}
PreparedStatementExecuteQueryCallback callback = new
PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData(connection.getSchemaName()).getResource().getDatabaseType(),
sqlStatement,
SQLExecutorExceptionHandler.isExceptionThrown());
- return
federationExecutor.executeQuery(createDriverExecutionPrepareEngine(), callback,
executionContext);
+ return
executor.getFederationExecutor().executeQuery(createDriverExecutionPrepareEngine(),
callback, executionContext);
}
private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
createDriverExecutionPrepareEngine() {
@@ -248,12 +239,12 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
clearPrevious();
executionContext = createExecutionContext();
if
(metaDataContexts.getMetaData(connection.getSchemaName()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
- Collection<ExecuteResult> executeResults =
rawExecutor.execute(createRawExecutionGroupContext(),
executionContext.getLogicSQL(), new RawSQLExecutorCallback());
+ Collection<ExecuteResult> executeResults =
executor.getRawExecutor().execute(createRawExecutionGroupContext(),
executionContext.getLogicSQL(), new RawSQLExecutorCallback());
return accumulate(executeResults);
}
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext();
cacheStatements(executionGroupContext.getInputGroups());
- return driverJDBCExecutor.executeUpdate(executionGroupContext,
+ return
executor.getRegularExecutor().executeUpdate(executionGroupContext,
executionContext.getLogicSQL(),
executionContext.getRouteContext().getRouteUnits(),
createExecuteUpdateCallback());
} finally {
clearBatch();
@@ -295,7 +286,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
executionContext = createExecutionContext();
if
(metaDataContexts.getMetaData(connection.getSchemaName()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
// TODO process getStatement
- Collection<ExecuteResult> executeResults =
rawExecutor.execute(createRawExecutionGroupContext(),
executionContext.getLogicSQL(), new RawSQLExecutorCallback());
+ Collection<ExecuteResult> executeResults =
executor.getRawExecutor().execute(createRawExecutionGroupContext(),
executionContext.getLogicSQL(), new RawSQLExecutorCallback());
return executeResults.iterator().next() instanceof QueryResult;
}
if (executionContext.getRouteContext().isFederated()) {
@@ -304,7 +295,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
}
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext();
cacheStatements(executionGroupContext.getInputGroups());
- return driverJDBCExecutor.execute(executionGroupContext,
+ return executor.getRegularExecutor().execute(executionGroupContext,
executionContext.getLogicSQL(),
executionContext.getRouteContext().getRouteUnits(), createExecuteCallback());
} finally {
clearBatch();
@@ -365,7 +356,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
result.add(each.getResultSet());
}
if (executionContext.getRouteContext().isFederated()) {
- result.add(federationExecutor.getResultSet());
+ result.add(executor.getFederationExecutor().getResultSet());
}
return result;
}
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index d84e7a7..62a694d 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.driver.jdbc.core.statement;
import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.Getter;
-import org.apache.shardingsphere.driver.executor.DriverJDBCExecutor;
+import org.apache.shardingsphere.driver.executor.DriverExecutor;
import org.apache.shardingsphere.driver.executor.callback.ExecuteCallback;
import
org.apache.shardingsphere.driver.executor.callback.ExecuteUpdateCallback;
import
org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback;
@@ -47,17 +47,13 @@ import
org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
-import
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederationExecutor;
-import
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederationExecutorFactory;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
@@ -99,12 +95,8 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
private final StatementOption statementOption;
- private final DriverJDBCExecutor driverJDBCExecutor;
-
- private final RawExecutor rawExecutor;
-
@Getter(AccessLevel.PROTECTED)
- private final FederationExecutor federationExecutor;
+ private final DriverExecutor executor;
private final KernelProcessor kernelProcessor;
@@ -128,10 +120,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
metaDataContexts =
connection.getContextManager().getMetaDataContexts();
statements = new LinkedList<>();
statementOption = new StatementOption(resultSetType,
resultSetConcurrency, resultSetHoldability);
- JDBCExecutor jdbcExecutor = new
JDBCExecutor(metaDataContexts.getExecutorEngine(),
connection.isHoldTransaction());
- driverJDBCExecutor = new
DriverJDBCExecutor(connection.getSchemaName(), metaDataContexts, jdbcExecutor);
- rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(),
connection.isHoldTransaction(), metaDataContexts.getProps());
- federationExecutor =
FederationExecutorFactory.newInstance(connection.getSchemaName(),
metaDataContexts.getOptimizerContext(), metaDataContexts.getProps(),
jdbcExecutor);
+ executor = new DriverExecutor(connection);
kernelProcessor = new KernelProcessor();
}
@@ -156,12 +145,12 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
private List<ResultSet> getShardingSphereResultSets() throws SQLException {
return executionContext.getRouteContext().isFederated()
- ? Collections.singletonList(federationExecutor.getResultSet())
: statements.stream().map(this::getResultSet).collect(Collectors.toList());
+ ?
Collections.singletonList(executor.getFederationExecutor().getResultSet()) :
statements.stream().map(this::getResultSet).collect(Collectors.toList());
}
private List<QueryResult> executeQuery0() throws SQLException {
if
(metaDataContexts.getMetaData(connection.getSchemaName()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
- return rawExecutor.execute(createRawExecutionContext(),
executionContext.getLogicSQL(),
+ return
executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getLogicSQL(),
new RawSQLExecutorCallback()).stream().map(each ->
(QueryResult) each).collect(Collectors.toList());
}
if (executionContext.getRouteContext().isFederated()) {
@@ -171,7 +160,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
cacheStatements(executionGroupContext.getInputGroups());
StatementExecuteQueryCallback callback = new
StatementExecuteQueryCallback(metaDataContexts.getMetaData(connection.getSchemaName()).getResource().getDatabaseType(),
executionContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown());
- return driverJDBCExecutor.executeQuery(executionGroupContext,
executionContext.getLogicSQL(), callback);
+ return
executor.getRegularExecutor().executeQuery(executionGroupContext,
executionContext.getLogicSQL(), callback);
}
private List<QueryResult> executeFederationQuery() throws SQLException {
@@ -180,7 +169,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
}
StatementExecuteQueryCallback callback = new
StatementExecuteQueryCallback(metaDataContexts.getMetaData(connection.getSchemaName()).getResource().getDatabaseType(),
executionContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown());
- return
federationExecutor.executeQuery(createDriverExecutionPrepareEngine(), callback,
executionContext);
+ return
executor.getFederationExecutor().executeQuery(createDriverExecutionPrepareEngine(),
callback, executionContext);
}
private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
createDriverExecutionPrepareEngine() {
@@ -194,7 +183,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
try {
executionContext = createExecutionContext(sql);
if
(metaDataContexts.getMetaData(connection.getSchemaName()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
- return
accumulate(rawExecutor.execute(createRawExecutionContext(),
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
+ return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
}
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionContext();
cacheStatements(executionGroupContext.getInputGroups());
@@ -213,7 +202,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
try {
executionContext = createExecutionContext(sql);
if
(metaDataContexts.getMetaData(connection.getSchemaName()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
- return
accumulate(rawExecutor.execute(createRawExecutionContext(),
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
+ return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
}
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionContext();
cacheStatements(executionGroupContext.getInputGroups());
@@ -230,7 +219,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
try {
executionContext = createExecutionContext(sql);
if
(metaDataContexts.getMetaData(connection.getSchemaName()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
- return
accumulate(rawExecutor.execute(createRawExecutionContext(),
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
+ return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
}
ExecutionGroupContext<JDBCExecutionUnit> executionGroups =
createExecutionContext();
cacheStatements(executionGroups.getInputGroups());
@@ -247,7 +236,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
try {
executionContext = createExecutionContext(sql);
if
(metaDataContexts.getMetaData(connection.getSchemaName()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
- return
accumulate(rawExecutor.execute(createRawExecutionContext(),
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
+ return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
}
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionContext();
cacheStatements(executionGroupContext.getInputGroups());
@@ -274,7 +263,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
return Optional.empty();
}
};
- return driverJDBCExecutor.executeUpdate(executionGroupContext,
executionContext.getLogicSQL(), routeUnits, callback);
+ return
executor.getRegularExecutor().executeUpdate(executionGroupContext,
executionContext.getLogicSQL(), routeUnits, callback);
}
private int accumulate(final Collection<ExecuteResult> results) {
@@ -310,7 +299,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
return execute0(sql, (actualSQL, statement) ->
statement.execute(actualSQL, columnNames));
}
- private boolean execute(final ExecutionGroupContext<JDBCExecutionUnit>
executionGroupContext, final ExecuteCallback executor,
+ private boolean execute(final ExecutionGroupContext<JDBCExecutionUnit>
executionGroupContext, final ExecuteCallback executeCallback,
final SQLStatement sqlStatement, final
Collection<RouteUnit> routeUnits) throws SQLException {
boolean isExceptionThrown =
SQLExecutorExceptionHandler.isExceptionThrown();
JDBCExecutorCallback<Boolean> jdbcExecutorCallback = new
JDBCExecutorCallback<Boolean>(
@@ -318,7 +307,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
@Override
protected Boolean executeSQL(final String sql, final Statement
statement, final ConnectionMode connectionMode) throws SQLException {
- return executor.execute(sql, statement);
+ return executeCallback.execute(sql, statement);
}
@Override
@@ -326,7 +315,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
return Optional.empty();
}
};
- return driverJDBCExecutor.execute(executionGroupContext,
executionContext.getLogicSQL(), routeUnits, jdbcExecutorCallback);
+ return executor.getRegularExecutor().execute(executionGroupContext,
executionContext.getLogicSQL(), routeUnits, jdbcExecutorCallback);
}
private boolean execute0(final String sql, final ExecuteCallback callback)
throws SQLException {
@@ -334,7 +323,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
executionContext = createExecutionContext(sql);
if
(metaDataContexts.getMetaData(connection.getSchemaName()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
// TODO process getStatement
- Collection<ExecuteResult> results =
rawExecutor.execute(createRawExecutionContext(),
executionContext.getLogicSQL(), new RawSQLExecutorCallback());
+ Collection<ExecuteResult> results =
executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getLogicSQL(), new RawSQLExecutorCallback());
return results.iterator().next() instanceof QueryResult;
}
if (executionContext.getRouteContext().isFederated()) {
@@ -422,7 +411,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
result.add(each.getResultSet());
}
if (executionContext.getRouteContext().isFederated()) {
- result.add(federationExecutor.getResultSet());
+ result.add(executor.getFederationExecutor().getResultSet());
}
return result;
}
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
index 2be17b5..86e5ee7 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
@@ -18,10 +18,10 @@
package org.apache.shardingsphere.driver.state.circuit.statement;
import lombok.Getter;
+import org.apache.shardingsphere.driver.executor.DriverExecutor;
+import
org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOperationPreparedStatement;
import
org.apache.shardingsphere.driver.state.circuit.connection.CircuitBreakerConnection;
import
org.apache.shardingsphere.driver.state.circuit.resultset.CircuitBreakerResultSet;
-import
org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOperationPreparedStatement;
-import
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederationExecutor;
import java.io.InputStream;
import java.io.Reader;
@@ -276,7 +276,7 @@ public final class CircuitBreakerPreparedStatement extends
AbstractUnsupportedOp
}
@Override
- protected FederationExecutor getFederationExecutor() {
+ protected DriverExecutor getExecutor() {
return null;
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index e738321..083737d 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -82,8 +82,8 @@ public final class ProxySQLExecutor {
this.databaseCommunicationEngine = databaseCommunicationEngine;
ExecutorEngine executorEngine =
BackendExecutorContext.getInstance().getExecutorEngine();
boolean isSerialExecute = backendConnection.isSerialExecute();
- jdbcExecutor = new ProxyJDBCExecutor(type, backendConnection,
databaseCommunicationEngine, new JDBCExecutor(executorEngine, isSerialExecute));
MetaDataContexts metaDataContexts =
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
+ jdbcExecutor = new ProxyJDBCExecutor(type, backendConnection,
databaseCommunicationEngine, new JDBCExecutor(executorEngine, isSerialExecute));
rawExecutor = new RawExecutor(executorEngine, isSerialExecute,
metaDataContexts.getProps());
federationExecutor = FederationExecutorFactory.newInstance(
backendConnection.getSchemaName(),
metaDataContexts.getOptimizerContext(), metaDataContexts.getProps(), new
JDBCExecutor(executorEngine, isSerialExecute));
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
index fffa777..b118dad 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
@@ -314,11 +314,11 @@ public final class BackendConnection implements
ExecutorJDBCManager {
}
/**
- * Close federate executor.
+ * Close federation executor.
*
- * @return SQL exception when federate executor close
+ * @return SQL exception when federation executor close
*/
- public synchronized Collection<SQLException> closeFederateExecutor() {
+ public synchronized Collection<SQLException> closeFederationExecutor() {
Collection<SQLException> result = new LinkedList<>();
if (null != federationExecutor) {
try {
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
index 0dbf4ff..b3c59d2 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
@@ -123,7 +123,7 @@ public final class CommandExecutorTask implements Runnable {
Collection<SQLException> result = new LinkedList<>();
PrimaryVisitedManager.clear();
result.addAll(backendConnection.closeDatabaseCommunicationEngines(false));
- result.addAll(backendConnection.closeFederateExecutor());
+ result.addAll(backendConnection.closeFederationExecutor());
return result;
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
index d1afccb..c5a9ffd 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
@@ -103,7 +103,7 @@ public final class FrontendChannelInboundHandler extends
ChannelInboundHandlerAd
PrimaryVisitedManager.clear();
backendConnection.closeDatabaseCommunicationEngines(true);
backendConnection.closeConnections(true);
- backendConnection.closeFederateExecutor();
+ backendConnection.closeFederationExecutor();
databaseProtocolFrontendEngine.release(backendConnection);
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
index 1e2f063..a55edd4 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
@@ -101,7 +101,7 @@ public final class CommandExecutorTaskTest {
when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
when(engine.getCodecEngine().createPacketPayload(message)).thenReturn(payload);
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
-
when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
+
when(backendConnection.closeFederationExecutor()).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine,
backendConnection, handlerContext, message);
actual.run();
verify(connectionStatus).waitUntilConnectionRelease();
@@ -121,7 +121,7 @@ public final class CommandExecutorTaskTest {
when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
when(engine.getCodecEngine().createPacketPayload(message)).thenReturn(payload);
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
-
when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
+
when(backendConnection.closeFederationExecutor()).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine,
backendConnection, handlerContext, message);
actual.run();
verify(connectionStatus).waitUntilConnectionRelease();
@@ -145,7 +145,7 @@ public final class CommandExecutorTaskTest {
when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
when(engine.getCodecEngine().createPacketPayload(message)).thenReturn(payload);
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
-
when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
+
when(backendConnection.closeFederationExecutor()).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine,
backendConnection, handlerContext, message);
actual.run();
verify(connectionStatus).waitUntilConnectionRelease();
@@ -166,7 +166,7 @@ public final class CommandExecutorTaskTest {
when(engine.getCommandExecuteEngine().getErrorPacket(mockException,
backendConnection)).thenReturn(databasePacket);
when(engine.getCommandExecuteEngine().getOtherPacket(backendConnection)).thenReturn(Optional.of(databasePacket));
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
-
when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
+
when(backendConnection.closeFederationExecutor()).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine,
backendConnection, handlerContext, message);
actual.run();
verify(handlerContext, atLeast(2)).writeAndFlush(databasePacket);