This is an automated email from the ASF dual-hosted git repository.
chengzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0d960261d39 Support jdbc adapter to get connections from specified
database (#28283)
0d960261d39 is described below
commit 0d960261d3939509379490c82a3cacda1fcd4643
Author: ZhangCheng <[email protected]>
AuthorDate: Wed Aug 30 17:32:38 2023 +0800
Support jdbc adapter to get connections from specified database (#28283)
* Support jdbc adapter to get connections from specified database
* fix
* fix
* Support null value routing
* Support null value routing
* Support null value routing
---
.../session/connection/ConnectionContext.java | 29 ++++++++-
.../DriverDatabaseConnectionManager.java | 71 ++++++++++++++--------
.../statement/ShardingSpherePreparedStatement.java | 1 +
.../core/statement/ShardingSphereStatement.java | 3 +
.../DriverDatabaseConnectionManagerTest.java | 5 +-
.../transaction/ConnectionTransaction.java | 16 +++--
.../transaction/ConnectionTransactionTest.java | 15 +++--
7 files changed, 96 insertions(+), 44 deletions(-)
diff --git
a/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/ConnectionContext.java
b/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/ConnectionContext.java
index 2e5c7fc6b29..02692a8a043 100644
---
a/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/ConnectionContext.java
+++
b/infra/session/src/main/java/org/apache/shardingsphere/infra/session/connection/ConnectionContext.java
@@ -27,6 +27,7 @@ import
org.apache.shardingsphere.infra.session.connection.transaction.Transactio
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Optional;
/**
@@ -43,6 +44,8 @@ public final class ConnectionContext implements AutoCloseable
{
@Getter(AccessLevel.NONE)
private final UsedDataSourceProvider usedDataSourceProvider;
+ private String databaseName;
+
@Setter
private String trafficInstanceId;
@@ -56,7 +59,11 @@ public final class ConnectionContext implements
AutoCloseable {
* @return used data source names
*/
public Collection<String> getUsedDataSourceNames() {
- return usedDataSourceProvider.getNames();
+ Collection<String> result = new
HashSet<>(usedDataSourceProvider.getNames().size(), 1L);
+ for (String each : usedDataSourceProvider.getNames()) {
+ result.add(each.contains(".") ? each.split("\\.")[1] : each);
+ }
+ return result;
}
/**
@@ -82,6 +89,26 @@ public final class ConnectionContext implements
AutoCloseable {
transactionContext.close();
}
+ /**
+ * Set current database name.
+ *
+ * @param databaseName database name
+ */
+ public void setCurrentDatabase(final String databaseName) {
+ if (null != databaseName && !databaseName.equals(this.databaseName)) {
+ this.databaseName = databaseName;
+ }
+ }
+
+ /**
+ * Get database name.
+ *
+ * @return database name
+ */
+ public Optional<String> getDatabaseName() {
+ return Optional.ofNullable(databaseName);
+ }
+
@Override
public void close() {
trafficInstanceId = null;
diff --git
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/DriverDatabaseConnectionManager.java
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/DriverDatabaseConnectionManager.java
index 52888a2611d..c0cbf83a006 100644
---
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/DriverDatabaseConnectionManager.java
+++
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/DriverDatabaseConnectionManager.java
@@ -68,6 +68,8 @@ public final class DriverDatabaseConnectionManager implements
DatabaseConnection
private final Map<String, DataSource> physicalDataSourceMap = new
LinkedHashMap<>();
+ private final Map<String, DataSource> trafficDataSourceMap = new
LinkedHashMap<>();
+
@Getter
private final ConnectionTransaction connectionTransaction;
@@ -82,15 +84,27 @@ public final class DriverDatabaseConnectionManager
implements DatabaseConnection
@Getter
private final ConnectionContext connectionContext;
+ private final ContextManager contextManager;
+
+ private final String databaseName;
+
public DriverDatabaseConnectionManager(final String databaseName, final
ContextManager contextManager) {
for (Entry<String, StorageUnit> entry :
contextManager.getStorageUnits(databaseName).entrySet()) {
DataSource dataSource = entry.getValue().getDataSource();
- dataSourceMap.put(entry.getKey(), dataSource);
- physicalDataSourceMap.put(entry.getKey(), dataSource);
+ String cacheKey = getKey(databaseName, entry.getKey());
+ dataSourceMap.put(cacheKey, dataSource);
+ physicalDataSourceMap.put(cacheKey, dataSource);
}
- dataSourceMap.putAll(getTrafficDataSourceMap(databaseName,
contextManager));
- connectionTransaction = createConnectionTransaction(databaseName,
contextManager);
+ for (Entry<String, DataSource> entry :
getTrafficDataSourceMap(databaseName, contextManager).entrySet()) {
+ String cacheKey = getKey(databaseName, entry.getKey());
+ dataSourceMap.put(cacheKey, entry.getValue());
+ trafficDataSourceMap.put(cacheKey, entry.getValue());
+ }
+ connectionTransaction = createConnectionTransaction(contextManager);
connectionContext = new ConnectionContext(cachedConnections::keySet);
+ connectionContext.setCurrentDatabase(databaseName);
+ this.contextManager = contextManager;
+ this.databaseName = databaseName;
}
private Map<String, DataSource> getTrafficDataSourceMap(final String
databaseName, final ContextManager contextManager) {
@@ -134,9 +148,9 @@ public final class DriverDatabaseConnectionManager
implements DatabaseConnection
return String.format("%s//%s:%s/%s%s", jdbcUrlPrefix,
instanceMetaData.getIp(), instanceMetaData.getPort(), schema, jdbcUrlSuffix);
}
- private ConnectionTransaction createConnectionTransaction(final String
databaseName, final ContextManager contextManager) {
+ private ConnectionTransaction createConnectionTransaction(final
ContextManager contextManager) {
TransactionRule rule =
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class);
- return new ConnectionTransaction(databaseName, rule);
+ return new ConnectionTransaction(rule);
}
/**
@@ -300,8 +314,8 @@ public final class DriverDatabaseConnectionManager
implements DatabaseConnection
*/
public String getRandomPhysicalDataSourceName() {
Collection<String> cachedPhysicalDataSourceNames =
Sets.intersection(physicalDataSourceMap.keySet(), cachedConnections.keySet());
- Collection<String> datasourceNames =
cachedPhysicalDataSourceNames.isEmpty() ? physicalDataSourceMap.keySet() :
cachedPhysicalDataSourceNames;
- return new
ArrayList<>(datasourceNames).get(random.nextInt(datasourceNames.size()));
+ Collection<String> databaseAndDatasourceNames =
cachedPhysicalDataSourceNames.isEmpty() ? physicalDataSourceMap.keySet() :
cachedPhysicalDataSourceNames;
+ return new
ArrayList<>(databaseAndDatasourceNames).get(random.nextInt(databaseAndDatasourceNames.size())).split("\\.")[1];
}
/**
@@ -316,56 +330,63 @@ public final class DriverDatabaseConnectionManager
implements DatabaseConnection
@Override
public List<Connection> getConnections(final String dataSourceName, final
int connectionOffset, final int connectionSize, final ConnectionMode
connectionMode) throws SQLException {
- DataSource dataSource = dataSourceMap.get(dataSourceName);
+ String currentDatabaseName =
connectionContext.getDatabaseName().orElse(databaseName);
+ String cacheKey = getKey(currentDatabaseName, dataSourceName);
+ DataSource dataSource = databaseName.equals(currentDatabaseName) ?
dataSourceMap.get(cacheKey) :
contextManager.getStorageUnits(currentDatabaseName).get(dataSourceName).getDataSource();
Preconditions.checkNotNull(dataSource, "Missing the data source name:
'%s'", dataSourceName);
Collection<Connection> connections;
synchronized (cachedConnections) {
- connections = cachedConnections.get(dataSourceName);
+ connections = cachedConnections.get(cacheKey);
}
List<Connection> result;
int maxConnectionSize = connectionOffset + connectionSize;
if (connections.size() >= maxConnectionSize) {
result = new ArrayList<>(connections).subList(connectionOffset,
maxConnectionSize);
} else if (connections.isEmpty()) {
- Collection<Connection> newConnections =
createConnections(dataSourceName, dataSource, maxConnectionSize,
connectionMode);
+ Collection<Connection> newConnections =
createConnections(currentDatabaseName, dataSourceName, dataSource,
maxConnectionSize, connectionMode);
result = new ArrayList<>(newConnections).subList(connectionOffset,
maxConnectionSize);
synchronized (cachedConnections) {
- cachedConnections.putAll(dataSourceName, newConnections);
+ cachedConnections.putAll(cacheKey, newConnections);
}
} else {
List<Connection> allConnections = new
ArrayList<>(maxConnectionSize);
allConnections.addAll(connections);
- Collection<Connection> newConnections =
createConnections(dataSourceName, dataSource, maxConnectionSize -
connections.size(), connectionMode);
+ Collection<Connection> newConnections =
createConnections(currentDatabaseName, dataSourceName, dataSource,
maxConnectionSize - connections.size(), connectionMode);
allConnections.addAll(newConnections);
result = allConnections.subList(connectionOffset,
maxConnectionSize);
synchronized (cachedConnections) {
- cachedConnections.putAll(dataSourceName, newConnections);
+ cachedConnections.putAll(cacheKey, newConnections);
}
}
return result;
}
+ private String getKey(final String databaseName, final String
dataSourceName) {
+ return databaseName.toLowerCase() + "." + dataSourceName;
+ }
+
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
- private List<Connection> createConnections(final String dataSourceName,
final DataSource dataSource, final int connectionSize, final ConnectionMode
connectionMode) throws SQLException {
+ private List<Connection> createConnections(final String databaseName,
final String dataSourceName, final DataSource dataSource, final int
connectionSize,
+ final ConnectionMode
connectionMode) throws SQLException {
if (1 == connectionSize) {
- Connection connection = createConnection(dataSourceName,
dataSource, connectionContext.getTransactionContext());
+ Connection connection = createConnection(databaseName,
dataSourceName, dataSource, connectionContext.getTransactionContext());
methodInvocationRecorder.replay(connection);
return Collections.singletonList(connection);
}
if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
- return createConnections(dataSourceName, dataSource,
connectionSize, connectionContext.getTransactionContext());
+ return createConnections(databaseName, dataSourceName, dataSource,
connectionSize, connectionContext.getTransactionContext());
}
synchronized (dataSource) {
- return createConnections(dataSourceName, dataSource,
connectionSize, connectionContext.getTransactionContext());
+ return createConnections(databaseName, dataSourceName, dataSource,
connectionSize, connectionContext.getTransactionContext());
}
}
- private List<Connection> createConnections(final String dataSourceName,
final DataSource dataSource, final int connectionSize,
+ private List<Connection> createConnections(final String databaseName,
final String dataSourceName, final DataSource dataSource, final int
connectionSize,
final
TransactionConnectionContext transactionConnectionContext) throws SQLException {
List<Connection> result = new ArrayList<>(connectionSize);
for (int i = 0; i < connectionSize; i++) {
try {
- Connection connection = createConnection(dataSourceName,
dataSource, transactionConnectionContext);
+ Connection connection = createConnection(databaseName,
dataSourceName, dataSource, transactionConnectionContext);
methodInvocationRecorder.replay(connection);
result.add(connection);
} catch (final SQLException ignored) {
@@ -378,13 +399,15 @@ public final class DriverDatabaseConnectionManager
implements DatabaseConnection
return result;
}
- private Connection createConnection(final String dataSourceName, final
DataSource dataSource, final TransactionConnectionContext
transactionConnectionContext) throws SQLException {
- Optional<Connection> connectionInTransaction =
isRawJdbcDataSource(dataSourceName) ?
connectionTransaction.getConnection(dataSourceName,
transactionConnectionContext) : Optional.empty();
+ private Connection createConnection(final String databaseName, final
String dataSourceName, final DataSource dataSource,
+ final TransactionConnectionContext
transactionConnectionContext) throws SQLException {
+ Optional<Connection> connectionInTransaction =
+ isRawJdbcDataSource(databaseName, dataSourceName) ?
connectionTransaction.getConnection(databaseName, dataSourceName,
transactionConnectionContext) : Optional.empty();
return connectionInTransaction.isPresent() ?
connectionInTransaction.get() : dataSource.getConnection();
}
- private boolean isRawJdbcDataSource(final String dataSourceName) {
- return physicalDataSourceMap.containsKey(dataSourceName);
+ private boolean isRawJdbcDataSource(final String databaseName, final
String dataSourceName) {
+ return !trafficDataSourceMap.containsKey(getKey(databaseName,
dataSourceName));
}
@Override
diff --git
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 9590bd597ef..2dc7d1fbc18 100644
---
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -201,6 +201,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
sqlStatement = sqlParserEngine.parse(this.sql, true);
sqlStatementContext = new
SQLBindEngine(metaDataContexts.getMetaData(),
connection.getDatabaseName()).bind(sqlStatement, Collections.emptyList());
databaseName =
sqlStatementContext.getTablesContext().getDatabaseName().orElse(connection.getDatabaseName());
+
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);
statementOption = returnGeneratedKeys ? new StatementOption(true,
columns) : new StatementOption(resultSetType, resultSetConcurrency,
resultSetHoldability);
executor = new DriverExecutor(connection);
diff --git
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index a0fdfb4bd0b..00d18f1d3bc 100644
---
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -164,6 +164,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
try {
QueryContext queryContext = createQueryContext(sql);
databaseName =
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
+
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
checkSameDatabaseNameInTransaction(queryContext.getSqlStatementContext(),
databaseName);
trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
if (null != trafficInstanceId) {
@@ -313,6 +314,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
private int executeUpdate0(final String sql, final ExecuteUpdateCallback
updateCallback, final TrafficExecutorCallback<Integer> trafficCallback) throws
SQLException {
QueryContext queryContext = createQueryContext(sql);
databaseName =
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
+
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
checkSameDatabaseNameInTransaction(queryContext.getSqlStatementContext(),
databaseName);
trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
if (null != trafficInstanceId) {
@@ -431,6 +433,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
try {
QueryContext queryContext = createQueryContext(sql);
databaseName =
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
+
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
checkSameDatabaseNameInTransaction(queryContext.getSqlStatementContext(),
databaseName);
trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
if (null != trafficInstanceId) {
diff --git
a/jdbc/core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/DriverDatabaseConnectionManagerTest.java
b/jdbc/core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/DriverDatabaseConnectionManagerTest.java
index 893c6c4067c..654d6f56bfd 100644
---
a/jdbc/core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/DriverDatabaseConnectionManagerTest.java
+++
b/jdbc/core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/DriverDatabaseConnectionManagerTest.java
@@ -136,8 +136,9 @@ class DriverDatabaseConnectionManagerTest {
@Test
void assertGetRandomPhysicalDataSourceNameFromCache() throws SQLException {
databaseConnectionManager.getConnections("ds", 0, 1,
ConnectionMode.MEMORY_STRICTLY);
- String actual =
databaseConnectionManager.getRandomPhysicalDataSourceName();
- assertThat(actual, is("ds"));
+
assertThat(databaseConnectionManager.getRandomPhysicalDataSourceName(),
is("ds"));
+
assertThat(databaseConnectionManager.getRandomPhysicalDataSourceName(),
is("ds"));
+
assertThat(databaseConnectionManager.getRandomPhysicalDataSourceName(),
is("ds"));
}
@Test
diff --git
a/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java
b/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java
index ec39c677f87..f0d2e4fdd5e 100644
---
a/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java
+++
b/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java
@@ -36,20 +36,17 @@ public final class ConnectionTransaction {
@Getter
private final TransactionType transactionType;
- private final String databaseName;
-
@Setter
@Getter
private volatile boolean rollbackOnly;
private final ShardingSphereTransactionManager transactionManager;
- public ConnectionTransaction(final String databaseName, final
TransactionRule rule) {
- this(databaseName, rule.getDefaultType(), rule);
+ public ConnectionTransaction(final TransactionRule rule) {
+ this(rule.getDefaultType(), rule);
}
- public ConnectionTransaction(final String databaseName, final
TransactionType transactionType, final TransactionRule rule) {
- this.databaseName = databaseName;
+ public ConnectionTransaction(final TransactionType transactionType, final
TransactionRule rule) {
this.transactionType = transactionType;
transactionManager =
rule.getResource().getTransactionManager(transactionType);
}
@@ -94,14 +91,15 @@ public final class ConnectionTransaction {
/**
* Get connection in transaction.
- *
+ *
+ * @param databaseName database name
* @param dataSourceName data source name
* @param transactionConnectionContext transaction connection context
* @return connection in transaction
* @throws SQLException SQL exception
*/
- public Optional<Connection> getConnection(final String dataSourceName,
final TransactionConnectionContext transactionConnectionContext) throws
SQLException {
- return isInTransaction(transactionConnectionContext) ?
Optional.of(transactionManager.getConnection(this.databaseName,
dataSourceName)) : Optional.empty();
+ public Optional<Connection> getConnection(final String databaseName, final
String dataSourceName, final TransactionConnectionContext
transactionConnectionContext) throws SQLException {
+ return isInTransaction(transactionConnectionContext) ?
Optional.of(transactionManager.getConnection(databaseName, dataSourceName)) :
Optional.empty();
}
/**
diff --git
a/kernel/transaction/core/src/test/java/org/apache/shardingsphere/transaction/ConnectionTransactionTest.java
b/kernel/transaction/core/src/test/java/org/apache/shardingsphere/transaction/ConnectionTransactionTest.java
index 71252a1b351..3134ffbf4f9 100644
---
a/kernel/transaction/core/src/test/java/org/apache/shardingsphere/transaction/ConnectionTransactionTest.java
+++
b/kernel/transaction/core/src/test/java/org/apache/shardingsphere/transaction/ConnectionTransactionTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.transaction;
-import org.apache.shardingsphere.infra.database.core.DefaultDatabase;
import
org.apache.shardingsphere.transaction.ConnectionTransaction.DistributedTransactionOperationType;
import
org.apache.shardingsphere.transaction.config.TransactionRuleConfiguration;
import org.apache.shardingsphere.transaction.rule.TransactionRule;
@@ -37,34 +36,34 @@ class ConnectionTransactionTest {
@Test
void assertDistributedTransactionOperationTypeCommit() {
- connectionTransaction = new
ConnectionTransaction(DefaultDatabase.LOGIC_NAME, getXATransactionRule());
+ connectionTransaction = new
ConnectionTransaction(getXATransactionRule());
DistributedTransactionOperationType operationType =
connectionTransaction.getDistributedTransactionOperationType(true);
assertThat(operationType,
is(DistributedTransactionOperationType.COMMIT));
}
@Test
void assertDistributedTransactionOperationTypeIgnore() {
- connectionTransaction = new
ConnectionTransaction(DefaultDatabase.LOGIC_NAME, getXATransactionRule());
+ connectionTransaction = new
ConnectionTransaction(getXATransactionRule());
DistributedTransactionOperationType operationType =
connectionTransaction.getDistributedTransactionOperationType(false);
assertThat(operationType,
is(DistributedTransactionOperationType.IGNORE));
}
@Test
void assertIsLocalTransaction() {
- connectionTransaction = new
ConnectionTransaction(DefaultDatabase.LOGIC_NAME, getLocalTransactionRule());
+ connectionTransaction = new
ConnectionTransaction(getLocalTransactionRule());
assertTrue(connectionTransaction.isLocalTransaction());
- connectionTransaction = new
ConnectionTransaction(DefaultDatabase.LOGIC_NAME, getXATransactionRule());
+ connectionTransaction = new
ConnectionTransaction(getXATransactionRule());
assertFalse(connectionTransaction.isLocalTransaction());
}
@Test
void assertIsHoldTransaction() {
- connectionTransaction = new
ConnectionTransaction(DefaultDatabase.LOGIC_NAME, getLocalTransactionRule());
+ connectionTransaction = new
ConnectionTransaction(getLocalTransactionRule());
assertTrue(connectionTransaction.isHoldTransaction(false));
- connectionTransaction = new
ConnectionTransaction(DefaultDatabase.LOGIC_NAME, getXATransactionRule());
+ connectionTransaction = new
ConnectionTransaction(getXATransactionRule());
assertTrue(connectionTransaction.isInTransaction());
assertTrue(connectionTransaction.isHoldTransaction(true));
- connectionTransaction = new
ConnectionTransaction(DefaultDatabase.LOGIC_NAME, getLocalTransactionRule());
+ connectionTransaction = new
ConnectionTransaction(getLocalTransactionRule());
assertFalse(connectionTransaction.isHoldTransaction(true));
}