This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new dc00a1b566e Add PipelineJobPreparer (#32758)
dc00a1b566e is described below

commit dc00a1b566e2ebe4f9e321445105a46b6128cbb3
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Sep 1 13:51:10 2024 +0800

    Add PipelineJobPreparer (#32758)
    
    * Add PipelineJobPreparer
    
    * Add PipelineJobPreparer
---
 .../core/job/preparer/PipelineJobPreparer.java     | 38 +++++++++++++++
 .../IncrementalTaskPositionManager.java            | 12 ++---
 .../MigrationJobConfigurationChangedProcessor.java | 17 ++++++-
 .../migration/preparer/MigrationJobPreparer.java   | 54 +++++-----------------
 4 files changed, 71 insertions(+), 50 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/preparer/PipelineJobPreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/preparer/PipelineJobPreparer.java
new file mode 100644
index 00000000000..1fef91f1abf
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/preparer/PipelineJobPreparer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job.preparer;
+
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
+
+import java.sql.SQLException;
+
+/**
+ * Pipeline job preparer.
+ * 
+ * @param <T> type of pipeline job item context
+ */
+public interface PipelineJobPreparer<T extends PipelineJobItemContext> {
+    
+    /**
+     * Prepare before job execution.
+     *
+     * @param jobItemContext job item context
+     * @throws SQLException SQL exception
+     */
+    void prepare(T jobItemContext) throws SQLException;
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
index fd2bab1bc6a..154dbadc69d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
@@ -44,11 +44,11 @@ public final class IncrementalTaskPositionManager {
     
     private final DatabaseType databaseType;
     
-    private final DialectIncrementalPositionManager positionInitializer;
+    private final DialectIncrementalPositionManager dialectPositionManager;
     
     public IncrementalTaskPositionManager(final DatabaseType databaseType) {
         this.databaseType = databaseType;
-        positionInitializer = DatabaseTypedSPILoader.getService(DialectIncrementalPositionManager.class, databaseType);
+        dialectPositionManager = DatabaseTypedSPILoader.getService(DialectIncrementalPositionManager.class, databaseType);
     }
     
     /**
@@ -68,7 +68,7 @@ public final class IncrementalTaskPositionManager {
                 return position.get();
             }
         }
-        return positionInitializer.init(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()), dumperContext.getJobId());
+        return dialectPositionManager.init(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()), dumperContext.getJobId());
     }
     
     /**
@@ -82,11 +82,11 @@ public final class IncrementalTaskPositionManager {
         final long startTimeMillis = System.currentTimeMillis();
         log.info("Cleanup position, database type: {}, pipeline data source type: {}", databaseType.getType(), pipelineDataSourceConfig.getType());
         if (pipelineDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration) {
-            destroyPosition(jobId, (ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig, positionInitializer);
+            destroyPosition(jobId, (ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig, dialectPositionManager);
         } else if (pipelineDataSourceConfig instanceof StandardPipelineDataSourceConfiguration) {
-            destroyPosition(jobId, (StandardPipelineDataSourceConfiguration) pipelineDataSourceConfig, positionInitializer);
+            destroyPosition(jobId, (StandardPipelineDataSourceConfiguration) pipelineDataSourceConfig, dialectPositionManager);
         }
-        log.info("destroyPosition cost {} ms", System.currentTimeMillis() - startTimeMillis);
+        log.info("Destroy position cost {} ms.", System.currentTimeMillis() - startTimeMillis);
     }
     
     private void destroyPosition(final String jobId,
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
index 39545ca0355..b29495b0c91 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
@@ -17,17 +17,23 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.migration.metadata.processor;
 
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.JobConfigurationChangedProcessor;
+import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
 import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 
+import java.sql.SQLException;
+import java.util.Map.Entry;
+
 /**
  * Migration job configuration changed processor.
  */
+@Slf4j
 public final class MigrationJobConfigurationChangedProcessor implements JobConfigurationChangedProcessor<MigrationJobConfiguration> {
     
     @Override
@@ -37,7 +43,14 @@ public final class MigrationJobConfigurationChangedProcessor implements JobConfi
     
     @Override
     public void clean(final JobConfiguration jobConfig) {
-        new MigrationJobPreparer().cleanup(new YamlMigrationJobConfigurationSwapper().swapToObject(jobConfig.getJobParameter()));
+        MigrationJobConfiguration migrationJobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(jobConfig.getJobParameter());
+        for (Entry<String, PipelineDataSourceConfiguration> entry : migrationJobConfig.getSources().entrySet()) {
+            try {
+                new IncrementalTaskPositionManager(entry.getValue().getDatabaseType()).destroyPosition(migrationJobConfig.getJobId(), entry.getValue());
+            } catch (final SQLException ex) {
+                log.warn("Job destroying failed, jobId={}, dataSourceName={}", migrationJobConfig.getJobId(), entry.getKey(), ex);
+            }
+        }
     }
     
     @Override
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 1ef92b4cd5b..7679a0ab4ea 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.channel.IncrementalChannelCreator;
 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
@@ -41,6 +40,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPositi
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.preparer.PipelineJobPreparer;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.JobOffsetInfo;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
@@ -74,38 +74,25 @@ import org.apache.shardingsphere.parser.rule.SQLParserRule;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Map.Entry;
 
 /**
  * Migration job preparer.
  */
 @Slf4j
-public final class MigrationJobPreparer {
+public final class MigrationJobPreparer implements PipelineJobPreparer<MigrationJobItemContext> {
     
-    private final MigrationJobType jobType = new MigrationJobType();
+    private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(new MigrationJobType().getYamlJobItemProgressSwapper());
     
-    private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
-    
-    /**
-     * Do prepare work.
-     *
-     * @param jobItemContext job item context
-     * @throws SQLException SQL exception
-     * @throws PipelineJobCancelingException pipeline job canceled exception
-     */
-    public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException, PipelineJobCancelingException {
+    @Override
+    public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException {
         ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(
                 jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
-                () -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration"));
+                () -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration."));
         DatabaseType sourceDatabaseType = jobItemContext.getJobConfig().getSourceDatabaseType();
         new PipelineDataSourceCheckEngine(sourceDatabaseType).checkSourceDataSources(Collections.singleton(jobItemContext.getSourceDataSource()));
-        if (jobItemContext.isStopping()) {
-            throw new PipelineJobCancelingException();
-        }
+        ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
         prepareAndCheckTargetWithLock(jobItemContext);
-        if (jobItemContext.isStopping()) {
-            throw new PipelineJobCancelingException();
-        }
+        ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
         boolean isIncrementalSupported = DatabaseTypedSPILoader.findService(DialectIncrementalDumperCreator.class, sourceDatabaseType).isPresent();
         if (isIncrementalSupported) {
             prepareIncremental(jobItemContext);
@@ -113,11 +100,9 @@ public final class MigrationJobPreparer {
         initInventoryTasks(jobItemContext);
         if (isIncrementalSupported) {
             initIncrementalTasks(jobItemContext);
-            if (jobItemContext.isStopping()) {
-                throw new PipelineJobCancelingException();
-            }
+            ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
         }
-        log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",
+        log.info("Prepare job, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",
                 jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
     }
     
@@ -142,11 +127,11 @@ public final class MigrationJobPreparer {
                     PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getOffset().persist(jobId, new JobOffsetInfo(true));
                 }
             } finally {
-                log.info("unlock, jobId={}, shardingItem={}, cost {} ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
+                log.info("Unlock, jobId={}, shardingItem={}, cost {} ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
                 lockContext.unlock(lockDefinition);
             }
         } else {
-            log.warn("try lock failed, jobId={}, shardingItem={}", jobId, jobItemContext.getShardingItem());
+            log.warn("Try lock failed, jobId={}, shardingItem={}", jobId, jobItemContext.getShardingItem());
         }
     }
     
@@ -204,19 +189,4 @@ public final class MigrationJobPreparer {
         PipelineTask incrementalTask = new IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress);
         jobItemContext.getIncrementalTasks().add(incrementalTask);
     }
-    
-    /**
-     * Do cleanup work.
-     *
-     * @param jobConfig job configuration
-     */
-    public void cleanup(final MigrationJobConfiguration jobConfig) {
-        for (Entry<String, PipelineDataSourceConfiguration> entry : jobConfig.getSources().entrySet()) {
-            try {
-                new IncrementalTaskPositionManager(entry.getValue().getDatabaseType()).destroyPosition(jobConfig.getJobId(), entry.getValue());
-            } catch (final SQLException ex) {
-                log.warn("job destroying failed, jobId={}, dataSourceName={}", jobConfig.getJobId(), entry.getKey(), ex);
-            }
-        }
-    }
 }

Reply via email to