This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 88bbf022842 Clean keyword and TODO (#20711)
88bbf022842 is described below
commit 88bbf022842e045faed45fe5ff6d6b6b97a25628
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Sep 1 19:22:04 2022 +0800
Clean keyword and TODO (#20711)
---
...MigrationCheckAlgorithmsQueryResultSetTest.java | 66 ----------------------
.../scaling/distsql/util/PipelineContextUtil.java | 40 -------------
.../shardingsphere-sharding-distsql-parser/pom.xml | 2 +-
...amlPipelineProcessConfigurationSwapperTest.java | 5 +-
...rdingSpherePipelineDataSourceConfiguration.java | 15 +----
.../StandardPipelineDataSourceConfiguration.java | 2 -
.../data/pipeline/api/job/JobStatus.java | 2 +-
...gSpherePipelineDataSourceConfigurationTest.java | 4 +-
.../pipeline/core/execute/ExecuteCallback.java | 2 +-
.../data/pipeline/core/execute/ExecuteEngine.java | 10 ++--
.../PipelineContextManagerLifecycleListener.java | 1 -
.../core/prepare/InventoryTaskSplitter.java | 6 +-
.../pipeline/scenario/migration/MigrationJob.java | 4 +-
.../scenario/migration/MigrationJobAPIImpl.java | 4 +-
.../scenario/migration/MigrationJobPreparer.java | 10 ++--
.../datasource/MySQLDataSourcePreparerTest.java | 16 +++---
.../ingest/OpenGaussPositionInitializer.java | 2 +-
.../ingest/PostgreSQLPositionInitializer.java | 2 +-
.../postgresql/ingest/PostgreSQLWalDumperTest.java | 4 +-
.../rdl/rule/RuleDefinitionBackendHandler.java | 11 ----
20 files changed, 37 insertions(+), 171 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowMigrationCheckAlgorithmsQueryResultSetTest.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowMigrationCheckAlgorithmsQueryResultSetTest.java
deleted file mode 100644
index 38e67a17401..00000000000
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowMigrationCheckAlgorithmsQueryResultSetTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.distsql.handler.query;
-
-import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import
org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationCheckAlgorithmsQueryResultSet;
-import
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckAlgorithmsStatement;
-import org.apache.shardingsphere.scaling.distsql.util.PipelineContextUtil;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-@RunWith(MockitoJUnitRunner.class)
-public final class ShowMigrationCheckAlgorithmsQueryResultSetTest {
-
- @Mock
- private ShardingSphereDatabase database;
-
- @Mock
- private ShowMigrationCheckAlgorithmsStatement
showMigrationCheckAlgorithmsStatement;
-
- @BeforeClass
- public static void beforeClass() {
- PipelineContextUtil.mockModeConfig();
- }
-
- @Test
- public void assertGetRowData() {
- ShowMigrationCheckAlgorithmsQueryResultSet resultSet = new
ShowMigrationCheckAlgorithmsQueryResultSet();
- resultSet.init(database, showMigrationCheckAlgorithmsStatement);
- Collection<Object> algorithmTypes = new LinkedHashSet<>();
- while (resultSet.next()) {
- Collection<Object> actual = resultSet.getRowData();
- assertThat(actual.size(), is(3));
- Iterator<Object> rowData = actual.iterator();
- algorithmTypes.add(rowData.next());
- }
- assertTrue(algorithmTypes.contains("DATA_MATCH"));
- assertTrue(algorithmTypes.contains("CRC32_MATCH"));
- }
-}
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/util/PipelineContextUtil.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/util/PipelineContextUtil.java
deleted file mode 100644
index bd875fcd998..00000000000
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/util/PipelineContextUtil.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.distsql.util;
-
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
-import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
-
-import java.util.Properties;
-
-public final class PipelineContextUtil {
-
- /**
- * Mock mode configuration.
- */
- @SneakyThrows
- public static void mockModeConfig() {
- PipelineContext.initModeConfig(createModeConfig());
- }
-
- private static ModeConfiguration createModeConfig() {
- return new ModeConfiguration("Cluster", new
ClusterPersistRepositoryConfiguration("Zookeeper", "test", "localhost:2181",
new Properties()), true);
- }
-}
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/pom.xml
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/pom.xml
index 7b486e01ce9..604d3b7aadb 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/pom.xml
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/pom.xml
@@ -64,7 +64,7 @@
</configuration>
</execution>
<execution>
- <id>antlr-scaling</id>
+ <id>antlr-migration</id>
<goals>
<goal>antlr4</goal>
</goals>
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java
index 1fc13fa0082..33e5ef8bc57 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java
@@ -50,9 +50,8 @@ public final class
YamlPipelineProcessConfigurationSwapperTest {
Properties streamChannelProps = new Properties();
streamChannelProps.setProperty("block-queue-size", "10000");
yamlConfig.setStreamChannel(new YamlAlgorithmConfiguration("MEMORY",
streamChannelProps));
- YamlPipelineProcessConfigurationSwapper
onRuleAlteredActionConfigSwapper = new
YamlPipelineProcessConfigurationSwapper();
- PipelineProcessConfiguration actualConfig =
onRuleAlteredActionConfigSwapper.swapToObject(yamlConfig);
- YamlPipelineProcessConfiguration actualYamlConfig =
onRuleAlteredActionConfigSwapper.swapToYamlConfiguration(actualConfig);
+ PipelineProcessConfiguration actualConfig =
SWAPPER.swapToObject(yamlConfig);
+ YamlPipelineProcessConfiguration actualYamlConfig =
SWAPPER.swapToYamlConfiguration(actualConfig);
assertThat(YamlEngine.marshal(actualYamlConfig),
is(YamlEngine.marshal(yamlConfig)));
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
index 779a8fa3793..8b4531637cd 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
@@ -30,9 +30,9 @@ import
org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import java.util.Arrays;
import java.util.Collection;
@@ -116,19 +116,6 @@ public final class
ShardingSpherePipelineDataSourceConfiguration implements Pipe
return TYPE;
}
- /**
- * Get actual data source configuration.
- *
- * @param actualDataSourceName actual data source name
- * @return actual data source configuration
- */
- // TODO the invocation is disabled for now, it might be used again for
next new feature
- public StandardPipelineDataSourceConfiguration
getActualDataSourceConfig(final String actualDataSourceName) {
- Map<String, Object> yamlDataSourceConfig =
rootConfig.getDataSources().get(actualDataSourceName);
- Preconditions.checkNotNull(yamlDataSourceConfig, "actualDataSourceName
'{}' does not exist", actualDataSourceName);
- return new
StandardPipelineDataSourceConfiguration(yamlDataSourceConfig);
- }
-
/**
* YAML parameter configuration.
*/
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
index 0ac8a17283e..82ddce29d2b 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
@@ -116,6 +116,4 @@ public final class StandardPipelineDataSourceConfiguration
implements PipelineDa
public Object getDataSourceConfiguration() {
return dataSourceProperties;
}
-
- // TODO toShardingSphereJDBCDataSource(final String actualDataSourceName,
final String logicTableName, final String actualTableName)
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
index a15396d52fe..3dd111050cd 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
@@ -21,7 +21,7 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
- * Scaling Job status.
+ * Job status.
*/
@RequiredArgsConstructor
@Getter
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfigurationTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfigurationTest.java
index 28dc6f7df2e..64859bf92be 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfigurationTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfigurationTest.java
@@ -57,11 +57,11 @@ public final class
ShardingSpherePipelineDataSourceConfigurationTest {
+ " minPoolSize: 20\n"
+ " minimumIdle: 20\n"
+ " dataSourceClassName:
com.zaxxer.hikari.HikariDataSource\n"
- + " url:
jdbc:mysql://192.168.0.2:3306/scaling?serverTimezone=UTC&useSSL=false\n"
+ + " url:
jdbc:mysql://192.168.0.2:3306/ds_1?serverTimezone=UTC&useSSL=false\n"
+ " ds_0:\n"
+ " minPoolSize: 20\n"
+ " minimumIdle: 20\n"
+ " dataSourceClassName:
com.zaxxer.hikari.HikariDataSource\n"
- + " url:
jdbc:mysql://192.168.0.1:3306/scaling?serverTimezone=UTC&useSSL=false\n";
+ + " url:
jdbc:mysql://192.168.0.1:3306/ds_0?serverTimezone=UTC&useSSL=false\n";
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteCallback.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteCallback.java
index 30524b765b9..e6737d9aff3 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteCallback.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteCallback.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.execute;
/**
- * Scaling task execute callback.
+ * Task execute callback.
*/
public interface ExecuteCallback {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
index 43d441b6873..e553bf3efa2 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
@@ -29,14 +29,14 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
- * Scaling executor engine.
+ * Executor engine.
*/
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public final class ExecuteEngine {
- private static final String SCALING_THREAD_PREFIX = "Scaling-";
+ private static final String THREAD_PREFIX = "pipeline-";
- private static final String SCALING_THREAD_SUFFIX = "-%d";
+ private static final String THREAD_SUFFIX = "-%d";
private final ExecutorService executorService;
@@ -47,7 +47,7 @@ public final class ExecuteEngine {
* @return task execute engine instance
*/
public static ExecuteEngine newCachedThreadInstance(final String
threadName) {
- String threadNameFormat = SCALING_THREAD_PREFIX + threadName +
SCALING_THREAD_SUFFIX;
+ String threadNameFormat = THREAD_PREFIX + threadName + THREAD_SUFFIX;
return new
ExecuteEngine(Executors.newCachedThreadPool(ExecutorThreadFactoryBuilder.build(threadNameFormat)));
}
@@ -59,7 +59,7 @@ public final class ExecuteEngine {
* @return task execute engine instance
*/
public static ExecuteEngine newFixedThreadInstance(final int threadNumber,
final String threadName) {
- String threadNameFormat = SCALING_THREAD_PREFIX + threadName +
SCALING_THREAD_SUFFIX;
+ String threadNameFormat = THREAD_PREFIX + threadName + THREAD_SUFFIX;
return new ExecuteEngine(Executors.newFixedThreadPool(threadNumber,
ExecutorThreadFactoryBuilder.build(threadNameFormat)));
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
index 768a0d06dac..093401d2779 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
@@ -41,7 +41,6 @@ public final class PipelineContextManagerLifecycleListener
implements ContextMan
}
PipelineContext.initModeConfig(modeConfig);
PipelineContext.initContextManager(contextManager);
- // TODO init worker only if necessary, e.g. 1) rule altered action
configured, 2) enabled job exists, 3) stopped job restarted
PipelineJobWorker.initialize();
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index f5eca19ce08..c5d10c6f5b4 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -119,10 +119,10 @@ public final class InventoryTaskSplitter {
private Collection<InventoryDumperConfiguration> splitByPrimaryKey(final
PipelineJobItemContext jobItemContext, final DataSource dataSource, final
PipelineTableMetaDataLoader metaDataLoader,
final
InventoryDumperConfiguration dumperConfig) {
Collection<InventoryDumperConfiguration> result = new LinkedList<>();
- PipelineProcessContext ruleAlteredContext =
jobItemContext.getJobProcessContext();
- PipelineReadConfiguration readConfig =
ruleAlteredContext.getPipelineProcessConfig().getRead();
+ PipelineProcessContext jobProcessContext =
jobItemContext.getJobProcessContext();
+ PipelineReadConfiguration readConfig =
jobProcessContext.getPipelineProcessConfig().getRead();
int batchSize = readConfig.getBatchSize();
- JobRateLimitAlgorithm rateLimitAlgorithm =
ruleAlteredContext.getReadRateLimitAlgorithm();
+ JobRateLimitAlgorithm rateLimitAlgorithm =
jobProcessContext.getReadRateLimitAlgorithm();
Collection<IngestPosition<?>> inventoryPositions =
getInventoryPositions(jobItemContext, dumperConfig, dataSource, metaDataLoader);
int i = 0;
for (IngestPosition<?> inventoryPosition : inventoryPositions) {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 466b6428e96..2dd261b8420 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -121,10 +121,10 @@ public final class MigrationJob extends
AbstractPipelineJob implements SimpleJob
return;
}
log.info("stop tasks runner, jobId={}", getJobId());
- String scalingJobBarrierDisablePath =
PipelineMetaDataNode.getJobBarrierDisablePath(getJobId());
+ String jobBarrierDisablePath =
PipelineMetaDataNode.getJobBarrierDisablePath(getJobId());
for (PipelineTasksRunner each : getTasksRunnerMap().values()) {
each.stop();
-
pipelineDistributedBarrier.persistEphemeralChildrenNode(scalingJobBarrierDisablePath,
each.getJobItemContext().getShardingItem());
+
pipelineDistributedBarrier.persistEphemeralChildrenNode(jobBarrierDisablePath,
each.getJobItemContext().getShardingItem());
}
getTasksRunnerMap().clear();
PipelineJobProgressPersistService.removeJobProgressPersistContext(getJobId());
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 176440a3540..bd898dc5d9a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -315,7 +315,7 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
String jobId = jobConfig.getJobId();
JobRateLimitAlgorithm readRateLimitAlgorithm =
buildPipelineProcessContext(jobConfig).getReadRateLimitAlgorithm();
Map<String, DataConsistencyCheckResult> result = new
MigrationDataConsistencyChecker(jobConfig,
readRateLimitAlgorithm).check(calculator);
- log.info("Scaling job {} with check algorithm '{}' data consistency
checker result {}", jobId, calculator.getType(), result);
+ log.info("job {} with check algorithm '{}' data consistency checker
result {}", jobId, calculator.getType(), result);
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobCheckResult(jobId,
aggregateDataConsistencyCheckResults(jobId, result));
return result;
}
@@ -330,7 +330,7 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
boolean isCountMatched =
checkResult.getCountCheckResult().isMatched();
boolean isContentMatched =
checkResult.getContentCheckResult().isMatched();
if (!isCountMatched || !isContentMatched) {
- log.error("Scaling job: {}, table: {} data consistency check
failed, count matched: {}, content matched: {}", jobId, entry.getKey(),
isCountMatched, isContentMatched);
+ log.error("job: {}, table: {} data consistency check failed,
count matched: {}, content matched: {}", jobId, entry.getKey(), isCountMatched,
isContentMatched);
return false;
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index b11f3deb02a..c3cd181a04d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -64,7 +64,7 @@ public final class MigrationJobPreparer {
private static final MigrationJobAPI JOB_API =
MigrationJobAPIFactory.getInstance();
/**
- * Do prepare work for scaling job.
+ * Do prepare work.
*
* @param jobItemContext job item context
* @throws SQLException SQL exception
@@ -90,8 +90,8 @@ public final class MigrationJobPreparer {
log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={},
incrementalTasks={}",
jobItemContext.getJobId(),
jobItemContext.getShardingItem(), jobItemContext.getInventoryTasks(),
jobItemContext.getIncrementalTasks());
} catch (final SQLException ex) {
- log.error("Scaling job preparing failed, jobId={}",
jobItemContext.getJobId());
- throw new PipelineJobPrepareFailedException("Scaling job preparing
failed, jobId=" + jobItemContext.getJobId(), ex);
+ log.error("job preparing failed, jobId={}",
jobItemContext.getJobId());
+ throw new PipelineJobPrepareFailedException("job preparing failed,
jobId=" + jobItemContext.getJobId(), ex);
}
}
@@ -187,7 +187,7 @@ public final class MigrationJobPreparer {
}
/**
- * Do cleanup work for scaling job.
+ * Do cleanup work.
*
* @param jobConfig job configuration
*/
@@ -195,7 +195,7 @@ public final class MigrationJobPreparer {
try {
PipelineJobPreparerUtils.destroyPosition(jobConfig.getJobId(),
jobConfig.getSource());
} catch (final SQLException ex) {
- log.warn("Scaling job destroying failed", ex);
+ log.warn("job destroying failed", ex);
}
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
index 5686c12076b..0112176b321 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
@@ -60,10 +60,10 @@ public final class MySQLDataSourcePreparerTest {
private PipelineDataSourceConfiguration targetPipelineDataSourceConfig;
@Mock
- private ShardingSpherePipelineDataSourceConfiguration
sourceScalingDataSourceConfig;
+ private ShardingSpherePipelineDataSourceConfiguration
sourceDataSourceConfig;
@Mock
- private ShardingSpherePipelineDataSourceConfiguration
targetScalingDataSourceConfig;
+ private ShardingSpherePipelineDataSourceConfiguration
targetDataSourceConfig;
@Mock
private PipelineDataSourceWrapper sourceDataSourceWrapper;
@@ -74,8 +74,8 @@ public final class MySQLDataSourcePreparerTest {
@Before
public void setUp() throws SQLException {
PipelineDataSourceManager mockPipelineDataSourceManager =
mock(PipelineDataSourceManager.class);
-
when(mockPipelineDataSourceManager.getDataSource(same(sourceScalingDataSourceConfig))).thenReturn(sourceDataSourceWrapper);
-
when(mockPipelineDataSourceManager.getDataSource(same(targetScalingDataSourceConfig))).thenReturn(targetDataSourceWrapper);
+
when(mockPipelineDataSourceManager.getDataSource(same(sourceDataSourceConfig))).thenReturn(sourceDataSourceWrapper);
+
when(mockPipelineDataSourceManager.getDataSource(same(targetDataSourceConfig))).thenReturn(targetDataSourceWrapper);
when(prepareTargetTablesParameter.getDataSourceManager()).thenReturn(mockPipelineDataSourceManager);
when(jobConfig.getSource()).thenReturn(sourcePipelineDataSourceConfig);
when(jobConfig.getSource().getType()).thenReturn("ShardingSphereJDBC");
@@ -90,9 +90,9 @@ public final class MySQLDataSourcePreparerTest {
public void assertGetConnection() throws SQLException {
try (MockedStatic<PipelineDataSourceConfigurationFactory>
mockedStaticPipelineDataSourceConfigurationFactory =
mockStatic(PipelineDataSourceConfigurationFactory.class)) {
mockedStaticPipelineDataSourceConfigurationFactory.when(() ->
PipelineDataSourceConfigurationFactory.newInstance(eq("ShardingSphereJDBC"),
eq("source")))
- .thenReturn(sourceScalingDataSourceConfig);
+ .thenReturn(sourceDataSourceConfig);
mockedStaticPipelineDataSourceConfigurationFactory.when(() ->
PipelineDataSourceConfigurationFactory.newInstance(eq("ShardingSphereJDBC"),
eq("target")))
- .thenReturn(targetScalingDataSourceConfig);
+ .thenReturn(targetDataSourceConfig);
new
MySQLDataSourcePreparer().prepareTargetTables(prepareTargetTablesParameter);
verify(sourceDataSourceWrapper).getConnection();
verify(targetDataSourceWrapper).getConnection();
@@ -103,9 +103,9 @@ public final class MySQLDataSourcePreparerTest {
public void assertThrowPrepareFailedException() throws SQLException {
try (MockedStatic<PipelineDataSourceConfigurationFactory>
mockedStaticPipelineDataSourceConfigurationFactory =
mockStatic(PipelineDataSourceConfigurationFactory.class)) {
mockedStaticPipelineDataSourceConfigurationFactory.when(() ->
PipelineDataSourceConfigurationFactory.newInstance(eq("ShardingSphereJDBC"),
eq("source")))
- .thenReturn(sourceScalingDataSourceConfig);
+ .thenReturn(sourceDataSourceConfig);
mockedStaticPipelineDataSourceConfigurationFactory.when(() ->
PipelineDataSourceConfigurationFactory.newInstance(eq("ShardingSphereJDBC"),
eq("target")))
- .thenReturn(targetScalingDataSourceConfig);
+ .thenReturn(targetDataSourceConfig);
when(sourceDataSourceWrapper.getConnection()).thenThrow(SQLException.class);
new
MySQLDataSourcePreparer().prepareTargetTables(prepareTargetTablesParameter);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
index 49ec98c05d4..1b2542df1d5 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
@@ -38,7 +38,7 @@ import java.sql.SQLException;
@Slf4j
public final class OpenGaussPositionInitializer implements PositionInitializer
{
- private static final String SLOT_NAME_PREFIX = "sharding_scaling";
+ private static final String SLOT_NAME_PREFIX = "pipeline";
private static final String DECODE_PLUGIN = "mppdb_decoding";
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
index 02855ad13bf..2eed41e36a5 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
@@ -36,7 +36,7 @@ import java.sql.SQLException;
@Slf4j
public final class PostgreSQLPositionInitializer implements
PositionInitializer {
- private static final String SLOT_NAME_PREFIX = "sharding_scaling";
+ private static final String SLOT_NAME_PREFIX = "pipeline";
private static final String DECODE_PLUGIN = "test_decoding";
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
index 7425970029a..7e686c000ec 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
@@ -107,7 +107,7 @@ public final class PostgreSQLWalDumperTest {
}
PipelineDataSourceConfiguration dataSourceConfig = new
StandardPipelineDataSourceConfiguration(jdbcUrl, username, password);
DumperConfiguration result = new DumperConfiguration();
- result.setJobId("0101123455F45SCALING8898");
+ result.setJobId("0101123456");
result.setDataSourceConfig(dataSourceConfig);
result.setTableNameMap(Collections.singletonMap(new
ActualTableName("t_order_0"), new LogicTableName("t_order")));
result.setTableNameSchemaNameMapping(new
TableNameSchemaNameMapping(Collections.emptyMap()));
@@ -122,7 +122,7 @@ public final class PostgreSQLWalDumperTest {
when(logicalReplication.createConnection(dataSourceConfig)).thenReturn(pgConnection);
when(pgConnection.unwrap(PgConnection.class)).thenReturn(pgConnection);
try (MockedStatic<PostgreSQLPositionInitializer>
positionInitializer = mockStatic(PostgreSQLPositionInitializer.class)) {
- positionInitializer.when(() ->
PostgreSQLPositionInitializer.getUniqueSlotName(eq(pgConnection),
anyString())).thenReturn("0101123455F45SCALING8898");
+ positionInitializer.when(() ->
PostgreSQLPositionInitializer.getUniqueSlotName(eq(pgConnection),
anyString())).thenReturn("0101123456");
when(logicalReplication.createReplicationStream(pgConnection,
PostgreSQLPositionInitializer.getUniqueSlotName(pgConnection, ""),
position.getLogSequenceNumber()))
.thenReturn(pgReplicationStream);
ByteBuffer data = ByteBuffer.wrap("table public.t_order_0:
DELETE: order_id[integer]:1".getBytes());
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
index 520f916c5c0..8f1f3e7dbd8 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
@@ -33,19 +33,11 @@ import
org.apache.shardingsphere.proxy.backend.handler.DatabaseRequiredBackendHa
import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
-import
org.apache.shardingsphere.sharding.distsql.parser.statement.AlterDefaultShardingStrategyStatement;
-import
org.apache.shardingsphere.sharding.distsql.parser.statement.AlterShardingAlgorithmStatement;
-import
org.apache.shardingsphere.sharding.distsql.parser.statement.AlterShardingBindingTableRulesStatement;
-import
org.apache.shardingsphere.sharding.distsql.parser.statement.AlterShardingBroadcastTableRulesStatement;
-import
org.apache.shardingsphere.sharding.distsql.parser.statement.AlterShardingTableRuleStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.Optional;
-import java.util.Set;
/**
* Rule definition backend handler.
@@ -55,9 +47,6 @@ import java.util.Set;
@Slf4j
public final class RuleDefinitionBackendHandler<T extends
RuleDefinitionStatement> extends DatabaseRequiredBackendHandler<T> {
- private static final Set<String> RULE_ALTERED_ACTIONS = new
HashSet<>(Arrays.asList(AlterShardingTableRuleStatement.class.getName(),
AlterShardingAlgorithmStatement.class.getName(),
- AlterDefaultShardingStrategyStatement.class.getName(),
AlterShardingBindingTableRulesStatement.class.getName(),
AlterShardingBroadcastTableRulesStatement.class.getName()));
-
public RuleDefinitionBackendHandler(final T sqlStatement, final
ConnectionSession connectionSession) {
super(sqlStatement, connectionSession);
}