This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1928e739ba2 Refactor PipelineJobAPI.getJobConfiguration() (#29059)
1928e739ba2 is described below
commit 1928e739ba2328f16356c74c23e1196002675bae
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 16 23:44:30 2023 +0800
Refactor PipelineJobAPI.getJobConfiguration() (#29059)
---
.../data/pipeline/core/job/service/PipelineJobAPI.java | 8 --------
.../distsql/handler/update/CheckMigrationJobUpdater.java | 3 ++-
.../shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java | 5 -----
.../data/pipeline/cdc/handler/CDCBackendHandler.java | 5 +++--
.../consistencycheck/api/impl/ConsistencyCheckJobAPI.java | 7 +------
.../consistencycheck/task/ConsistencyCheckTasksRunner.java | 2 +-
.../pipeline/scenario/migration/api/impl/MigrationJobAPI.java | 9 ++-------
.../consistencycheck/api/impl/ConsistencyCheckJobAPITest.java | 2 +-
.../scenario/migration/api/impl/MigrationJobAPITest.java | 6 +++---
9 files changed, 13 insertions(+), 34 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index b823bba76bf..4ac59eefcf9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -34,14 +34,6 @@ import java.util.Optional;
@SingletonSPI
public interface PipelineJobAPI extends TypedSPI {
- /**
- * Get job configuration.
- *
- * @param jobId job id
- * @return job configuration
- */
- PipelineJobConfiguration getJobConfiguration(String jobId);
-
/**
* Get job configuration.
*
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
index a875439f4da..547d6255f4b 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.migration.distsql.handler.update;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
@@ -46,7 +47,7 @@ public final class CheckMigrationJobUpdater implements RALUpdater<CheckMigration
String algorithmTypeName = null == typeStrategy ? null : typeStrategy.getName();
Properties algorithmProps = null == typeStrategy ? null : typeStrategy.getProps();
String jobId = sqlStatement.getJobId();
- MigrationJobConfiguration jobConfig = migrationJobAPI.getJobConfiguration(jobId);
+ MigrationJobConfiguration jobConfig = migrationJobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
verifyInventoryFinished(jobConfig);
checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps, jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType()));
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 1993cde6e7b..4d6d31a3cfb 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -278,11 +278,6 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
return new CDCProcessContext(pipelineJobConfig.getJobId(), showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId())));
}
- @Override
- public CDCJobConfiguration getJobConfiguration(final String jobId) {
- return getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
- }
-
@Override
public CDCJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
return new YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index ad7f7b77410..2ef78db661e 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -47,6 +47,7 @@ import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextMan
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
@@ -78,7 +79,7 @@ public final class CDCBackendHandler {
* @return database
*/
public String getDatabaseNameByJobId(final String jobId) {
- return jobAPI.getJobConfiguration(jobId).getDatabaseName();
+ return jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)).getDatabaseName();
}
/**
@@ -126,7 +127,7 @@ public final class CDCBackendHandler {
* @param connectionContext connection context
*/
public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
- CDCJobConfiguration cdcJobConfig = jobAPI.getJobConfiguration(jobId);
+ CDCJobConfiguration cdcJobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId));
if (PipelineJobCenter.isJobExisting(jobId)) {
PipelineJobCenter.stop(jobId);
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index ac99a1917c8..88e39ee70d3 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -312,7 +312,7 @@ public final class ConsistencyCheckJobAPI implements PipelineJobAPI {
}
private void fillInJobItemInfoWithCheckAlgorithm(final ConsistencyCheckJobItemInfo result, final String checkJobId) {
- ConsistencyCheckJobConfiguration jobConfig = getJobConfiguration(checkJobId);
+ ConsistencyCheckJobConfiguration jobConfig = getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
result.setAlgorithmType(jobConfig.getAlgorithmTypeName());
if (null != jobConfig.getAlgorithmProps()) {
result.setAlgorithmProps(jobConfig.getAlgorithmProps().entrySet().stream().map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue())).collect(Collectors.joining(",")));
@@ -329,11 +329,6 @@ public final class ConsistencyCheckJobAPI implements PipelineJobAPI {
}
}
- @Override
- public ConsistencyCheckJobConfiguration getJobConfiguration(final String jobId) {
- return getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
- }
-
@Override
public ConsistencyCheckJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
return new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 3ae18d58129..9c8b1f0f949 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -98,7 +98,7 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
checkJobAPI.persistJobItemProgress(jobItemContext);
JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
- PipelineJobConfiguration parentJobConfig = jobAPI.getJobConfiguration(parentJobId);
+ PipelineJobConfiguration parentJobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(parentJobId));
try {
PipelineDataConsistencyChecker checker = jobAPI.buildPipelineDataConsistencyChecker(
parentJobConfig, jobAPI.buildPipelineProcessContext(parentJobConfig), jobItemContext.getProgressContext());
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 1fe00789a82..52527c61901 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -226,11 +226,6 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
}
}
- @Override
- public MigrationJobConfiguration getJobConfiguration(final String jobId) {
- return getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
- }
-
@Override
public MigrationJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
return new YamlMigrationJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
@@ -328,7 +323,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
}
private void cleanTempTableOnRollback(final String jobId) throws SQLException {
- MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
+ MigrationJobConfiguration jobConfig = getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
try (
@@ -352,7 +347,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
PipelineJobManager jobManager = new PipelineJobManager(this);
jobManager.stop(jobId);
dropCheckJobs(jobId);
- MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
+ MigrationJobConfiguration jobConfig = getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
jobManager.drop(jobId);
log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis);
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index 6e90b0b2224..169f76ab6d5 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -64,7 +64,7 @@ class ConsistencyCheckJobAPITest {
String parentJobId = parentJobConfig.getJobId();
String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null, parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType()));
- ConsistencyCheckJobConfiguration checkJobConfig = checkJobAPI.getJobConfiguration(checkJobId);
+ ConsistencyCheckJobConfiguration checkJobConfig = checkJobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
String expectCheckJobId = new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence).marshal();
assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index e206c202875..bbd98ed6fbd 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -142,7 +142,7 @@ class MigrationJobAPITest {
void assertRollback() throws SQLException {
Optional<String> jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(jobId.get());
+ MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
initTableData(jobConfig);
PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class);
when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
@@ -154,7 +154,7 @@ class MigrationJobAPITest {
void assertCommit() {
Optional<String> jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(jobId.get());
+ MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
initTableData(jobConfig);
PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class);
when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
@@ -277,7 +277,7 @@ class MigrationJobAPITest {
initIntPrimaryEnvironment();
SourceTargetEntry sourceTargetEntry = new SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order");
String jobId = jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db"));
- MigrationJobConfiguration actual = jobAPI.getJobConfiguration(jobId);
+ MigrationJobConfiguration actual = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
assertThat(actual.getTargetDatabaseName(), is("logic_db"));
List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();
assertThat(dataNodeLines.size(), is(1));
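Note for downstream callers: the String-based getJobConfiguration overload is removed, so call sites now resolve the ElasticJob JobConfigurationPOJO themselves via PipelineJobIdUtils before calling the remaining POJO-based overload. A minimal sketch of the updated call pattern follows; the wrapper class, method name, and the jobAPI/jobId wiring are illustrative only (they mirror CheckMigrationJobUpdater above), and imports other than PipelineJobIdUtils are omitted for brevity.

import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;

public final class GetJobConfigurationExample {
    
    // Hypothetical helper mirroring the refactored call sites in this commit.
    public static MigrationJobConfiguration loadJobConfiguration(final MigrationJobAPI jobAPI, final String jobId) {
        // Before this commit: jobAPI.getJobConfiguration(jobId)
        // After: resolve the ElasticJob configuration POJO from the job id first,
        // then pass it to the POJO-based overload that remains on PipelineJobAPI.
        return jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
    }
}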