This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new dc00a1b566e Add PipelineJobPreparer (#32758)
dc00a1b566e is described below
commit dc00a1b566e2ebe4f9e321445105a46b6128cbb3
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Sep 1 13:51:10 2024 +0800
Add PipelineJobPreparer (#32758)
* Add PipelineJobPreparer
* Add PipelineJobPreparer
---
.../core/job/preparer/PipelineJobPreparer.java | 38 +++++++++++++++
.../IncrementalTaskPositionManager.java | 12 ++---
.../MigrationJobConfigurationChangedProcessor.java | 17 ++++++-
.../migration/preparer/MigrationJobPreparer.java | 54 +++++-----------------
4 files changed, 71 insertions(+), 50 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/preparer/PipelineJobPreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/preparer/PipelineJobPreparer.java
new file mode 100644
index 00000000000..1fef91f1abf
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/preparer/PipelineJobPreparer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job.preparer;
+
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
+
+import java.sql.SQLException;
+
+/**
+ * Pipeline job preparer.
+ *
+ * @param <T> type of pipeline job item context
+ */
+public interface PipelineJobPreparer<T extends PipelineJobItemContext> {
+
+ /**
+ * Prepare before job execution.
+ *
+ * @param jobItemContext job item context
+ * @throws SQLException SQL exception
+ */
+ void prepare(T jobItemContext) throws SQLException;
+}
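A scenario-specific preparer implements this interface against its own job item context type, as MigrationJobPreparer does below. A minimal sketch for illustration (the class and package here are hypothetical; only the interface contract comes from this commit):

    package org.apache.shardingsphere.example;

    import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
    import org.apache.shardingsphere.data.pipeline.core.job.preparer.PipelineJobPreparer;

    import java.sql.SQLException;

    /** Hypothetical no-op preparer; real implementations bind a concrete context subtype. */
    public final class NoOpJobPreparer implements PipelineJobPreparer<PipelineJobItemContext> {

        @Override
        public void prepare(final PipelineJobItemContext jobItemContext) throws SQLException {
            // Scenario-specific preparation (source checks, task initialization) goes here.
        }
    }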
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
index fd2bab1bc6a..154dbadc69d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
@@ -44,11 +44,11 @@ public final class IncrementalTaskPositionManager {
private final DatabaseType databaseType;
- private final DialectIncrementalPositionManager positionInitializer;
+ private final DialectIncrementalPositionManager dialectPositionManager;
public IncrementalTaskPositionManager(final DatabaseType databaseType) {
this.databaseType = databaseType;
-        positionInitializer = DatabaseTypedSPILoader.getService(DialectIncrementalPositionManager.class, databaseType);
+        dialectPositionManager = DatabaseTypedSPILoader.getService(DialectIncrementalPositionManager.class, databaseType);
}
/**
@@ -68,7 +68,7 @@ public final class IncrementalTaskPositionManager {
return position.get();
}
}
-        return positionInitializer.init(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()), dumperContext.getJobId());
+        return dialectPositionManager.init(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()), dumperContext.getJobId());
}
/**
@@ -82,11 +82,11 @@ public final class IncrementalTaskPositionManager {
final long startTimeMillis = System.currentTimeMillis();
log.info("Cleanup position, database type: {}, pipeline data source
type: {}", databaseType.getType(), pipelineDataSourceConfig.getType());
if (pipelineDataSourceConfig instanceof
ShardingSpherePipelineDataSourceConfiguration) {
- destroyPosition(jobId,
(ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig,
positionInitializer);
+ destroyPosition(jobId,
(ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig,
dialectPositionManager);
} else if (pipelineDataSourceConfig instanceof
StandardPipelineDataSourceConfiguration) {
- destroyPosition(jobId, (StandardPipelineDataSourceConfiguration)
pipelineDataSourceConfig, positionInitializer);
+ destroyPosition(jobId, (StandardPipelineDataSourceConfiguration)
pipelineDataSourceConfig, dialectPositionManager);
}
-        log.info("destroyPosition cost {} ms", System.currentTimeMillis() - startTimeMillis);
+        log.info("Destroy position cost {} ms.", System.currentTimeMillis() - startTimeMillis);
}
private void destroyPosition(final String jobId,
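The rename from positionInitializer to dialectPositionManager is internal; the public API used by callers is unchanged. A usage sketch based on the calls visible in this commit (databaseType, jobId, and sourceConfig are placeholders):

    // Placeholders: databaseType, jobId, and sourceConfig come from the job configuration.
    IncrementalTaskPositionManager positionManager = new IncrementalTaskPositionManager(databaseType);
    positionManager.destroyPosition(jobId, sourceConfig); // declared to throw SQLException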
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
index 39545ca0355..b29495b0c91 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
@@ -17,17 +17,23 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration.metadata.processor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.JobConfigurationChangedProcessor;
+import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import java.sql.SQLException;
+import java.util.Map.Entry;
+
/**
* Migration job configuration changed processor.
*/
+@Slf4j
public final class MigrationJobConfigurationChangedProcessor implements JobConfigurationChangedProcessor<MigrationJobConfiguration> {
@Override
@@ -37,7 +43,14 @@ public final class MigrationJobConfigurationChangedProcessor implements JobConfi
@Override
public void clean(final JobConfiguration jobConfig) {
-        new MigrationJobPreparer().cleanup(new YamlMigrationJobConfigurationSwapper().swapToObject(jobConfig.getJobParameter()));
+        MigrationJobConfiguration migrationJobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(jobConfig.getJobParameter());
+        for (Entry<String, PipelineDataSourceConfiguration> entry : migrationJobConfig.getSources().entrySet()) {
+            try {
+                new IncrementalTaskPositionManager(entry.getValue().getDatabaseType()).destroyPosition(migrationJobConfig.getJobId(), entry.getValue());
+            } catch (final SQLException ex) {
+                log.warn("Job destroying failed, jobId={}, dataSourceName={}", migrationJobConfig.getJobId(), entry.getKey(), ex);
+            }
+        }
}
@Override
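With the cleanup loop inlined here, code that previously called MigrationJobPreparer.cleanup() goes through the processor instead. A minimal caller sketch (loadJobConfiguration() is a hypothetical placeholder; the job parameter must hold the YAML migration job configuration):

    // loadJobConfiguration() is hypothetical; obtain the elasticjob JobConfiguration however your context provides it.
    JobConfiguration jobConfig = loadJobConfiguration();
    new MigrationJobConfigurationChangedProcessor().clean(jobConfig);

Each source's SQLException is caught and logged so that cleanup continues for the remaining data sources.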
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 1ef92b4cd5b..7679a0ab4ea 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.channel.IncrementalChannelCreator;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
@@ -41,6 +40,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPositi
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.preparer.PipelineJobPreparer;
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
@@ -74,38 +74,25 @@ import org.apache.shardingsphere.parser.rule.SQLParserRule;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
-import java.util.Map.Entry;
/**
* Migration job preparer.
*/
@Slf4j
-public final class MigrationJobPreparer {
+public final class MigrationJobPreparer implements PipelineJobPreparer<MigrationJobItemContext> {
-    private final MigrationJobType jobType = new MigrationJobType();
+    private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(new MigrationJobType().getYamlJobItemProgressSwapper());
-    private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
-
-    /**
-     * Do prepare work.
-     *
-     * @param jobItemContext job item context
-     * @throws SQLException SQL exception
-     * @throws PipelineJobCancelingException pipeline job canceled exception
-     */
-    public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException, PipelineJobCancelingException {
+    @Override
+    public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException {
        ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
-                () -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration"));
+                () -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration."));
        DatabaseType sourceDatabaseType = jobItemContext.getJobConfig().getSourceDatabaseType();
        new PipelineDataSourceCheckEngine(sourceDatabaseType).checkSourceDataSources(Collections.singleton(jobItemContext.getSourceDataSource()));
-        if (jobItemContext.isStopping()) {
-            throw new PipelineJobCancelingException();
-        }
+        ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
        prepareAndCheckTargetWithLock(jobItemContext);
-        if (jobItemContext.isStopping()) {
-            throw new PipelineJobCancelingException();
-        }
+        ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
        boolean isIncrementalSupported = DatabaseTypedSPILoader.findService(DialectIncrementalDumperCreator.class, sourceDatabaseType).isPresent();
if (isIncrementalSupported) {
prepareIncremental(jobItemContext);
@@ -113,11 +100,9 @@ public final class MigrationJobPreparer {
initInventoryTasks(jobItemContext);
if (isIncrementalSupported) {
initIncrementalTasks(jobItemContext);
-            if (jobItemContext.isStopping()) {
-                throw new PipelineJobCancelingException();
-            }
+            ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
        }
-        log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",
+        log.info("Prepare job, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",
                jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
}
@@ -142,11 +127,11 @@ public final class MigrationJobPreparer {
                PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getOffset().persist(jobId, new JobOffsetInfo(true));
            } finally {
-                log.info("unlock, jobId={}, shardingItem={}, cost {} ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
+                log.info("Unlock, jobId={}, shardingItem={}, cost {} ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
                lockContext.unlock(lockDefinition);
            }
        } else {
-            log.warn("try lock failed, jobId={}, shardingItem={}", jobId, jobItemContext.getShardingItem());
+            log.warn("Try lock failed, jobId={}, shardingItem={}", jobId, jobItemContext.getShardingItem());
        }
    }
@@ -204,19 +189,4 @@ public final class MigrationJobPreparer {
        PipelineTask incrementalTask = new IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress);
jobItemContext.getIncrementalTasks().add(incrementalTask);
}
-
-    /**
-     * Do cleanup work.
-     *
-     * @param jobConfig job configuration
-     */
-    public void cleanup(final MigrationJobConfiguration jobConfig) {
-        for (Entry<String, PipelineDataSourceConfiguration> entry : jobConfig.getSources().entrySet()) {
-            try {
-                new IncrementalTaskPositionManager(entry.getValue().getDatabaseType()).destroyPosition(jobConfig.getJobId(), entry.getValue());
-            } catch (final SQLException ex) {
-                log.warn("job destroying failed, jobId={}, dataSourceName={}", jobConfig.getJobId(), entry.getKey(), ex);
-            }
-        }
-    }
}
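For reference, the recurring simplification in prepare() replaces explicit stop checks with a one-line precondition; both forms below are taken from the diff above:

    // Before: explicit branch and throw.
    if (jobItemContext.isStopping()) {
        throw new PipelineJobCancelingException();
    }
    // After: the exception supplier is invoked only when the state check fails.
    ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);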