This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c18af1e Remove apache-commons-lang3 Pair reference for scaling module (#16023)
c18af1e is described below
commit c18af1e04322c7e5dc001e1542ee5ac8fe31146c
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Mar 13 11:43:42 2022 +0800
Remove apache-commons-lang3 Pair reference for scaling module (#16023)
* Remove apache-commons-lang3 Pair for AbstractDataSourcePreparerTest
* Refactor RuleAlteredJobWorker
* Remove apache-commons-lang3 Pair for ScalingUtil
* Remove apache-commons-lang3 Pair for ScalingUtil
* Remove apache-commons-lang3 Pair for DataCalculateParameter
* Remove apache-commons-lang3 Pair for DataCalculateParameter
* Refactor AbstractInventoryDumper
* Update uniqueKeyValueRange
---
.../ingest/dumper/AbstractInventoryDumper.java | 39 +++++++-------------
.../scenario/rulealtered/RuleAlteredJobWorker.java | 5 ++-
.../check/consistency/DataCalculateParameter.java | 6 ++--
.../integration/scaling/test/mysql/ScalingIT.java | 21 ++++-------
.../test/mysql/env/IntegrationTestEnvironment.java | 2 +-
.../scaling/test/mysql/util/ScalingUtil.java | 42 ++++++++--------------
.../datasource/AbstractDataSourcePreparerTest.java | 21 ++++-------
7 files changed, 45 insertions(+), 91 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index b537ee1..fff60c6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -60,32 +60,33 @@ public abstract class AbstractInventoryDumper extends
AbstractLifecycleExecutor
@Getter(AccessLevel.PROTECTED)
private final InventoryDumperConfiguration inventoryDumperConfig;
+ private final PipelineChannel channel;
+
+ private final DataSource dataSource;
+
private final int batchSize;
private final JobRateLimitAlgorithm rateLimitAlgorithm;
private final LazyInitializer<PipelineTableMetaData>
tableMetaDataLazyInitializer;
- private final PipelineChannel channel;
-
- private final DataSource dataSource;
-
protected AbstractInventoryDumper(final InventoryDumperConfiguration
inventoryDumperConfig, final PipelineChannel channel,
final DataSource dataSource, final
PipelineTableMetaDataLoader metaDataLoader) {
if
(!StandardPipelineDataSourceConfiguration.class.equals(inventoryDumperConfig.getDataSourceConfig().getClass()))
{
throw new UnsupportedOperationException("AbstractInventoryDumper
only support StandardPipelineDataSourceConfiguration");
}
this.inventoryDumperConfig = inventoryDumperConfig;
- this.batchSize = inventoryDumperConfig.getBatchSize();
- this.rateLimitAlgorithm =
inventoryDumperConfig.getRateLimitAlgorithm();
+ this.channel = channel;
+ this.dataSource = dataSource;
+ batchSize = inventoryDumperConfig.getBatchSize();
+ rateLimitAlgorithm = inventoryDumperConfig.getRateLimitAlgorithm();
tableMetaDataLazyInitializer = new
LazyInitializer<PipelineTableMetaData>() {
+
@Override
protected PipelineTableMetaData initialize() {
return
metaDataLoader.getTableMetaData(inventoryDumperConfig.getTableName());
}
};
- this.channel = channel;
- this.dataSource = dataSource;
}
@Override
@@ -172,30 +173,16 @@ public abstract class AbstractInventoryDumper extends
AbstractLifecycleExecutor
}
private long getPositionBeginValue(final IngestPosition<?> position) {
- if (null == position) {
- return 0;
- }
- if (!(position instanceof PrimaryKeyPosition)) {
- return 0;
- }
- return ((PrimaryKeyPosition) position).getBeginValue();
+ return position instanceof PrimaryKeyPosition ? ((PrimaryKeyPosition)
position).getBeginValue() : 0;
}
private long getPositionEndValue(final IngestPosition<?> position) {
- if (null == position) {
- return Integer.MAX_VALUE;
- }
- if (!(position instanceof PrimaryKeyPosition)) {
- return Integer.MAX_VALUE;
- }
- return ((PrimaryKeyPosition) position).getEndValue();
+ return position instanceof PrimaryKeyPosition ? ((PrimaryKeyPosition)
position).getEndValue() : Integer.MAX_VALUE;
}
private IngestPosition<?> newPosition(final ResultSet rs) throws
SQLException {
- if (null == inventoryDumperConfig.getPrimaryKey()) {
- return new PlaceholderPosition();
- }
- return new
PrimaryKeyPosition(rs.getLong(inventoryDumperConfig.getPrimaryKey()),
((PrimaryKeyPosition) inventoryDumperConfig.getPosition()).getEndValue());
+ return null == inventoryDumperConfig.getPrimaryKey() ? new
PlaceholderPosition()
+ : new
PrimaryKeyPosition(rs.getLong(inventoryDumperConfig.getPrimaryKey()),
((PrimaryKeyPosition) inventoryDumperConfig.getPosition()).getEndValue());
}
protected abstract PreparedStatement createPreparedStatement(Connection
connection, String sql) throws SQLException;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index 2e640c3..bf0568c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -181,7 +181,6 @@ public final class RuleAlteredJobWorker {
Optional<String> jobId = jobConfigOptional.isPresent() ?
PipelineJobAPIFactory.getRuleAlteredJobAPI().start(jobConfigOptional.get()) :
Optional.empty();
if (!jobId.isPresent()) {
log.info("Switch rule configuration immediately.");
- YamlRootConfiguration targetRootConfig =
getYamlRootConfiguration(event.getSchemaName(), event.getTargetDataSource(),
event.getTargetRule());
ScalingTaskFinishedEvent taskFinishedEvent = new
ScalingTaskFinishedEvent(event.getSchemaName(), event.getSchemaVersion());
ShardingSphereEventBus.getInstance().post(taskFinishedEvent);
}
@@ -216,8 +215,8 @@ public final class RuleAlteredJobWorker {
return Optional.of(new JobConfiguration(workflowConfig,
pipelineConfig));
}
- private Collection<Pair<YamlRuleConfiguration, YamlRuleConfiguration>>
groupSourceTargetRuleConfigsByType(
- final Collection<YamlRuleConfiguration> sourceRules, final
Collection<YamlRuleConfiguration> targetRules) {
+ private Collection<Pair<YamlRuleConfiguration, YamlRuleConfiguration>>
groupSourceTargetRuleConfigsByType(final Collection<YamlRuleConfiguration>
sourceRules,
+
final Collection<YamlRuleConfiguration>
targetRules) {
Map<Class<? extends YamlRuleConfiguration>, YamlRuleConfiguration>
sourceRulesMap =
sourceRules.stream().collect(Collectors.toMap(YamlRuleConfiguration::getClass,
Function.identity()));
Map<Class<? extends YamlRuleConfiguration>, YamlRuleConfiguration>
targetRulesMap =
targetRules.stream().collect(Collectors.toMap(YamlRuleConfiguration::getClass,
Function.identity()));
Collection<Pair<YamlRuleConfiguration, YamlRuleConfiguration>> result
= new LinkedList<>();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataCalculateParameter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataCalculateParameter.java
index ae8bff3..686bab4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataCalculateParameter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataCalculateParameter.java
@@ -17,11 +17,11 @@
package org.apache.shardingsphere.data.pipeline.api.check.consistency;
+import com.google.common.collect.Range;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
-import org.apache.commons.lang3.tuple.Pair;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import java.util.Collection;
@@ -66,10 +66,8 @@ public final class DataCalculateParameter {
/**
* Used for range query.
- * If it's configured, then it could be translated to SQL like "uniqueKey
>= pair.left AND uniqueKey <= pair.right".
- * One of left and right of pair could be null.
*/
- private volatile Pair<Object, Object> uniqueKeyValueRange;
+ private volatile Range<? extends Comparable<?>> uniqueKeyValueRange;
/**
* Used for multiple records query.
diff --git a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/ScalingIT.java b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/ScalingIT.java
index 05f1052..ae378f4 100644
--- a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/ScalingIT.java
+++ b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/ScalingIT.java
@@ -17,9 +17,7 @@
package org.apache.shardingsphere.integration.scaling.test.mysql;
-import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.tuple.Pair;
import
org.apache.shardingsphere.integration.scaling.test.mysql.env.ITEnvironmentContext;
import
org.apache.shardingsphere.integration.scaling.test.mysql.env.IntegrationTestEnvironment;
import
org.apache.shardingsphere.integration.scaling.test.mysql.fixture.DataImporter;
@@ -28,7 +26,6 @@ import
org.apache.shardingsphere.integration.scaling.test.mysql.util.ScalingUtil
import org.junit.Ignore;
import org.junit.Test;
-import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
@@ -44,9 +41,8 @@ public final class ScalingIT {
private static final long WAIT_MS_BEFORE_CHECK_JOB = 15 * 1000;
- @SneakyThrows(InterruptedException.class)
@Test
- public void assertScaling() {
+ public void assertScaling() throws InterruptedException {
if (IntegrationTestEnvironment.getInstance().isEnvironmentPrepared()) {
IntegrationTestEnvironment.getInstance().waitForEnvironmentReady();
DataImporter dataImporter = new DataImporter();
@@ -60,24 +56,19 @@ public final class ScalingIT {
}
}
- @SneakyThrows(IOException.class)
private String assertStartJob() {
String configuration =
ITEnvironmentContext.INSTANCE.getScalingConfiguration();
- Pair<Boolean, String> response =
ScalingUtil.getInstance().startJob(configuration);
- assertTrue(response.getLeft());
- return response.getRight();
+ return ScalingUtil.startJob(configuration);
}
private void waitInventoryFinish(final String jobId) {
- new ExecuteUtil(() ->
"EXECUTE_INCREMENTAL_TASK".equals(ScalingUtil.getInstance().getJobStatus(jobId)),
(int) (TIMEOUT_MS - WAIT_MS_BEFORE_START_JOB) / (10 * 1000), 10 *
1000).execute();
+ new ExecuteUtil(() ->
"EXECUTE_INCREMENTAL_TASK".equals(ScalingUtil.getJobStatus(jobId)), (int)
(TIMEOUT_MS - WAIT_MS_BEFORE_START_JOB) / (10 * 1000), 10 * 1000).execute();
}
- @SneakyThrows(IOException.class)
private void assertJobCheck(final String jobId) {
- Map<String, Pair<Boolean, Boolean>> checkResult =
ScalingUtil.getInstance().getJobCheckResult(jobId);
- for (Entry<String, Pair<Boolean, Boolean>> entry :
checkResult.entrySet()) {
- assertTrue(entry.getValue().getLeft());
- assertTrue(entry.getValue().getRight());
+ Map<String, Boolean> checkResult =
ScalingUtil.getJobCheckResult(jobId);
+ for (Entry<String, Boolean> entry : checkResult.entrySet()) {
+ assertTrue(entry.getValue());
}
}
}
diff --git a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/IntegrationTestEnvironment.java b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/IntegrationTestEnvironment.java
index 5398a47..ad64d43 100644
--- a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/IntegrationTestEnvironment.java
+++ b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/IntegrationTestEnvironment.java
@@ -64,7 +64,7 @@ public final class IntegrationTestEnvironment {
private boolean isScalingReady() {
try {
- ScalingUtil.getInstance().getJobList();
+ ScalingUtil.getJobList();
} catch (final IOException ignore) {
return false;
}
diff --git a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/ScalingUtil.java b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/ScalingUtil.java
index 5e793b4..fcbc5fd 100644
--- a/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/ScalingUtil.java
+++ b/shardingsphere-test/shardingsphere-integration-scaling-test/shardingsphere-integration-scaling-test-mysql/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/util/ScalingUtil.java
@@ -18,50 +18,37 @@
package org.apache.shardingsphere.integration.scaling.test.mysql.util;
import com.google.gson.JsonElement;
-import org.apache.commons.lang3.tuple.Pair;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
/**
- * Ok http utils.
+ * Scaling util.
*/
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ScalingUtil {
- private static final ScalingUtil INSTANCE = new ScalingUtil();
-
- private ScalingUtil() {
- }
-
- /**
- * Get instance.
- *
- * @return instance
- */
- public static ScalingUtil getInstance() {
- return INSTANCE;
- }
-
/**
* Start job.
*
- * @param configuration configuration
- * @return result
- * @throws IOException io exception
+ * @param jobConfig job configuration
+ * @return started job Id
*/
- public Pair<Boolean, String> startJob(final String configuration) throws
IOException {
+ public static String startJob(final String jobConfig) {
// TODO startJob
- return Pair.of(false, "");
+ return "";
}
/**
* Get job status.
*
- * @param jobId job id
+ * @param jobId job ID
* @return job status
*/
- public String getJobStatus(final String jobId) {
+ public static String getJobStatus(final String jobId) {
try {
// TODO getJobStatus
return "";
@@ -75,11 +62,10 @@ public final class ScalingUtil {
/**
* Check job.
*
- * @param jobId job id
+ * @param jobId job ID
* @return check result
- * @throws IOException io exception
*/
- public Map<String, Pair<Boolean, Boolean>> getJobCheckResult(final String
jobId) throws IOException {
+ public static Map<String, Boolean> getJobCheckResult(final String jobId) {
// TODO getJobCheckResult
return Collections.emptyMap();
}
@@ -88,9 +74,9 @@ public final class ScalingUtil {
* Get job list.
*
* @return result
- * @throws IOException io exception
+ * @throws IOException IO exception
*/
- public JsonElement getJobList() throws IOException {
+ public static JsonElement getJobList() throws IOException {
// TODO getJobList
return null;
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparerTest.java
index c2eb2a1..a4f2fd3 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparerTest.java
@@ -17,14 +17,11 @@
package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
-import org.apache.commons.lang3.tuple.Pair;
import
org.apache.shardingsphere.data.pipeline.api.prepare.datasource.TableDefinitionSQLType;
import org.junit.Test;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.List;
import java.util.regex.Pattern;
import static org.hamcrest.CoreMatchers.is;
@@ -36,6 +33,7 @@ public final class AbstractDataSourcePreparerTest {
private static final Pattern PATTERN_CREATE_TABLE_IF_NOT_EXISTS =
Pattern.compile("CREATE\\s+TABLE\\s+IF\\s+NOT\\s+EXISTS\\s+",
Pattern.CASE_INSENSITIVE);
private final AbstractDataSourcePreparer preparer = new
AbstractDataSourcePreparer() {
+
@Override
public void prepareTargetTables(final PrepareTargetTablesParameter
parameter) {
}
@@ -43,22 +41,17 @@ public final class AbstractDataSourcePreparerTest {
@Test
public void assertGetTableDefinitionSQLType() {
- Collection<Pair<String, TableDefinitionSQLType>> pairs = new
ArrayList<>();
- pairs.add(Pair.of("SET search_path = public",
TableDefinitionSQLType.UNKNOWN));
- pairs.add(Pair.of("CREATE TABLE t1_0 (id int NOT NULL)",
TableDefinitionSQLType.CREATE_TABLE));
- pairs.add(Pair.of("ALTER TABLE t1_0 ADD CONSTRAINT t1_0_pkey PRIMARY
KEY (id)", TableDefinitionSQLType.ALTER_TABLE));
- for (Pair<String, TableDefinitionSQLType> each : pairs) {
- TableDefinitionSQLType sqlType =
preparer.getTableDefinitionSQLType(each.getKey());
- assertThat(sqlType, is(each.getValue()));
- }
+ assertThat(preparer.getTableDefinitionSQLType("SET search_path =
public"), is(TableDefinitionSQLType.UNKNOWN));
+ assertThat(preparer.getTableDefinitionSQLType("CREATE TABLE t1_0 (id
int NOT NULL)"), is(TableDefinitionSQLType.CREATE_TABLE));
+ assertThat(preparer.getTableDefinitionSQLType("ALTER TABLE t1_0 ADD
CONSTRAINT t1_0_pkey PRIMARY KEY (id)"),
is(TableDefinitionSQLType.ALTER_TABLE));
}
@Test
public void assertAddIfNotExistsForCreateTableSQL() {
- List<String> createTableSQLs = Arrays.asList("CREATE TABLE IF NOT
EXISTS t (id int)", "CREATE TABLE t (id int)",
+ Collection<String> createTableSQLs = Arrays.asList("CREATE TABLE IF
NOT EXISTS t (id int)", "CREATE TABLE t (id int)",
"CREATE TABLE IF \nNOT \tEXISTS t (id int)", "CREATE \tTABLE
t (id int)");
- for (String createTableSQL : createTableSQLs) {
- String sql =
preparer.addIfNotExistsForCreateTableSQL(createTableSQL);
+ for (String each : createTableSQLs) {
+ String sql = preparer.addIfNotExistsForCreateTableSQL(each);
assertTrue(PATTERN_CREATE_TABLE_IF_NOT_EXISTS.matcher(sql).find());
}
}