This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 38178b10205 Support stopping job on preparation (#17878)
38178b10205 is described below
commit 38178b10205d866c4bd4ff92affdf9079c501245
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon May 23 23:19:26 2022 +0800
Support stopping job on preparation (#17878)
* Support stopping job on preparation
* Tiny update
---
.../core/api/impl/RuleAlteredJobAPIImpl.java | 3 +-
.../pipeline/core/execute/PipelineJobExecutor.java | 7 ++-
.../data/pipeline/core/job/FinishedCheckJob.java | 4 +-
.../scenario/rulealtered/RuleAlteredJob.java | 40 +++++++++-------
.../scenario/rulealtered/RuleAlteredJobCenter.java | 56 ++++++++++++++++++++++
.../rulealtered/RuleAlteredJobContext.java | 8 ++--
.../rulealtered/RuleAlteredJobPreparer.java | 9 ++++
.../rulealtered/RuleAlteredJobScheduler.java | 29 ++++++++++-
.../rulealtered/RuleAlteredJobSchedulerCenter.java | 2 +-
.../opengauss/ingest/OpenGaussWalDumper.java | 3 +-
10 files changed, 132 insertions(+), 29 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index ac3cb12b716..dccb4ccb72c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -38,6 +38,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecuti
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
+import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobCenter;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobProgressDetector;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobSchedulerCenter;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
@@ -343,7 +344,7 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
for (int each : repositoryAPI.getShardingItems(jobId)) {
repositoryAPI.updateShardingJobStatus(jobId, each,
JobStatus.FINISHED);
}
- RuleAlteredJobSchedulerCenter.stop(jobId);
+ RuleAlteredJobCenter.stop(jobId);
stop(jobId);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index 72474b2834c..ead132e38ce 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -26,6 +26,7 @@ import
org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import org.apache.shardingsphere.data.pipeline.core.lock.PipelineSimpleLock;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
+import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobCenter;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobProgressDetector;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobSchedulerCenter;
@@ -75,7 +76,7 @@ public final class PipelineJobExecutor extends
AbstractLifecycleExecutor {
boolean isDisabled = jobConfigPOJO.isDisabled();
if (isDeleted || isDisabled) {
log.info("jobId={}, deleted={}, disabled={}",
jobConfigPOJO.getJobName(), isDeleted, isDisabled);
- RuleAlteredJobSchedulerCenter.stop(jobConfigPOJO.getJobName());
+ RuleAlteredJobCenter.stop(jobConfigPOJO.getJobName());
// TODO refactor: dispatch to different job types
RuleAlteredJobConfiguration jobConfig =
RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
if (isDeleted) {
@@ -105,7 +106,9 @@ public final class PipelineJobExecutor extends
AbstractLifecycleExecutor {
String databaseName =
RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter()).getDatabaseName();
if (PipelineSimpleLock.getInstance().tryLock(databaseName, 3000)) {
log.info("{} added to executing jobs success",
jobConfigPOJO.getJobName());
- new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), new
RuleAlteredJob(), jobConfigPOJO.toJobConfiguration()).execute();
+ RuleAlteredJob job = new RuleAlteredJob();
+ RuleAlteredJobCenter.addJob(jobConfigPOJO.getJobName(), job);
+ new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(),
job, jobConfigPOJO.toJobConfiguration()).execute();
} else {
log.info("tryLock failed, databaseName={}", databaseName);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index 724c70fca0a..9ae3f7dfbe7 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.job;
-import io.vertx.core.impl.ConcurrentHashSet;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
@@ -37,13 +36,14 @@ import
org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
@Slf4j
public final class FinishedCheckJob implements SimpleJob {
private final RuleAlteredJobAPI ruleAlteredJobAPI =
RuleAlteredJobAPIFactory.getInstance();
- private final Set<String> onCheckJobIds = new ConcurrentHashSet<>();
+ private final Set<String> onCheckJobIds = new ConcurrentSkipListSet<>();
// TODO only one proxy node could do data consistency check in proxy
cluster
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index cbfa0940ef2..310c3adefa3 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -20,13 +20,10 @@ package
org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingReleaseDatabaseLevelLockEvent;
/**
* Rule altered job.
@@ -36,30 +33,39 @@ public final class RuleAlteredJob implements SimpleJob {
private final GovernanceRepositoryAPI governanceRepositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
+ private volatile String jobId;
+
// Shared by all sharding items
private final RuleAlteredJobPreparer jobPreparer = new
RuleAlteredJobPreparer();
+ private volatile boolean stopping;
+
@Override
public void execute(final ShardingContext shardingContext) {
log.info("Execute job {}-{}", shardingContext.getJobName(),
shardingContext.getShardingItem());
+ if (stopping) {
+ log.info("stopping true, ignore");
+ return;
+ }
+ jobId = shardingContext.getJobName();
RuleAlteredJobConfiguration jobConfig =
RuleAlteredJobConfigurationSwapper.swapToObject(shardingContext.getJobParameter());
RuleAlteredJobContext jobContext = new
RuleAlteredJobContext(jobConfig, shardingContext.getShardingItem());
+ // TODO use final for initProgress and jobPreparer
jobContext.setInitProgress(governanceRepositoryAPI.getJobProgress(jobContext.getJobId(),
jobContext.getShardingItem()));
jobContext.setJobPreparer(jobPreparer);
- try {
- jobPreparer.prepare(jobContext);
- // CHECKSTYLE:OFF
- } catch (final RuntimeException ex) {
- // CHECKSTYLE:ON
- log.error("job prepare failed, {}-{}",
shardingContext.getJobName(), shardingContext.getShardingItem(), ex);
- RuleAlteredJobSchedulerCenter.stop(shardingContext.getJobName());
- jobContext.setStatus(JobStatus.PREPARING_FAILURE);
- governanceRepositoryAPI.persistJobProgress(jobContext);
- ScalingReleaseDatabaseLevelLockEvent event = new
ScalingReleaseDatabaseLevelLockEvent(jobConfig.getDatabaseName());
- ShardingSphereEventBus.getInstance().post(event);
- throw ex;
- }
- governanceRepositoryAPI.persistJobProgress(jobContext);
RuleAlteredJobSchedulerCenter.start(jobContext);
}
+
+ /**
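+ * Stop job: mark it as stopping so later executions are ignored, then stop its schedulers if the job id is already known.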
+ */
+ public void stop() {
+ stopping = true;
+ if (null == jobId) {
+ log.info("stop, jobId is null, ignore");
+ return;
+ }
+ log.info("stop, jobId={}", jobId);
+ RuleAlteredJobSchedulerCenter.stop(jobId);
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobCenter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobCenter.java
new file mode 100644
index 00000000000..478089aa45b
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobCenter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Rule altered job center, a static registry of {@link RuleAlteredJob} instances keyed by job id.
+ */
+@Slf4j
+public final class RuleAlteredJobCenter {
+
+ private static final Map<String, RuleAlteredJob> JOB_MAP = new
ConcurrentHashMap<>();
+
+ /**
+ * Add job, replacing any job already registered under the same job id.
+ *
+ * @param jobId job id
+ * @param job job
+ */
+ public static void addJob(final String jobId, final RuleAlteredJob job) {
+ JOB_MAP.put(jobId, job);
+ }
+
+ /**
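+ * Stop the job registered under the job id; only log and return if none is registered. The job stays registered.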
+ *
+ * @param jobId job id
+ */
+ public static void stop(final String jobId) {
+ RuleAlteredJob job = JOB_MAP.get(jobId);
+ if (null == job) {
+ log.info("job is null, ignore, jobId={}", jobId);
+ return;
+ }
+ job.stop();
+ }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
index 35059e626c7..8430342f1b5 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
@@ -49,9 +49,11 @@ public final class RuleAlteredJobContext {
private final int shardingItem;
- private JobStatus status = JobStatus.RUNNING;
+ private volatile boolean stopping;
- private JobProgress initProgress;
+ private volatile JobStatus status = JobStatus.RUNNING;
+
+ private volatile JobProgress initProgress;
private final TaskConfiguration taskConfig;
@@ -81,7 +83,7 @@ public final class RuleAlteredJobContext {
}
};
- private RuleAlteredJobPreparer jobPreparer;
+ private volatile RuleAlteredJobPreparer jobPreparer;
public RuleAlteredJobContext(final RuleAlteredJobConfiguration jobConfig,
final int jobShardingItem) {
ruleAlteredContext =
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index eaf7532443f..358392f8f64 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -78,10 +78,19 @@ public final class RuleAlteredJobPreparer {
// TODO Initialize source and target data source after tasks
initialization, since dumper and importer constructor might call
appendJDBCQueryProperties.
// But InventoryTaskSplitter need to check target tables. It need to
do some refactoring for appendJDBCQueryProperties vocations.
checkSourceDataSource(jobContext);
+ if (jobContext.isStopping()) {
+ throw new PipelineJobPrepareFailedException("Job stopping, jobId="
+ jobContext.getJobId());
+ }
prepareAndCheckTargetWithLock(jobContext);
+ if (jobContext.isStopping()) {
+ throw new PipelineJobPrepareFailedException("Job stopping, jobId="
+ jobContext.getJobId());
+ }
// TODO check metadata
try {
initIncrementalTasks(jobContext);
+ if (jobContext.isStopping()) {
+ throw new PipelineJobPrepareFailedException("Job stopping,
jobId=" + jobContext.getJobId());
+ }
initInventoryTasks(jobContext);
log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={},
incrementalTasks={}",
jobContext.getJobId(), jobContext.getShardingItem(),
jobContext.getInventoryTasks(), jobContext.getIncrementalTasks());
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index 4043474cafc..2f44eab8db2 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -22,6 +22,8 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -50,7 +52,8 @@ public final class RuleAlteredJobScheduler implements
Runnable {
* Stop all task.
*/
public void stop() {
- log.info("stop job {}", jobContext.getJobId());
+ jobContext.setStopping(true);
+ log.info("stop, jobId={}, shardingItem={}", jobContext.getJobId(),
jobContext.getShardingItem());
for (InventoryTask each : jobContext.getInventoryTasks()) {
log.info("stop inventory task {} - {}", jobContext.getJobId(),
each.getTaskId());
each.stop();
@@ -66,7 +69,31 @@ public final class RuleAlteredJobScheduler implements
Runnable {
@Override
public void run() {
+ String jobId = jobContext.getJobId();
+ GovernanceRepositoryAPI governanceRepositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
+ try {
+ jobContext.getJobPreparer().prepare(jobContext);
+ // CHECKSTYLE:OFF
+ } catch (final RuntimeException ex) {
+ // CHECKSTYLE:ON
+ log.error("job prepare failed, {}-{}", jobId,
jobContext.getShardingItem(), ex);
+ RuleAlteredJobCenter.stop(jobId);
+ jobContext.setStatus(JobStatus.PREPARING_FAILURE);
+ governanceRepositoryAPI.persistJobProgress(jobContext);
+ ScalingReleaseDatabaseLevelLockEvent event = new
ScalingReleaseDatabaseLevelLockEvent(jobContext.getJobConfig().getDatabaseName());
+ ShardingSphereEventBus.getInstance().post(event);
+ throw ex;
+ }
+ if (jobContext.isStopping()) {
+ log.info("job stopping, ignore inventory task");
+ return;
+ }
+ governanceRepositoryAPI.persistJobProgress(jobContext);
if (executeInventoryTask()) {
+ if (jobContext.isStopping()) {
+ log.info("stopping, ignore incremental task");
+ return;
+ }
executeIncrementalTask();
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
index 9950b2ef19a..24bf821128f 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
@@ -72,7 +72,7 @@ public final class RuleAlteredJobSchedulerCenter {
*
* @param jobId job id
*/
- public static void stop(final String jobId) {
+ static void stop(final String jobId) {
log.info("remove and stop {}", jobId);
Map<Integer, RuleAlteredJobScheduler> schedulerMap =
JOB_SCHEDULER_MAP.get(jobId);
if (null == schedulerMap) {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index 0508d7d5e19..2769ad9303a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -95,8 +95,7 @@ public final class OpenGaussWalDumper extends
AbstractIncrementalDumper<WalPosit
if (null != stream) {
try {
stream.close();
- } catch (final SQLException ex) {
- log.error("Close PGReplicationStream failed", ex);
+ } catch (final SQLException ignored) {
}
}
}