This is an automated email from the ASF dual-hosted git repository.
zhangyonglun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 31fb74f Merge DatabaseCommunicationEngine and
JDBCDatabaseCommunicationEngine (#8427)
31fb74f is described below
commit 31fb74fd516b5064b0ceb70e62be58bfd2c6945a
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Nov 30 22:15:14 2020 +0800
Merge DatabaseCommunicationEngine and JDBCDatabaseCommunicationEngine
(#8427)
---
.../communication/DatabaseCommunicationEngine.java | 171 +++++++++++++++++-
.../DatabaseCommunicationEngineFactory.java | 5 +-
.../jdbc/JDBCDatabaseCommunicationEngine.java | 201 ---------------------
.../DatabaseCommunicationEngineFactoryTest.java | 8 +-
4 files changed, 171 insertions(+), 214 deletions(-)
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
index cc28518..135f55a 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
@@ -17,15 +17,63 @@
package org.apache.shardingsphere.proxy.backend.communication;
-import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.binder.LogicSQL;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import
org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import org.apache.shardingsphere.infra.executor.sql.log.SQLLogger;
+import org.apache.shardingsphere.infra.merge.MergeEngine;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
+import
org.apache.shardingsphere.infra.metadata.schema.builder.SchemaBuilderMaterials;
+import
org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresher;
+import
org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresherFactory;
+import
org.apache.shardingsphere.infra.metadata.schema.refresher.spi.SchemaChangedNotifier;
+import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
+import org.apache.shardingsphere.infra.spi.ordered.OrderedSPIRegistry;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseData;
+import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
+import
org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
+import
org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeader;
+import
org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeaderBuilder;
+import
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DeleteStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.InsertStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.UpdateStatement;
import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
/**
* Database communication engine.
*/
-public interface DatabaseCommunicationEngine {
+@RequiredArgsConstructor
+public final class DatabaseCommunicationEngine {
+
+ private final LogicSQL logicSQL;
+
+ private final ShardingSphereMetaData metaData;
+
+ private final ProxySQLExecutor proxySQLExecutor;
+
+ private final KernelProcessor kernelProcessor = new KernelProcessor();
+
+ private List<QueryHeader> queryHeaders;
+
+ private MergedResult mergedResult;
/**
* Execute to database.
@@ -33,7 +81,112 @@ public interface DatabaseCommunicationEngine {
* @return backend response
* @throws SQLException SQL exception
*/
- ResponseHeader execute() throws SQLException;
+ public ResponseHeader execute() throws SQLException {
+ ExecutionContext executionContext =
kernelProcessor.generateExecutionContext(logicSQL, metaData,
ProxyContext.getInstance().getMetaDataContexts().getProps());
+ logSQL(executionContext);
+ return doExecute(executionContext);
+ }
+
+ private void logSQL(final ExecutionContext executionContext) {
+ if
(ProxyContext.getInstance().getMetaDataContexts().getProps().<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW))
{
+ SQLLogger.logSQL(logicSQL,
ProxyContext.getInstance().getMetaDataContexts().getProps().<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE),
executionContext);
+ }
+ }
+
+ private ResponseHeader doExecute(final ExecutionContext executionContext)
throws SQLException {
+ if (executionContext.getExecutionUnits().isEmpty()) {
+ return new UpdateResponseHeader();
+ }
+ proxySQLExecutor.checkExecutePrerequisites(executionContext);
+ Collection<ExecuteResult> executeResults =
proxySQLExecutor.execute(executionContext);
+ ExecuteResult executeResultSample = executeResults.iterator().next();
+ return executeResultSample instanceof QueryResult
+ ? processExecuteQuery(executionContext, executeResults,
(QueryResult) executeResultSample) : processExecuteUpdate(executionContext,
executeResults);
+ }
+
+ private QueryResponseHeader processExecuteQuery(final ExecutionContext
executionContext,
+ final
Collection<ExecuteResult> executeResults, final QueryResult
executeResultSample) throws SQLException {
+ queryHeaders = createQueryHeaders(executionContext,
executeResultSample);
+ mergedResult = mergeQuery(executionContext.getSqlStatementContext(),
executeResults.stream().map(each -> (QueryResult)
each).collect(Collectors.toList()));
+ return new QueryResponseHeader(queryHeaders);
+ }
+
+ private List<QueryHeader> createQueryHeaders(final ExecutionContext
executionContext, final QueryResult executeResultSample) throws SQLException {
+ int columnCount = executeResultSample.getMetaData().getColumnCount();
+ List<QueryHeader> result = new ArrayList<>(columnCount);
+ for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
+ result.add(createQueryHeader(executionContext,
executeResultSample, metaData, columnIndex));
+ }
+ return result;
+ }
+
+ private QueryHeader createQueryHeader(final ExecutionContext
executionContext,
+ final QueryResult
executeResultSample, final ShardingSphereMetaData metaData, final int
columnIndex) throws SQLException {
+ return
hasSelectExpandProjections(executionContext.getSqlStatementContext())
+ ? QueryHeaderBuilder.build(((SelectStatementContext)
executionContext.getSqlStatementContext()).getProjectionsContext(),
executeResultSample, metaData, columnIndex)
+ : QueryHeaderBuilder.build(executeResultSample, metaData,
columnIndex);
+ }
+
+ private boolean hasSelectExpandProjections(final SQLStatementContext<?>
sqlStatementContext) {
+ return sqlStatementContext instanceof SelectStatementContext &&
!((SelectStatementContext)
sqlStatementContext).getProjectionsContext().getExpandProjections().isEmpty();
+ }
+
+ private MergedResult mergeQuery(final SQLStatementContext<?>
sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
+ MergeEngine mergeEngine = new
MergeEngine(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType(),
+ metaData.getSchema(),
ProxyContext.getInstance().getMetaDataContexts().getProps(),
metaData.getRuleMetaData().getRules());
+ return mergeEngine.merge(queryResults, sqlStatementContext);
+ }
+
+ private UpdateResponseHeader processExecuteUpdate(final ExecutionContext
executionContext, final Collection<ExecuteResult> executeResults) throws
SQLException {
+ UpdateResponseHeader result = createUpdateResponse(executionContext,
executeResults);
+ refreshSchema(executionContext);
+ mergeUpdateCount(executionContext.getSqlStatementContext(), result);
+ return result;
+ }
+
+ private UpdateResponseHeader createUpdateResponse(final ExecutionContext
executionContext, final Collection<ExecuteResult> executeResults) {
+ UpdateResponseHeader result = new UpdateResponseHeader(executeResults);
+ if (executionContext.getSqlStatementContext().getSqlStatement()
instanceof InsertStatement) {
+ result.setType("INSERT");
+ } else if (executionContext.getSqlStatementContext().getSqlStatement()
instanceof DeleteStatement) {
+ result.setType("DELETE");
+ } else if (executionContext.getSqlStatementContext().getSqlStatement()
instanceof UpdateStatement) {
+ result.setType("UPDATE");
+ }
+ return result;
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private void refreshSchema(final ExecutionContext executionContext) throws
SQLException {
+ SQLStatement sqlStatement =
executionContext.getSqlStatementContext().getSqlStatement();
+ if (null == sqlStatement) {
+ return;
+ }
+ Optional<SchemaRefresher> schemaRefresher =
SchemaRefresherFactory.newInstance(sqlStatement);
+ if (schemaRefresher.isPresent()) {
+ Collection<String> routeDataSourceNames =
executionContext.getRouteContext().getRouteUnits().stream().map(each ->
each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
+ SchemaBuilderMaterials materials = new
SchemaBuilderMaterials(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType(),
+ metaData.getResource().getDataSources(),
metaData.getRuleMetaData().getRules(),
ProxyContext.getInstance().getMetaDataContexts().getProps());
+ schemaRefresher.get().refresh(metaData.getSchema(),
routeDataSourceNames, sqlStatement, materials);
+ notifySchemaChanged(metaData.getName(), metaData.getSchema());
+ }
+ }
+
+ private void notifySchemaChanged(final String schemaName, final
ShardingSphereSchema schema) {
+
OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema),
SchemaChangedNotifier.class).values().forEach(each -> each.notify(schemaName,
schema));
+ }
+
+ private void mergeUpdateCount(final SQLStatementContext<?>
sqlStatementContext, final UpdateResponseHeader response) {
+ if (isNeedAccumulate(sqlStatementContext)) {
+ response.mergeUpdateCount();
+ }
+ }
+
+ private boolean isNeedAccumulate(final SQLStatementContext<?>
sqlStatementContext) {
+ Optional<DataNodeContainedRule> dataNodeContainedRule =
+ metaData.getRuleMetaData().getRules().stream().filter(each ->
each instanceof DataNodeContainedRule).findFirst().map(rule ->
(DataNodeContainedRule) rule);
+ return dataNodeContainedRule.isPresent() &&
dataNodeContainedRule.get().isNeedAccumulate(sqlStatementContext.getTablesContext().getTableNames());
+ }
/**
* Goto next result value.
@@ -41,7 +194,9 @@ public interface DatabaseCommunicationEngine {
* @return has more result value or not
* @throws SQLException SQL exception
*/
- boolean next() throws SQLException;
+ public boolean next() throws SQLException {
+ return null != mergedResult && mergedResult.next();
+ }
/**
* Get query response data.
@@ -49,5 +204,11 @@ public interface DatabaseCommunicationEngine {
* @return query response data
* @throws SQLException SQL exception
*/
- QueryResponseData getQueryResponseData() throws SQLException;
+ public QueryResponseData getQueryResponseData() throws SQLException {
+ List<Object> row = new ArrayList<>(queryHeaders.size());
+ for (int columnIndex = 1; columnIndex <= queryHeaders.size();
columnIndex++) {
+ row.add(mergedResult.getValue(columnIndex, Object.class));
+ }
+ return new
QueryResponseData(queryHeaders.stream().map(QueryHeader::getColumnType).collect(Collectors.toList()),
row);
+ }
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactory.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactory.java
index 913c80c..5756251 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactory.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactory.java
@@ -24,7 +24,6 @@ import
org.apache.shardingsphere.infra.binder.SQLStatementContextFactory;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.builder.JDBCExecutionUnitBuilderType;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import
org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
@@ -62,7 +61,7 @@ public final class DatabaseCommunicationEngineFactory {
ShardingSphereMetaData metaData =
ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName());
LogicSQL logicSQL = createLogicSQL(sqlStatement, sql,
Collections.emptyList(), metaData);
ProxySQLExecutor proxySQLExecutor = new
ProxySQLExecutor(JDBCExecutionUnitBuilderType.STATEMENT, backendConnection);
- return new JDBCDatabaseCommunicationEngine(logicSQL, metaData,
proxySQLExecutor);
+ return new DatabaseCommunicationEngine(logicSQL, metaData,
proxySQLExecutor);
}
/**
@@ -78,7 +77,7 @@ public final class DatabaseCommunicationEngineFactory {
ShardingSphereMetaData metaData =
ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName());
LogicSQL logicSQL = createLogicSQL(sqlStatement, sql, new
ArrayList<>(parameters), metaData);
ProxySQLExecutor proxySQLExecutor = new
ProxySQLExecutor(JDBCExecutionUnitBuilderType.PREPARED_STATEMENT,
backendConnection);
- return new JDBCDatabaseCommunicationEngine(logicSQL, metaData,
proxySQLExecutor);
+ return new DatabaseCommunicationEngine(logicSQL, metaData,
proxySQLExecutor);
}
private LogicSQL createLogicSQL(final SQLStatement sqlStatement, final
String sql, final List<Object> parameters, final ShardingSphereMetaData
metaData) {
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
deleted file mode 100644
index 456698d..0000000
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.proxy.backend.communication.jdbc;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.binder.LogicSQL;
-import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
-import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
-import
org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
-import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
-import org.apache.shardingsphere.infra.executor.sql.log.SQLLogger;
-import org.apache.shardingsphere.infra.merge.MergeEngine;
-import org.apache.shardingsphere.infra.merge.result.MergedResult;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
-import
org.apache.shardingsphere.infra.metadata.schema.builder.SchemaBuilderMaterials;
-import
org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresher;
-import
org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresherFactory;
-import
org.apache.shardingsphere.infra.metadata.schema.refresher.spi.SchemaChangedNotifier;
-import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
-import org.apache.shardingsphere.infra.spi.ordered.OrderedSPIRegistry;
-import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
-import org.apache.shardingsphere.proxy.backend.communication.ProxySQLExecutor;
-import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
-import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseData;
-import
org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeader;
-import
org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeaderBuilder;
-import
org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
-import
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DeleteStatement;
-import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.InsertStatement;
-import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.UpdateStatement;
-
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-/**
- * Database access engine for JDBC.
- */
-@RequiredArgsConstructor
-public final class JDBCDatabaseCommunicationEngine implements
DatabaseCommunicationEngine {
-
- private final LogicSQL logicSQL;
-
- private final ShardingSphereMetaData metaData;
-
- private final ProxySQLExecutor proxySQLExecutor;
-
- private final KernelProcessor kernelProcessor = new KernelProcessor();
-
- private List<QueryHeader> queryHeaders;
-
- private MergedResult mergedResult;
-
- @Override
- public ResponseHeader execute() throws SQLException {
- ExecutionContext executionContext =
kernelProcessor.generateExecutionContext(logicSQL, metaData,
ProxyContext.getInstance().getMetaDataContexts().getProps());
- logSQL(executionContext);
- return doExecute(executionContext);
- }
-
- private void logSQL(final ExecutionContext executionContext) {
- if
(ProxyContext.getInstance().getMetaDataContexts().getProps().<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW))
{
- SQLLogger.logSQL(logicSQL,
ProxyContext.getInstance().getMetaDataContexts().getProps().<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE),
executionContext);
- }
- }
-
- private ResponseHeader doExecute(final ExecutionContext executionContext)
throws SQLException {
- if (executionContext.getExecutionUnits().isEmpty()) {
- return new UpdateResponseHeader();
- }
- proxySQLExecutor.checkExecutePrerequisites(executionContext);
- Collection<ExecuteResult> executeResults =
proxySQLExecutor.execute(executionContext);
- ExecuteResult executeResultSample = executeResults.iterator().next();
- return executeResultSample instanceof QueryResult
- ? processExecuteQuery(executionContext, executeResults,
(QueryResult) executeResultSample) : processExecuteUpdate(executionContext,
executeResults);
- }
-
- private QueryResponseHeader processExecuteQuery(final ExecutionContext
executionContext,
- final
Collection<ExecuteResult> executeResults, final QueryResult
executeResultSample) throws SQLException {
- queryHeaders = createQueryHeaders(executionContext,
executeResultSample);
- mergedResult = mergeQuery(executionContext.getSqlStatementContext(),
executeResults.stream().map(each -> (QueryResult)
each).collect(Collectors.toList()));
- return new QueryResponseHeader(queryHeaders);
- }
-
- private List<QueryHeader> createQueryHeaders(final ExecutionContext
executionContext, final QueryResult executeResultSample) throws SQLException {
- int columnCount = executeResultSample.getMetaData().getColumnCount();
- List<QueryHeader> result = new ArrayList<>(columnCount);
- for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
- result.add(createQueryHeader(executionContext,
executeResultSample, metaData, columnIndex));
- }
- return result;
- }
-
- private QueryHeader createQueryHeader(final ExecutionContext
executionContext,
- final QueryResult
executeResultSample, final ShardingSphereMetaData metaData, final int
columnIndex) throws SQLException {
- return
hasSelectExpandProjections(executionContext.getSqlStatementContext())
- ? QueryHeaderBuilder.build(((SelectStatementContext)
executionContext.getSqlStatementContext()).getProjectionsContext(),
executeResultSample, metaData, columnIndex)
- : QueryHeaderBuilder.build(executeResultSample, metaData,
columnIndex);
- }
-
- private boolean hasSelectExpandProjections(final SQLStatementContext<?>
sqlStatementContext) {
- return sqlStatementContext instanceof SelectStatementContext &&
!((SelectStatementContext)
sqlStatementContext).getProjectionsContext().getExpandProjections().isEmpty();
- }
-
- private MergedResult mergeQuery(final SQLStatementContext<?>
sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
- MergeEngine mergeEngine = new
MergeEngine(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType(),
- metaData.getSchema(),
ProxyContext.getInstance().getMetaDataContexts().getProps(),
metaData.getRuleMetaData().getRules());
- return mergeEngine.merge(queryResults, sqlStatementContext);
- }
-
- private UpdateResponseHeader processExecuteUpdate(final ExecutionContext
executionContext, final Collection<ExecuteResult> executeResults) throws
SQLException {
- UpdateResponseHeader result = createUpdateResponse(executionContext,
executeResults);
- refreshSchema(executionContext);
- mergeUpdateCount(executionContext.getSqlStatementContext(), result);
- return result;
- }
-
- private UpdateResponseHeader createUpdateResponse(final ExecutionContext
executionContext, final Collection<ExecuteResult> executeResults) {
- UpdateResponseHeader result = new UpdateResponseHeader(executeResults);
- if (executionContext.getSqlStatementContext().getSqlStatement()
instanceof InsertStatement) {
- result.setType("INSERT");
- } else if (executionContext.getSqlStatementContext().getSqlStatement()
instanceof DeleteStatement) {
- result.setType("DELETE");
- } else if (executionContext.getSqlStatementContext().getSqlStatement()
instanceof UpdateStatement) {
- result.setType("UPDATE");
- }
- return result;
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- private void refreshSchema(final ExecutionContext executionContext) throws
SQLException {
- SQLStatement sqlStatement =
executionContext.getSqlStatementContext().getSqlStatement();
- if (null == sqlStatement) {
- return;
- }
- Optional<SchemaRefresher> schemaRefresher =
SchemaRefresherFactory.newInstance(sqlStatement);
- if (schemaRefresher.isPresent()) {
- Collection<String> routeDataSourceNames =
executionContext.getRouteContext().getRouteUnits().stream().map(each ->
each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
- SchemaBuilderMaterials materials = new
SchemaBuilderMaterials(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType(),
- metaData.getResource().getDataSources(),
metaData.getRuleMetaData().getRules(),
ProxyContext.getInstance().getMetaDataContexts().getProps());
- schemaRefresher.get().refresh(metaData.getSchema(),
routeDataSourceNames, sqlStatement, materials);
- notifySchemaChanged(metaData.getName(), metaData.getSchema());
- }
- }
-
- private void notifySchemaChanged(final String schemaName, final
ShardingSphereSchema schema) {
-
OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema),
SchemaChangedNotifier.class).values().forEach(each -> each.notify(schemaName,
schema));
- }
-
- private void mergeUpdateCount(final SQLStatementContext<?>
sqlStatementContext, final UpdateResponseHeader response) {
- if (isNeedAccumulate(sqlStatementContext)) {
- response.mergeUpdateCount();
- }
- }
-
- private boolean isNeedAccumulate(final SQLStatementContext<?>
sqlStatementContext) {
- Optional<DataNodeContainedRule> dataNodeContainedRule =
- metaData.getRuleMetaData().getRules().stream().filter(each ->
each instanceof DataNodeContainedRule).findFirst().map(rule ->
(DataNodeContainedRule) rule);
- return dataNodeContainedRule.isPresent() &&
dataNodeContainedRule.get().isNeedAccumulate(sqlStatementContext.getTablesContext().getTableNames());
- }
-
- @Override
- public boolean next() throws SQLException {
- return null != mergedResult && mergedResult.next();
- }
-
- @Override
- public QueryResponseData getQueryResponseData() throws SQLException {
- List<Object> row = new ArrayList<>(queryHeaders.size());
- for (int columnIndex = 1; columnIndex <= queryHeaders.size();
columnIndex++) {
- row.add(mergedResult.getValue(columnIndex, Object.class));
- }
- return new
QueryResponseData(queryHeaders.stream().map(QueryHeader::getColumnType).collect(Collectors.toList()),
row);
- }
-}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/DatabaseCommunicationEngineFactoryTest.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactoryTest.java
similarity index 90%
rename from
shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/DatabaseCommunicationEngineFactoryTest.java
rename to
shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactoryTest.java
index 35a858e..9acfbbd 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/DatabaseCommunicationEngineFactoryTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactoryTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.proxy.backend.communication.jdbc;
+package org.apache.shardingsphere.proxy.backend.communication;
import org.apache.shardingsphere.infra.auth.Authentication;
import
org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
@@ -23,8 +23,6 @@ import
org.apache.shardingsphere.infra.context.metadata.impl.StandardMetaDataCon
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
-import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
@@ -69,7 +67,7 @@ public final class DatabaseCommunicationEngineFactoryTest {
DatabaseCommunicationEngine engine =
DatabaseCommunicationEngineFactory.getInstance().newTextProtocolInstance(mock(SQLStatement.class),
"schemaName", backendConnection);
assertNotNull(engine);
- assertThat(engine, instanceOf(JDBCDatabaseCommunicationEngine.class));
+ assertThat(engine, instanceOf(DatabaseCommunicationEngine.class));
}
@Test
@@ -79,6 +77,6 @@ public final class DatabaseCommunicationEngineFactoryTest {
DatabaseCommunicationEngine engine =
DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(SQLStatement.class),
"schemaName", Collections.emptyList(), backendConnection);
assertNotNull(engine);
- assertThat(engine, instanceOf(JDBCDatabaseCommunicationEngine.class));
+ assertThat(engine, instanceOf(DatabaseCommunicationEngine.class));
}
}