This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4bc8afaa008 Add AbstractPipelineJob for common usage (#19969)
4bc8afaa008 is described below
commit 4bc8afaa008e55f153f1655435f05b622b1e2689
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Aug 8 18:40:03 2022 +0800
Add AbstractPipelineJob for common usage (#19969)
---
.../scenario/rulealtered/AbstractPipelineJob.java | 57 +++++++++++++++
.../scenario/rulealtered/RuleAlteredJob.java | 83 +++++++++++-----------
.../rulealtered/RuleAlteredJobContext.java | 5 +-
.../rulealtered/RuleAlteredJobScheduler.java | 32 +--------
.../api/impl/GovernanceRepositoryAPIImplTest.java | 3 +-
.../core/api/impl/RuleAlteredJobAPIImplTest.java | 5 +-
.../consistency/DataConsistencyCheckerTest.java | 3 +-
.../pipeline/core/task/IncrementalTaskTest.java | 3 +-
.../data/pipeline/core/task/InventoryTaskTest.java | 3 +-
.../rulealtered/RuleAlteredJobWorkerTest.java | 5 +-
.../prepare/InventoryTaskSplitterTest.java | 3 +-
11 files changed, 113 insertions(+), 89 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/AbstractPipelineJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/AbstractPipelineJob.java
new file mode 100644
index 00000000000..f07b2da52f9
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/AbstractPipelineJob.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Abstract pipeline job.
+ */
+@Slf4j
+@Getter
+public abstract class AbstractPipelineJob implements PipelineJob {
+
+ @Setter
+ private volatile String jobId;
+
+ @Setter
+ private volatile boolean stopping;
+
+ @Setter
+ private OneOffJobBootstrap oneOffJobBootstrap;
+
+ private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new ConcurrentHashMap<>();
+
+ protected void runInBackground(final Runnable runnable) {
+ new Thread(runnable).start();
+ }
+
+ @Override
+ public Optional<PipelineTasksRunner> getTasksRunner(final int shardingItem) {
+ return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
+ }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index e0ee494d5bf..ebd74c85f1a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -17,98 +17,99 @@
package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
-import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
-import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
* Rule altered job.
*/
@Slf4j
@RequiredArgsConstructor
-public final class RuleAlteredJob implements SimpleJob, PipelineJob {
+public final class RuleAlteredJob extends AbstractPipelineJob implements
SimpleJob, PipelineJob {
private final GovernanceRepositoryAPI governanceRepositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
- private volatile String jobId;
-
private final PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager();
// Shared by all sharding items
private final RuleAlteredJobPreparer jobPreparer = new
RuleAlteredJobPreparer();
- @Getter
- private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new
ConcurrentHashMap<>();
-
- private volatile boolean stopping;
-
- @Setter
- private OneOffJobBootstrap oneOffJobBootstrap;
-
@Override
public void execute(final ShardingContext shardingContext) {
log.info("Execute job {}-{}", shardingContext.getJobName(),
shardingContext.getShardingItem());
- if (stopping) {
+ if (isStopping()) {
log.info("stopping true, ignore");
return;
}
- jobId = shardingContext.getJobName();
+ setJobId(shardingContext.getJobName());
RuleAlteredJobConfiguration jobConfig =
RuleAlteredJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
JobProgress initProgress =
governanceRepositoryAPI.getJobProgress(shardingContext.getJobName(),
shardingContext.getShardingItem());
- RuleAlteredJobContext jobContext = new
RuleAlteredJobContext(jobConfig, shardingContext.getShardingItem(),
initProgress, dataSourceManager, jobPreparer);
+ RuleAlteredJobContext jobContext = new
RuleAlteredJobContext(jobConfig, shardingContext.getShardingItem(),
initProgress, dataSourceManager);
int shardingItem = jobContext.getShardingItem();
- if (tasksRunnerMap.containsKey(shardingItem)) {
- // If the following log is output, it is possible that the
elasticjob task was not closed correctly
+ if (getTasksRunnerMap().containsKey(shardingItem)) {
+ // If the following log is output, it is possible that the
elasticjob task was not shutdown correctly
log.warn("schedulerMap contains shardingItem {}, ignore",
shardingItem);
return;
}
- log.info("start RuleAlteredJobScheduler, jobId={}, shardingItem={}",
jobId, shardingItem);
+ log.info("start RuleAlteredJobScheduler, jobId={}, shardingItem={}",
getJobId(), shardingItem);
RuleAlteredJobScheduler jobScheduler = new
RuleAlteredJobScheduler(jobContext);
- jobScheduler.start();
- tasksRunnerMap.put(shardingItem, jobScheduler);
- PipelineJobProgressPersistService.addJobProgressPersistContext(jobId,
shardingItem);
- }
-
- @Override
- public Optional<PipelineTasksRunner> getTasksRunner(final int
shardingItem) {
- return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
+ runInBackground(() -> {
+ prepare(jobContext);
+ jobScheduler.start();
+ });
+ getTasksRunnerMap().put(shardingItem, jobScheduler);
+
PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(),
shardingItem);
}
/**
* Stop job.
*/
public void stop() {
- stopping = true;
+ setStopping(true);
dataSourceManager.close();
- if (null != oneOffJobBootstrap) {
- oneOffJobBootstrap.shutdown();
+ if (null != getOneOffJobBootstrap()) {
+ getOneOffJobBootstrap().shutdown();
}
- if (null == jobId) {
+ if (null == getJobId()) {
log.info("stop, jobId is null, ignore");
return;
}
- log.info("stop job scheduler, jobId={}", jobId);
- for (PipelineTasksRunner each : tasksRunnerMap.values()) {
+ log.info("stop job scheduler, jobId={}", getJobId());
+ for (PipelineTasksRunner each : getTasksRunnerMap().values()) {
each.stop();
}
- tasksRunnerMap.clear();
-
PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
+ getTasksRunnerMap().clear();
+
PipelineJobProgressPersistService.removeJobProgressPersistContext(getJobId());
+ }
+
+ private void prepare(final RuleAlteredJobContext jobContext) {
+ try {
+ jobPreparer.prepare(jobContext);
+ } catch (final PipelineIgnoredException ex) {
+ log.info("pipeline ignore exception: {}", ex.getMessage());
+ PipelineJobCenter.stop(getJobId());
+ // CHECKSTYLE:OFF
+ } catch (final RuntimeException ex) {
+ // CHECKSTYLE:ON
+ log.error("job prepare failed, {}-{}", getJobId(),
jobContext.getShardingItem(), ex);
+ PipelineJobCenter.stop(getJobId());
+ jobContext.setStatus(JobStatus.PREPARING_FAILURE);
+
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobProgress(jobContext);
+ throw ex;
+ }
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
index 8ffcba5ec20..ef511163d33 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
@@ -83,17 +83,14 @@ public final class RuleAlteredJobContext implements
PipelineJobContext {
}
};
- private final RuleAlteredJobPreparer jobPreparer;
-
public RuleAlteredJobContext(final RuleAlteredJobConfiguration jobConfig,
final int jobShardingItem, final JobProgress initProgress,
- final PipelineDataSourceManager
dataSourceManager, final RuleAlteredJobPreparer jobPreparer) {
+ final PipelineDataSourceManager
dataSourceManager) {
jobProcessContext =
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
this.jobConfig = jobConfig;
jobId = jobConfig.getJobId();
this.shardingItem = jobShardingItem;
this.initProgress = initProgress;
this.dataSourceManager = dataSourceManager;
- this.jobPreparer = jobPreparer;
taskConfig = RuleAlteredJobWorker.buildTaskConfig(jobConfig,
jobShardingItem, jobProcessContext.getPipelineProcessConfig());
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index ac0618c3643..45f99dc3e21 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -23,11 +23,8 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
-import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -38,17 +35,10 @@ import
org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@Slf4j
@RequiredArgsConstructor
@Getter
-public final class RuleAlteredJobScheduler implements PipelineTasksRunner,
Runnable {
+public final class RuleAlteredJobScheduler implements PipelineTasksRunner {
private final RuleAlteredJobContext jobContext;
- /**
- * Start execute job.
- */
- public void start() {
- new Thread(this).start();
- }
-
/**
* Stop all task.
*/
@@ -69,28 +59,12 @@ public final class RuleAlteredJobScheduler implements
PipelineTasksRunner, Runna
}
@Override
- public void run() {
- String jobId = jobContext.getJobId();
- GovernanceRepositoryAPI governanceRepositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
- try {
- jobContext.getJobPreparer().prepare(jobContext);
- } catch (final PipelineIgnoredException ex) {
- log.info("pipeline ignore exception: {}", ex.getMessage());
- PipelineJobCenter.stop(jobId);
- // CHECKSTYLE:OFF
- } catch (final RuntimeException ex) {
- // CHECKSTYLE:ON
- log.error("job prepare failed, {}-{}", jobId,
jobContext.getShardingItem(), ex);
- PipelineJobCenter.stop(jobId);
- jobContext.setStatus(JobStatus.PREPARING_FAILURE);
- governanceRepositoryAPI.persistJobProgress(jobContext);
- throw ex;
- }
+ public void start() {
if (jobContext.isStopping()) {
log.info("job stopping, ignore inventory task");
return;
}
- governanceRepositoryAPI.persistJobProgress(jobContext);
+
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobProgress(jobContext);
if (executeInventoryTask()) {
if (jobContext.isStopping()) {
log.info("stopping, ignore incremental task");
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 8457c4e1ff5..c6864fe9f61 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -37,7 +37,6 @@ import
org.apache.shardingsphere.data.pipeline.core.util.ConfigurationFileUtil;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -142,7 +141,7 @@ public final class GovernanceRepositoryAPIImplTest {
}
private RuleAlteredJobContext mockJobContext() {
- RuleAlteredJobContext result = new
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new
JobProgress(), new PipelineDataSourceManager(), new RuleAlteredJobPreparer());
+ RuleAlteredJobContext result = new
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new
JobProgress(), new PipelineDataSourceManager());
TaskConfiguration taskConfig = result.getTaskConfig();
result.getInventoryTasks().add(mockInventoryTask(taskConfig));
result.getIncrementalTasks().add(mockIncrementalTask(taskConfig));
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
index a525658e714..6c72641a38d 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
@@ -38,7 +38,6 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFail
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -200,7 +199,7 @@ public final class RuleAlteredJobAPIImplTest {
Optional<String> jobId = ruleAlteredJobAPI.start(jobConfig);
assertTrue(jobId.isPresent());
final GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
- RuleAlteredJobContext jobContext = new
RuleAlteredJobContext(jobConfig, 0, new JobProgress(), new
PipelineDataSourceManager(), new RuleAlteredJobPreparer());
+ RuleAlteredJobContext jobContext = new
RuleAlteredJobContext(jobConfig, 0, new JobProgress(), new
PipelineDataSourceManager());
repositoryAPI.persistJobProgress(jobContext);
repositoryAPI.persistJobCheckResult(jobId.get(), true);
repositoryAPI.updateShardingJobStatus(jobId.get(), 0,
JobStatus.FINISHED);
@@ -213,7 +212,7 @@ public final class RuleAlteredJobAPIImplTest {
Optional<String> jobId = ruleAlteredJobAPI.start(jobConfig);
assertTrue(jobId.isPresent());
GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
- RuleAlteredJobContext jobContext = new
RuleAlteredJobContext(jobConfig, 0, new JobProgress(), new
PipelineDataSourceManager(), new RuleAlteredJobPreparer());
+ RuleAlteredJobContext jobContext = new
RuleAlteredJobContext(jobConfig, 0, new JobProgress(), new
PipelineDataSourceManager());
repositoryAPI.persistJobProgress(jobContext);
repositoryAPI.persistJobCheckResult(jobId.get(), true);
repositoryAPI.updateShardingJobStatus(jobId.get(),
jobContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
index e9f18715051..e9c6e1a0b06 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
@@ -26,7 +26,6 @@ import
org.apache.shardingsphere.data.pipeline.core.fixture.DataConsistencyCalcu
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -56,7 +55,7 @@ public final class DataConsistencyCheckerTest {
private RuleAlteredJobConfiguration createJobConfiguration() throws
SQLException {
RuleAlteredJobContext jobContext = new
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0,
- new JobProgress(), new PipelineDataSourceManager(), new
RuleAlteredJobPreparer());
+ new JobProgress(), new PipelineDataSourceManager());
initTableData(jobContext.getTaskConfig().getDumperConfig().getDataSourceConfig());
initTableData(jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
return jobContext.getJobConfig();
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index e81a8913b09..509caa6e02b 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -27,7 +27,6 @@ import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTabl
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -50,7 +49,7 @@ public final class IncrementalTaskTest {
@Before
public void setUp() {
TaskConfiguration taskConfig = new
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new
JobProgress(),
- new PipelineDataSourceManager(), new
RuleAlteredJobPreparer()).getTaskConfig();
+ new PipelineDataSourceManager()).getTaskConfig();
taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
PipelineTableMetaDataLoader metaDataLoader = new
PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(),
taskConfig.getImporterConfig(),
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index afde39ffba6..f31cba885c0 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -30,7 +30,6 @@ import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTabl
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -62,7 +61,7 @@ public final class InventoryTaskTest {
@Before
public void setUp() {
- taskConfig = new
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new
JobProgress(), new PipelineDataSourceManager(), new
RuleAlteredJobPreparer()).getTaskConfig();
+ taskConfig = new
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new
JobProgress(), new PipelineDataSourceManager()).getTaskConfig();
}
@Test(expected = IngestException.class)
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
index d51586c71d9..32efa4f06fc 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
@@ -43,6 +43,7 @@ import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Optional;
@@ -96,13 +97,13 @@ public final class RuleAlteredJobWorkerTest {
// @Test
public void assertHasUncompletedJob() throws InvocationTargetException,
NoSuchMethodException, IllegalAccessException, IOException {
final RuleAlteredJobConfiguration jobConfig =
JobConfigurationBuilder.createJobConfiguration();
- RuleAlteredJobContext jobContext = new
RuleAlteredJobContext(jobConfig, 0, new JobProgress(), new
PipelineDataSourceManager(), new RuleAlteredJobPreparer());
+ RuleAlteredJobContext jobContext = new
RuleAlteredJobContext(jobConfig, 0, new JobProgress(), new
PipelineDataSourceManager());
jobContext.setStatus(JobStatus.PREPARING);
GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
repositoryAPI.persistJobProgress(jobContext);
URL jobConfigUrl =
getClass().getClassLoader().getResource("scaling/rule_alter/scaling_job_config.yaml");
assertNotNull(jobConfigUrl);
-
repositoryAPI.persist(PipelineMetaDataNode.getJobConfigPath(jobContext.getJobId()),
FileUtils.readFileToString(new File(jobConfigUrl.getFile())));
+
repositoryAPI.persist(PipelineMetaDataNode.getJobConfigPath(jobContext.getJobId()),
FileUtils.readFileToString(new File(jobConfigUrl.getFile()),
StandardCharsets.UTF_8));
Object result = ReflectionUtil.invokeMethod(new
RuleAlteredJobWorker(), "hasUncompletedJobOfSameDatabaseName", new
Class[]{String.class},
new String[]{jobConfig.getDatabaseName()});
assertFalse((Boolean) result);
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
index ee0ea66f257..99fbc9b6d30 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
@@ -29,7 +29,6 @@ import
org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -68,7 +67,7 @@ public final class InventoryTaskSplitterTest {
private void initJobContext() {
RuleAlteredJobConfiguration jobConfig =
JobConfigurationBuilder.createJobConfiguration();
JobProgress initProgress =
PipelineAPIFactory.getGovernanceRepositoryAPI().getJobProgress(jobConfig.getJobId(),
0);
- jobContext = new RuleAlteredJobContext(jobConfig, 0, initProgress, new
PipelineDataSourceManager(), new RuleAlteredJobPreparer());
+ jobContext = new RuleAlteredJobContext(jobConfig, 0, initProgress, new
PipelineDataSourceManager());
dataSourceManager = jobContext.getDataSourceManager();
taskConfig = jobContext.getTaskConfig();
}