This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 38178b10205 Support stopping job on preparation (#17878)
38178b10205 is described below

commit 38178b10205d866c4bd4ff92affdf9079c501245
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon May 23 23:19:26 2022 +0800

    Support stopping job on preparation (#17878)
    
    * Support stopping job on preparation
    
    * Tiny update
---
 .../core/api/impl/RuleAlteredJobAPIImpl.java       |  3 +-
 .../pipeline/core/execute/PipelineJobExecutor.java |  7 ++-
 .../data/pipeline/core/job/FinishedCheckJob.java   |  4 +-
 .../scenario/rulealtered/RuleAlteredJob.java       | 40 +++++++++-------
 .../scenario/rulealtered/RuleAlteredJobCenter.java | 56 ++++++++++++++++++++++
 .../rulealtered/RuleAlteredJobContext.java         |  8 ++--
 .../rulealtered/RuleAlteredJobPreparer.java        |  9 ++++
 .../rulealtered/RuleAlteredJobScheduler.java       | 29 ++++++++++-
 .../rulealtered/RuleAlteredJobSchedulerCenter.java |  2 +-
 .../opengauss/ingest/OpenGaussWalDumper.java       |  3 +-
 10 files changed, 132 insertions(+), 29 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index ac3cb12b716..dccb4ccb72c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -38,6 +38,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecuti
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
+import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobCenter;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobProgressDetector;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobSchedulerCenter;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
@@ -343,7 +344,7 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
         for (int each : repositoryAPI.getShardingItems(jobId)) {
             repositoryAPI.updateShardingJobStatus(jobId, each, JobStatus.FINISHED);
         }
-        RuleAlteredJobSchedulerCenter.stop(jobId);
+        RuleAlteredJobCenter.stop(jobId);
         stop(jobId);
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index 72474b2834c..ead132e38ce 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
 import org.apache.shardingsphere.data.pipeline.core.lock.PipelineSimpleLock;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
+import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobCenter;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobProgressDetector;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobSchedulerCenter;
@@ -75,7 +76,7 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
         boolean isDisabled = jobConfigPOJO.isDisabled();
         if (isDeleted || isDisabled) {
             log.info("jobId={}, deleted={}, disabled={}", 
jobConfigPOJO.getJobName(), isDeleted, isDisabled);
-            RuleAlteredJobSchedulerCenter.stop(jobConfigPOJO.getJobName());
+            RuleAlteredJobCenter.stop(jobConfigPOJO.getJobName());
             // TODO refactor: dispatch to different job types
             RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
             if (isDeleted) {
@@ -105,7 +106,9 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
         String databaseName = RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter()).getDatabaseName();
         if (PipelineSimpleLock.getInstance().tryLock(databaseName, 3000)) {
             log.info("{} added to executing jobs success", 
jobConfigPOJO.getJobName());
-            new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), new 
RuleAlteredJob(), jobConfigPOJO.toJobConfiguration()).execute();
+            RuleAlteredJob job = new RuleAlteredJob();
+            RuleAlteredJobCenter.addJob(jobConfigPOJO.getJobName(), job);
+            new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), 
job, jobConfigPOJO.toJobConfiguration()).execute();
         } else {
             log.info("tryLock failed, databaseName={}", databaseName);
         }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index 724c70fca0a..9ae3f7dfbe7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job;
 
-import io.vertx.core.impl.ConcurrentHashSet;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
@@ -37,13 +36,14 @@ import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
 
 @Slf4j
 public final class FinishedCheckJob implements SimpleJob {
     
     private final RuleAlteredJobAPI ruleAlteredJobAPI = RuleAlteredJobAPIFactory.getInstance();
     
-    private final Set<String> onCheckJobIds = new ConcurrentHashSet<>();
+    private final Set<String> onCheckJobIds = new ConcurrentSkipListSet<>();
     
     // TODO only one proxy node could do data consistency check in proxy cluster
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index cbfa0940ef2..310c3adefa3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -20,13 +20,10 @@ package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingReleaseDatabaseLevelLockEvent;
 
 /**
  * Rule altered job.
@@ -36,30 +33,39 @@ public final class RuleAlteredJob implements SimpleJob {
     
     private final GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
     
+    private volatile String jobId;
+    
     // Shared by all sharding items
     private final RuleAlteredJobPreparer jobPreparer = new RuleAlteredJobPreparer();
     
+    private volatile boolean stopping;
+    
     @Override
     public void execute(final ShardingContext shardingContext) {
         log.info("Execute job {}-{}", shardingContext.getJobName(), 
shardingContext.getShardingItem());
+        if (stopping) {
+            log.info("stopping true, ignore");
+            return;
+        }
+        jobId = shardingContext.getJobName();
         RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
         RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig, shardingContext.getShardingItem());
+        // TODO use final for initProgress and jobPreparer
         jobContext.setInitProgress(governanceRepositoryAPI.getJobProgress(jobContext.getJobId(), jobContext.getShardingItem()));
         jobContext.setJobPreparer(jobPreparer);
-        try {
-            jobPreparer.prepare(jobContext);
-            // CHECKSTYLE:OFF
-        } catch (final RuntimeException ex) {
-            // CHECKSTYLE:ON
-            log.error("job prepare failed, {}-{}", shardingContext.getJobName(), shardingContext.getShardingItem(), ex);
-            RuleAlteredJobSchedulerCenter.stop(shardingContext.getJobName());
-            jobContext.setStatus(JobStatus.PREPARING_FAILURE);
-            governanceRepositoryAPI.persistJobProgress(jobContext);
-            ScalingReleaseDatabaseLevelLockEvent event = new ScalingReleaseDatabaseLevelLockEvent(jobConfig.getDatabaseName());
-            ShardingSphereEventBus.getInstance().post(event);
-            throw ex;
-        }
-        governanceRepositoryAPI.persistJobProgress(jobContext);
         RuleAlteredJobSchedulerCenter.start(jobContext);
     }
+    
+    /**
+     * Stop job.
+     */
+    public void stop() {
+        stopping = true;
+        if (null == jobId) {
+            log.info("stop, jobId is null, ignore");
+            return;
+        }
+        log.info("stop, jobId={}", jobId);
+        RuleAlteredJobSchedulerCenter.stop(jobId);
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobCenter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobCenter.java
new file mode 100644
index 00000000000..478089aa45b
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobCenter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Rule altered job center.
+ */
+@Slf4j
+public final class RuleAlteredJobCenter {
+    
+    private static final Map<String, RuleAlteredJob> JOB_MAP = new ConcurrentHashMap<>();
+    
+    /**
+     * Add job.
+     *
+     * @param jobId job id
+     * @param job job
+     */
+    public static void addJob(final String jobId, final RuleAlteredJob job) {
+        JOB_MAP.put(jobId, job);
+    }
+    
+    /**
+     * Stop job.
+     *
+     * @param jobId job id
+     */
+    public static void stop(final String jobId) {
+        RuleAlteredJob job = JOB_MAP.get(jobId);
+        if (null == job) {
+            log.info("job is null, ignore, jobId={}", jobId);
+            return;
+        }
+        job.stop();
+    }
+}
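
For orientation, here is a minimal usage sketch of the new RuleAlteredJobCenter, distilled from the PipelineJobExecutor and RuleAlteredJob changes in this commit. It is not part of the commit; JobCenterUsageSketch and its method names are hypothetical, and it assumes the shardingsphere-data-pipeline-core module is on the classpath.

    import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
    import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobCenter;
    
    public final class JobCenterUsageSketch {
        
        // Register a job instance under its job id before bootstrapping it,
        // so that a later delete/disable event can reach the running RuleAlteredJob.
        public static RuleAlteredJob register(final String jobId) {
            RuleAlteredJob job = new RuleAlteredJob();
            RuleAlteredJobCenter.addJob(jobId, job);
            return job;
        }
        
        // Stop by job id, e.g. when the job configuration is deleted or disabled.
        // If the job id was never registered, RuleAlteredJobCenter.stop logs and returns.
        public static void stopIfPresent(final String jobId) {
            RuleAlteredJobCenter.stop(jobId);
        }
    }
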
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
index 35059e626c7..8430342f1b5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
@@ -49,9 +49,11 @@ public final class RuleAlteredJobContext {
     
     private final int shardingItem;
     
-    private JobStatus status = JobStatus.RUNNING;
+    private volatile boolean stopping;
     
-    private JobProgress initProgress;
+    private volatile JobStatus status = JobStatus.RUNNING;
+    
+    private volatile JobProgress initProgress;
     
     private final TaskConfiguration taskConfig;
     
@@ -81,7 +83,7 @@ public final class RuleAlteredJobContext {
         }
     };
     
-    private RuleAlteredJobPreparer jobPreparer;
+    private volatile RuleAlteredJobPreparer jobPreparer;
     
     public RuleAlteredJobContext(final RuleAlteredJobConfiguration jobConfig, final int jobShardingItem) {
         ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index eaf7532443f..358392f8f64 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -78,10 +78,19 @@ public final class RuleAlteredJobPreparer {
         // TODO Initialize source and target data source after tasks initialization, since dumper and importer constructor might call appendJDBCQueryProperties.
         // But InventoryTaskSplitter need to check target tables. It need to do some refactoring for appendJDBCQueryProperties vocations.
         checkSourceDataSource(jobContext);
+        if (jobContext.isStopping()) {
+            throw new PipelineJobPrepareFailedException("Job stopping, jobId=" + jobContext.getJobId());
+        }
         prepareAndCheckTargetWithLock(jobContext);
+        if (jobContext.isStopping()) {
+            throw new PipelineJobPrepareFailedException("Job stopping, jobId=" + jobContext.getJobId());
+        }
         // TODO check metadata
         try {
             initIncrementalTasks(jobContext);
+            if (jobContext.isStopping()) {
+                throw new PipelineJobPrepareFailedException("Job stopping, jobId=" + jobContext.getJobId());
+            }
             initInventoryTasks(jobContext);
             log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={}, 
incrementalTasks={}",
                     jobContext.getJobId(), jobContext.getShardingItem(), 
jobContext.getInventoryTasks(), jobContext.getIncrementalTasks());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index 4043474cafc..2f44eab8db2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -22,6 +22,8 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -50,7 +52,8 @@ public final class RuleAlteredJobScheduler implements Runnable {
      * Stop all task.
      */
     public void stop() {
-        log.info("stop job {}", jobContext.getJobId());
+        jobContext.setStopping(true);
+        log.info("stop, jobId={}, shardingItem={}", jobContext.getJobId(), 
jobContext.getShardingItem());
         for (InventoryTask each : jobContext.getInventoryTasks()) {
             log.info("stop inventory task {} - {}", jobContext.getJobId(), 
each.getTaskId());
             each.stop();
@@ -66,7 +69,31 @@ public final class RuleAlteredJobScheduler implements Runnable {
     
     @Override
     public void run() {
+        String jobId = jobContext.getJobId();
+        GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
+        try {
+            jobContext.getJobPreparer().prepare(jobContext);
+            // CHECKSTYLE:OFF
+        } catch (final RuntimeException ex) {
+            // CHECKSTYLE:ON
+            log.error("job prepare failed, {}-{}", jobId, jobContext.getShardingItem(), ex);
+            RuleAlteredJobCenter.stop(jobId);
+            jobContext.setStatus(JobStatus.PREPARING_FAILURE);
+            governanceRepositoryAPI.persistJobProgress(jobContext);
+            ScalingReleaseDatabaseLevelLockEvent event = new ScalingReleaseDatabaseLevelLockEvent(jobContext.getJobConfig().getDatabaseName());
+            ShardingSphereEventBus.getInstance().post(event);
+            throw ex;
+        }
+        if (jobContext.isStopping()) {
+            log.info("job stopping, ignore inventory task");
+            return;
+        }
+        governanceRepositoryAPI.persistJobProgress(jobContext);
         if (executeInventoryTask()) {
+            if (jobContext.isStopping()) {
+                log.info("stopping, ignore incremental task");
+                return;
+            }
             executeIncrementalTask();
         }
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
index 9950b2ef19a..24bf821128f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
@@ -72,7 +72,7 @@ public final class RuleAlteredJobSchedulerCenter {
      *
      * @param jobId job id
      */
-    public static void stop(final String jobId) {
+    static void stop(final String jobId) {
         log.info("remove and stop {}", jobId);
         Map<Integer, RuleAlteredJobScheduler> schedulerMap = JOB_SCHEDULER_MAP.get(jobId);
         if (null == schedulerMap) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index 0508d7d5e19..2769ad9303a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -95,8 +95,7 @@ public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosit
             if (null != stream) {
                 try {
                     stream.close();
-                } catch (final SQLException ex) {
-                    log.error("Close PGReplicationStream failed", ex);
+                } catch (final SQLException ignored) {
                 }
             }
         }
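
Taken together, the changes make the prepare phase interruptible: RuleAlteredJobCenter.stop reaches RuleAlteredJob.stop, the scheduler marks the job context as stopping, and RuleAlteredJobPreparer bails out between preparation steps. A condensed sketch of that check follows, for illustration only; the commit inlines the check rather than using a helper method such as this hypothetical throwIfStopping.

    // Hypothetical helper illustrating the stop-on-preparation check used in RuleAlteredJobPreparer.
    private void throwIfStopping(final RuleAlteredJobContext jobContext) {
        if (jobContext.isStopping()) {
            // Abort preparation early; RuleAlteredJobScheduler.run() also re-checks isStopping()
            // before starting inventory and incremental tasks.
            throw new PipelineJobPrepareFailedException("Job stopping, jobId=" + jobContext.getJobId());
        }
    }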
