This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e719e9b Add more TrafficLoadBalanceAlgorithm and optimize traffic
logic (#14567)
e719e9b is described below
commit e719e9bf19ab9212908f409888dc7c55be65d8e1
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Thu Jan 6 17:20:19 2022 +0800
Add more TrafficLoadBalanceAlgorithm and optimize traffic logic (#14567)
* modify governance API reference in JDBCTrafficExecutor
* add more TrafficLoadBalanceAlgorithm and optimize traffic logic
* remove some algorithms
---
.../statement/ShardingSpherePreparedStatement.java | 14 +++---
.../core/statement/ShardingSphereStatement.java | 26 +++++------
.../api/traffic/segment/SegmentTrafficValue.java | 2 +
.../traffic/spi/TrafficLoadBalanceAlgorithm.java | 9 ++--
.../RandomTrafficLoadBalanceAlgorithm.java | 4 +-
... => RoundRobinTrafficLoadBalanceAlgorithm.java} | 30 +++++++++---
.../traffic/context/TrafficContext.java | 10 ++--
.../traffic/engine/TrafficEngine.java | 11 +++--
.../traffic/executor/TrafficExecutor.java | 4 +-
.../traffic/executor/jdbc/JDBCTrafficExecutor.java | 14 +++---
.../shardingsphere/traffic/rule/TrafficRule.java | 17 +++----
.../RandomTrafficLoadBalanceAlgorithmTest.java} | 25 +++++-----
.../RoundRobinTrafficLoadBalanceAlgorithmTest.java | 53 ++++++++++++++++++++++
13 files changed, 147 insertions(+), 72 deletions(-)
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 9e79414..3895e5b 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -188,9 +188,9 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
clearPrevious();
LogicSQL logicSQL = createLogicSQL();
trafficContext = createTrafficContext(logicSQL);
- if (trafficContext.getDataSourceName().isPresent()) {
+ if (trafficContext.getInstanceId().isPresent()) {
TrafficExecutor trafficExecutor =
executor.getTrafficExecutor();
- TrafficExecutorContext<Statement> context =
trafficExecutor.prepare(logicSQL, trafficContext.getDataSourceName().get(),
JDBCDriverType.PREPARED_STATEMENT);
+ TrafficExecutorContext<Statement> context =
trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(),
JDBCDriverType.PREPARED_STATEMENT);
return trafficExecutor.executeQuery(logicSQL, context,
(statement, sql) -> ((PreparedStatement) statement).executeQuery());
}
// TODO move federation route logic to binder
@@ -260,9 +260,9 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
clearPrevious();
LogicSQL logicSQL = createLogicSQL();
trafficContext = createTrafficContext(logicSQL);
- if (trafficContext.getDataSourceName().isPresent()) {
+ if (trafficContext.getInstanceId().isPresent()) {
TrafficExecutor trafficExecutor =
executor.getTrafficExecutor();
- TrafficExecutorContext<Statement> context =
trafficExecutor.prepare(logicSQL, trafficContext.getDataSourceName().get(),
JDBCDriverType.PREPARED_STATEMENT);
+ TrafficExecutorContext<Statement> context =
trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(),
JDBCDriverType.PREPARED_STATEMENT);
return trafficExecutor.executeUpdate(logicSQL, context,
(statement, sql) -> ((PreparedStatement) statement).executeUpdate());
}
executionContext = createExecutionContext(logicSQL);
@@ -313,9 +313,9 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
clearPrevious();
LogicSQL logicSQL = createLogicSQL();
trafficContext = createTrafficContext(logicSQL);
- if (trafficContext.getDataSourceName().isPresent()) {
+ if (trafficContext.getInstanceId().isPresent()) {
TrafficExecutor trafficExecutor =
executor.getTrafficExecutor();
- TrafficExecutorContext<Statement> context =
trafficExecutor.prepare(logicSQL, trafficContext.getDataSourceName().get(),
JDBCDriverType.PREPARED_STATEMENT);
+ TrafficExecutorContext<Statement> context =
trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(),
JDBCDriverType.PREPARED_STATEMENT);
return trafficExecutor.execute(logicSQL, context, (statement,
sql) -> ((PreparedStatement) statement).execute());
}
executionContext = createExecutionContext(logicSQL);
@@ -369,7 +369,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
if (null != currentResultSet) {
return currentResultSet;
}
- if (trafficContext.getDataSourceName().isPresent()) {
+ if (trafficContext.getInstanceId().isPresent()) {
return executor.getTrafficExecutor().getResultSet();
}
if (executionContext.getRouteContext().isFederated()) {
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index acefef6..616311f 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -143,9 +143,9 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
try {
LogicSQL logicSQL = createLogicSQL(sql);
trafficContext = createTrafficContext(logicSQL);
- if (trafficContext.getDataSourceName().isPresent()) {
+ if (trafficContext.getInstanceId().isPresent()) {
TrafficExecutor trafficExecutor =
executor.getTrafficExecutor();
- TrafficExecutorContext<Statement> context =
trafficExecutor.prepare(logicSQL, trafficContext.getDataSourceName().get(),
JDBCDriverType.STATEMENT);
+ TrafficExecutorContext<Statement> context =
trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(),
JDBCDriverType.STATEMENT);
return trafficExecutor.executeQuery(logicSQL, context,
Statement::executeQuery);
}
executionContext = createExecutionContext(logicSQL);
@@ -201,9 +201,9 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
try {
LogicSQL logicSQL = createLogicSQL(sql);
trafficContext = createTrafficContext(logicSQL);
- if (trafficContext.getDataSourceName().isPresent()) {
+ if (trafficContext.getInstanceId().isPresent()) {
TrafficExecutor trafficExecutor =
executor.getTrafficExecutor();
- TrafficExecutorContext<Statement> context =
trafficExecutor.prepare(logicSQL, trafficContext.getDataSourceName().get(),
JDBCDriverType.STATEMENT);
+ TrafficExecutorContext<Statement> context =
trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(),
JDBCDriverType.STATEMENT);
return trafficExecutor.executeUpdate(logicSQL, context,
Statement::executeUpdate);
}
executionContext = createExecutionContext(logicSQL);
@@ -227,9 +227,9 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
try {
LogicSQL logicSQL = createLogicSQL(sql);
trafficContext = createTrafficContext(logicSQL);
- if (trafficContext.getDataSourceName().isPresent()) {
+ if (trafficContext.getInstanceId().isPresent()) {
TrafficExecutor trafficExecutor =
executor.getTrafficExecutor();
- TrafficExecutorContext<Statement> context =
trafficExecutor.prepare(logicSQL, trafficContext.getDataSourceName().get(),
JDBCDriverType.STATEMENT);
+ TrafficExecutorContext<Statement> context =
trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(),
JDBCDriverType.STATEMENT);
return trafficExecutor.executeUpdate(logicSQL, context,
(statement, actualSQL) -> statement.executeUpdate(actualSQL,
autoGeneratedKeys));
}
executionContext = createExecutionContext(logicSQL);
@@ -251,9 +251,9 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
try {
LogicSQL logicSQL = createLogicSQL(sql);
trafficContext = createTrafficContext(logicSQL);
- if (trafficContext.getDataSourceName().isPresent()) {
+ if (trafficContext.getInstanceId().isPresent()) {
TrafficExecutor trafficExecutor =
executor.getTrafficExecutor();
- TrafficExecutorContext<Statement> context =
trafficExecutor.prepare(logicSQL, trafficContext.getDataSourceName().get(),
JDBCDriverType.STATEMENT);
+ TrafficExecutorContext<Statement> context =
trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(),
JDBCDriverType.STATEMENT);
return trafficExecutor.executeUpdate(logicSQL, context,
(statement, actualSQL) -> statement.executeUpdate(actualSQL, columnIndexes));
}
executionContext = createExecutionContext(logicSQL);
@@ -275,9 +275,9 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
try {
LogicSQL logicSQL = createLogicSQL(sql);
trafficContext = createTrafficContext(logicSQL);
- if (trafficContext.getDataSourceName().isPresent()) {
+ if (trafficContext.getInstanceId().isPresent()) {
TrafficExecutor trafficExecutor =
executor.getTrafficExecutor();
- TrafficExecutorContext<Statement> context =
trafficExecutor.prepare(logicSQL, trafficContext.getDataSourceName().get(),
JDBCDriverType.STATEMENT);
+ TrafficExecutorContext<Statement> context =
trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(),
JDBCDriverType.STATEMENT);
return trafficExecutor.executeUpdate(logicSQL, context,
(statement, actualSQL) -> statement.executeUpdate(actualSQL, columnNames));
}
executionContext = createExecutionContext(logicSQL);
@@ -369,9 +369,9 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
try {
LogicSQL logicSQL = createLogicSQL(sql);
trafficContext = createTrafficContext(logicSQL);
- if (trafficContext.getDataSourceName().isPresent()) {
+ if (trafficContext.getInstanceId().isPresent()) {
TrafficExecutor trafficExecutor =
executor.getTrafficExecutor();
- TrafficExecutorContext<Statement> context =
trafficExecutor.prepare(logicSQL, trafficContext.getDataSourceName().get(),
JDBCDriverType.STATEMENT);
+ TrafficExecutorContext<Statement> context =
trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(),
JDBCDriverType.STATEMENT);
return trafficExecutor.execute(logicSQL, context,
(TrafficExecutorCallback) (statement, actualSQL) -> callback.execute(actualSQL,
statement));
}
executionContext = createExecutionContext(logicSQL);
@@ -450,7 +450,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
if (null != currentResultSet) {
return currentResultSet;
}
- if (trafficContext.getDataSourceName().isPresent()) {
+ if (trafficContext.getInstanceId().isPresent()) {
return executor.getTrafficExecutor().getResultSet();
}
if (executionContext.getRouteContext().isFederated()) {
diff --git
a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-api/src/main/java/org/apache/shardingsphere/traffic/api/traffic/segment/SegmentTrafficValue.java
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-api/src/main/java/org/apache/shardingsphere/traffic/api/traffic/segment/SegmentTrafficValue.java
index 9df41d0..4f21a77 100644
---
a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-api/src/main/java/org/apache/shardingsphere/traffic/api/traffic/segment/SegmentTrafficValue.java
+++
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-api/src/main/java/org/apache/shardingsphere/traffic/api/traffic/segment/SegmentTrafficValue.java
@@ -30,4 +30,6 @@ import
org.apache.shardingsphere.traffic.api.traffic.TrafficValue;
public final class SegmentTrafficValue implements TrafficValue {
private final SQLStatement statement;
+
+ private final String sql;
}
diff --git
a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-api/src/main/java/org/apache/shardingsphere/traffic/spi/TrafficLoadBalanceAlgorithm.java
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-api/src/main/java/org/apache/shardingsphere/traffic/spi/TrafficLoadBalanceAlgorithm.java
index bea7fee..f237b94 100644
---
a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-api/src/main/java/org/apache/shardingsphere/traffic/spi/TrafficLoadBalanceAlgorithm.java
+++
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-api/src/main/java/org/apache/shardingsphere/traffic/spi/TrafficLoadBalanceAlgorithm.java
@@ -28,10 +28,11 @@ import java.util.List;
public interface TrafficLoadBalanceAlgorithm extends ShardingSphereAlgorithm,
RequiredSPI {
/**
- * Get dataSource name.
+ * Get instance id.
*
- * @param dataSourceNames dataSource name collection
- * @return dataSource name
+ * @param name traffic strategy name
+ * @param instanceIds instance id collection
+ * @return instance id
*/
- String getDataSourceName(List<String> dataSourceNames);
+ String getInstanceId(String name, List<String> instanceIds);
}
diff --git
a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithm.java
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithm.java
index c5cb4fa..9fd4ba6 100644
---
a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithm.java
+++
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithm.java
@@ -28,8 +28,8 @@ import java.util.concurrent.ThreadLocalRandom;
public final class RandomTrafficLoadBalanceAlgorithm implements
TrafficLoadBalanceAlgorithm {
@Override
- public String getDataSourceName(final List<String> dataSourceNames) {
- return
dataSourceNames.get(ThreadLocalRandom.current().nextInt(dataSourceNames.size()));
+ public String getInstanceId(final String name, final List<String>
instanceIds) {
+ return
instanceIds.get(ThreadLocalRandom.current().nextInt(instanceIds.size()));
}
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithm.java
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RoundRobinTrafficLoadBalanceAlgorithm.java
similarity index 52%
copy from
shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithm.java
copy to
shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RoundRobinTrafficLoadBalanceAlgorithm.java
index c5cb4fa..adb726d 100644
---
a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithm.java
+++
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RoundRobinTrafficLoadBalanceAlgorithm.java
@@ -17,23 +17,41 @@
package org.apache.shardingsphere.traffic.algorithm.loadbalance;
+import lombok.Getter;
+import lombok.Setter;
import org.apache.shardingsphere.traffic.spi.TrafficLoadBalanceAlgorithm;
import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
/**
- * Random traffic load balance algorithm.
+ * Round-robin traffic load balance algorithm.
*/
-public final class RandomTrafficLoadBalanceAlgorithm implements
TrafficLoadBalanceAlgorithm {
+@Getter
+@Setter
+public final class RoundRobinTrafficLoadBalanceAlgorithm implements
TrafficLoadBalanceAlgorithm {
+
+ private static final ConcurrentHashMap<String, AtomicInteger> COUNTS = new
ConcurrentHashMap<>();
+
+ private Properties props = new Properties();
@Override
- public String getDataSourceName(final List<String> dataSourceNames) {
- return
dataSourceNames.get(ThreadLocalRandom.current().nextInt(dataSourceNames.size()));
+ public String getInstanceId(final String name, final List<String>
instanceIds) {
+ AtomicInteger count = COUNTS.containsKey(name) ? COUNTS.get(name) :
new AtomicInteger(0);
+ COUNTS.putIfAbsent(name, count);
+ count.compareAndSet(instanceIds.size(), 0);
+ return instanceIds.get(Math.abs(count.getAndIncrement()) %
instanceIds.size());
}
@Override
public String getType() {
- return "RANDOM";
+ return "ROUND_ROBIN";
+ }
+
+ @Override
+ public boolean isDefault() {
+ return true;
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/context/TrafficContext.java
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/context/TrafficContext.java
index a05b820..eb38ab7 100644
---
a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/context/TrafficContext.java
+++
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/context/TrafficContext.java
@@ -29,14 +29,14 @@ import java.util.Optional;
@Setter
public final class TrafficContext {
- private String dataSourceName;
+ private String instanceId;
/**
- * Get data source name.
+ * Get instance id.
*
- * @return data source config
+ * @return instance id
*/
- public Optional<String> getDataSourceName() {
- return Optional.ofNullable(dataSourceName);
+ public Optional<String> getInstanceId() {
+ return Optional.ofNullable(instanceId);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
index 20a4160..76284f5 100644
---
a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
+++
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
@@ -54,18 +54,19 @@ public final class TrafficEngine {
if (!strategyRule.isPresent()) {
return result;
}
- List<String> dataSourceNames =
getDataSourceNamesByLabels(strategyRule.get().getLabels());
- if (!dataSourceNames.isEmpty()) {
+ List<String> instanceIds =
getInstanceIdsByLabels(strategyRule.get().getLabels());
+ if (!instanceIds.isEmpty()) {
TrafficLoadBalanceAlgorithm loadBalancer =
trafficRule.findLoadBalancer(strategyRule.get().getLoadBalancerName());
-
result.setDataSourceName(loadBalancer.getDataSourceName(dataSourceNames));
+
result.setInstanceId(loadBalancer.getInstanceId(strategyRule.get().getName(),
instanceIds));
}
return result;
}
- private List<String> getDataSourceNamesByLabels(final Collection<String>
labels) {
+ private List<String> getInstanceIdsByLabels(final Collection<String>
labels) {
List<String> result = new ArrayList<>();
if (metaDataContexts.getMetaDataPersistService().isPresent()) {
- for (ComputeNodeInstance each :
metaDataContexts.getMetaDataPersistService().get().loadComputeNodeInstances(InstanceType.PROXY,
labels)) {
+ Collection<ComputeNodeInstance> instances =
metaDataContexts.getMetaDataPersistService().get().loadComputeNodeInstances(InstanceType.PROXY,
labels);
+ for (ComputeNodeInstance each : instances) {
result.add(each.getInstanceDefinition().getInstanceId().getId());
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
index 701b25d..f80df43 100644
---
a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
+++
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
@@ -33,12 +33,12 @@ public interface TrafficExecutor extends AutoCloseable {
* Prepare for traffic executor.
*
* @param logicSQL logic SQL
- * @param dataSourceName dataSource name
+ * @param instanceId instance id
* @param type type
* @return traffic executor context
* @throws SQLException SQL exception
*/
- TrafficExecutorContext<Statement> prepare(LogicSQL logicSQL, String
dataSourceName, String type) throws SQLException;
+ TrafficExecutorContext<Statement> prepare(LogicSQL logicSQL, String
instanceId, String type) throws SQLException;
/**
* Execute query.
diff --git
a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/jdbc/JDBCTrafficExecutor.java
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/jdbc/JDBCTrafficExecutor.java
index 4701588..7e20d43 100644
---
a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/jdbc/JDBCTrafficExecutor.java
+++
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/jdbc/JDBCTrafficExecutor.java
@@ -115,21 +115,21 @@ public final class JDBCTrafficExecutor implements
TrafficExecutor {
DataSourceMetaData dataSourceMetaData =
DatabaseTypeRegistry.getDatabaseTypeByURL(jdbcUrl).getDataSourceMetaData(jdbcUrl,
username);
InstanceId instanceId =
instance.getInstanceDefinition().getInstanceId();
return jdbcUrl.replace(dataSourceMetaData.getHostname(),
instanceId.getIp())
- .replace(String.valueOf(dataSourceMetaData.getPort()),
String.valueOf(instanceId.getPort()).replace(dataSourceMetaData.getCatalog(),
schema));
+ .replace(String.valueOf(dataSourceMetaData.getPort()),
String.valueOf(instanceId.getPort())).replace(dataSourceMetaData.getCatalog(),
schema);
}
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
- public TrafficExecutorContext<Statement> prepare(final LogicSQL logicSQL,
final String dataSourceName, final String type) throws SQLException {
- if (!dataSources.containsKey(dataSourceName)) {
- throw new ShardingSphereException("Can not get dataSource of %.",
dataSourceName);
+ public TrafficExecutorContext<Statement> prepare(final LogicSQL logicSQL,
final String instanceId, final String type) throws SQLException {
+ if (!dataSources.containsKey(instanceId)) {
+ throw new ShardingSphereException("Can not get dataSource of %.",
instanceId);
}
- DataSource dataSource = dataSources.get(dataSourceName);
- TrafficExecutorContextBuilder builder =
getCachedTrafficExecutorContextBuilder(type);
+ DataSource dataSource = dataSources.get(instanceId);
+ TrafficExecutorContextBuilder builder = getCachedContextBuilder(type);
return builder.build(logicSQL, dataSource.getConnection());
}
- private TrafficExecutorContextBuilder<?>
getCachedTrafficExecutorContextBuilder(final String type) {
+ private TrafficExecutorContextBuilder<?> getCachedContextBuilder(final
String type) {
TrafficExecutorContextBuilder<?> result;
if (null == (result = TYPE_CONTEXT_BUILDERS.get(type))) {
result = TYPE_CONTEXT_BUILDERS.computeIfAbsent(type, key ->
TypedSPIRegistry.getRegisteredService(TrafficExecutorContextBuilder.class, key,
new Properties()));
diff --git
a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/rule/TrafficRule.java
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/rule/TrafficRule.java
index f80c3ce..2c6e3ad 100644
---
a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/rule/TrafficRule.java
+++
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/rule/TrafficRule.java
@@ -19,12 +19,12 @@ package org.apache.shardingsphere.traffic.rule;
import com.google.common.base.Preconditions;
import org.apache.shardingsphere.infra.binder.LogicSQL;
-import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
import org.apache.shardingsphere.infra.rule.identifier.scope.GlobalRule;
import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
import
org.apache.shardingsphere.sql.parser.sql.common.segment.generic.CommentSegment;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.AbstractSQLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import org.apache.shardingsphere.traffic.api.config.TrafficRuleConfiguration;
import org.apache.shardingsphere.traffic.api.traffic.hint.HintTrafficAlgorithm;
import org.apache.shardingsphere.traffic.api.traffic.hint.HintTrafficValue;
@@ -76,7 +76,7 @@ public final class TrafficRule implements GlobalRule {
for (TrafficStrategyRule each : trafficStrategyRules) {
TrafficAlgorithm trafficAlgorithm =
trafficAlgorithms.get(each.getAlgorithmName());
Preconditions.checkState(null != trafficAlgorithm, "Traffic
strategy rule configuration must match traffic algorithm.");
- if (match(trafficAlgorithm, logicSQL.getSqlStatementContext())) {
+ if (match(trafficAlgorithm, logicSQL)) {
return Optional.of(each);
}
}
@@ -84,10 +84,11 @@ public final class TrafficRule implements GlobalRule {
}
@SuppressWarnings("unchecked")
- private boolean match(final TrafficAlgorithm trafficAlgorithm, final
SQLStatementContext<?> statementContext) {
+ private boolean match(final TrafficAlgorithm trafficAlgorithm, final
LogicSQL logicSQL) {
+ SQLStatement sqlStatement =
logicSQL.getSqlStatementContext().getSqlStatement();
if (trafficAlgorithm instanceof HintTrafficAlgorithm) {
HintTrafficAlgorithm<Comparable<?>> hintTrafficAlgorithm =
(HintTrafficAlgorithm<Comparable<?>>) trafficAlgorithm;
- for (HintTrafficValue<Comparable<?>> each :
getHintTrafficValues(statementContext)) {
+ for (HintTrafficValue<Comparable<?>> each :
getHintTrafficValues(sqlStatement)) {
if (hintTrafficAlgorithm.match(each)) {
return true;
}
@@ -95,16 +96,16 @@ public final class TrafficRule implements GlobalRule {
}
if (trafficAlgorithm instanceof SegmentTrafficAlgorithm) {
SegmentTrafficAlgorithm segmentTrafficAlgorithm =
(SegmentTrafficAlgorithm) trafficAlgorithm;
- SegmentTrafficValue segmentTrafficValue = new
SegmentTrafficValue(statementContext.getSqlStatement());
+ SegmentTrafficValue segmentTrafficValue = new
SegmentTrafficValue(sqlStatement, logicSQL.getSql());
return segmentTrafficAlgorithm.match(segmentTrafficValue);
}
return false;
}
- private Collection<HintTrafficValue<Comparable<?>>>
getHintTrafficValues(final SQLStatementContext<?> statementContext) {
+ private Collection<HintTrafficValue<Comparable<?>>>
getHintTrafficValues(final SQLStatement sqlStatement) {
Collection<HintTrafficValue<Comparable<?>>> result = new
LinkedList<>();
- if (statementContext.getSqlStatement() instanceof
AbstractSQLStatement) {
- for (CommentSegment each : ((AbstractSQLStatement)
statementContext.getSqlStatement()).getCommentSegments()) {
+ if (sqlStatement instanceof AbstractSQLStatement) {
+ for (CommentSegment each : ((AbstractSQLStatement)
sqlStatement).getCommentSegments()) {
result.add(new HintTrafficValue<>(each.getText()));
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithm.java
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/test/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithmTest.java
similarity index 56%
copy from
shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithm.java
copy to
shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/test/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithmTest.java
index c5cb4fa..fcf37f3 100644
---
a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithm.java
+++
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/test/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithmTest.java
@@ -17,23 +17,22 @@
package org.apache.shardingsphere.traffic.algorithm.loadbalance;
-import org.apache.shardingsphere.traffic.spi.TrafficLoadBalanceAlgorithm;
+import org.junit.Test;
+import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-/**
- * Random traffic load balance algorithm.
- */
-public final class RandomTrafficLoadBalanceAlgorithm implements
TrafficLoadBalanceAlgorithm {
+import static org.junit.Assert.assertTrue;
+
+public final class RandomTrafficLoadBalanceAlgorithmTest {
- @Override
- public String getDataSourceName(final List<String> dataSourceNames) {
- return
dataSourceNames.get(ThreadLocalRandom.current().nextInt(dataSourceNames.size()));
- }
+ private final RandomTrafficLoadBalanceAlgorithm randomAlgorithm = new
RandomTrafficLoadBalanceAlgorithm();
- @Override
- public String getType() {
- return "RANDOM";
+ @Test
+ public void assertGetInstanceId() {
+ List<String> instanceIds = Arrays.asList("127.0.0.1@3307",
"127.0.0.1@3308");
+
assertTrue(instanceIds.contains(randomAlgorithm.getInstanceId("simple_traffic",
instanceIds)));
+
assertTrue(instanceIds.contains(randomAlgorithm.getInstanceId("simple_traffic",
instanceIds)));
+
assertTrue(instanceIds.contains(randomAlgorithm.getInstanceId("simple_traffic",
instanceIds)));
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/test/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RoundRobinTrafficLoadBalanceAlgorithmTest.java
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/test/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RoundRobinTrafficLoadBalanceAlgorithmTest.java
new file mode 100644
index 0000000..b075e69
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/test/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RoundRobinTrafficLoadBalanceAlgorithmTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.traffic.algorithm.loadbalance;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public final class RoundRobinTrafficLoadBalanceAlgorithmTest {
+
+ private final RoundRobinTrafficLoadBalanceAlgorithm roundRobinAlgorithm =
new RoundRobinTrafficLoadBalanceAlgorithm();
+
+ @Before
+ @After
+ public void reset() throws NoSuchFieldException, IllegalAccessException {
+ Field field =
RoundRobinTrafficLoadBalanceAlgorithm.class.getDeclaredField("COUNTS");
+ field.setAccessible(true);
+ ((ConcurrentHashMap<?, ?>)
field.get(RoundRobinTrafficLoadBalanceAlgorithm.class)).clear();
+ }
+
+ @Test
+ public void assertGetInstanceId() {
+ String instanceId1 = "127.0.0.1@3307";
+ String instanceId2 = "127.0.0.1@3308";
+ List<String> instanceIds = Arrays.asList(instanceId1, instanceId2);
+ assertThat(roundRobinAlgorithm.getInstanceId("simple_traffic",
instanceIds), is(instanceId1));
+ assertThat(roundRobinAlgorithm.getInstanceId("simple_traffic",
instanceIds), is(instanceId2));
+ assertThat(roundRobinAlgorithm.getInstanceId("simple_traffic",
instanceIds), is(instanceId1));
+ }
+}