This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ef175cb5737 Allow drop active CDC jobs via DistSQL (#29965)
ef175cb5737 is described below

commit ef175cb57370e9a7adc326e8d002e859f00e1e0b
Author: Xinze Guo <[email protected]>
AuthorDate: Sun Feb 4 19:29:15 2024 +0800

    Allow drop active CDC jobs via DistSQL (#29965)
---
 .../org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java    | 1 -
 .../shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java   | 4 ++++
 .../test/e2e/data/pipeline/cases/PipelineContainerComposer.java       | 2 +-
 3 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index d5e5a0da150..29e65fbfce1 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -260,7 +260,6 @@ public final class CDCJobAPI implements TransmissionJobAPI {
      */
     public void drop(final String jobId) {
         CDCJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId);
-        ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(), () -> new PipelineInternalException("Can't drop streaming job which is active"));
         jobManager.drop(jobId);
         cleanup(jobConfig);
     }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 31ce2d58429..03976216667 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -45,10 +45,12 @@ import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataR
 import org.apache.shardingsphere.data.pipeline.cdc.util.CDCDataNodeUtils;
 import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtils;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
@@ -169,6 +171,8 @@ public final class CDCBackendHandler {
      * @param jobId job ID
      */
     public void dropStreaming(final String jobId) {
+        ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(),
+                () -> new PipelineInternalException("Can't drop streaming job which is active"));
         jobAPI.drop(jobId);
     }
     
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index d3ba2ec2a00..8adcabadf1c 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -479,7 +479,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
             for (Map<String, Object> each : listJobStatus) {
                 assertTrue(Strings.isNullOrEmpty((String) each.get("error_message")), "error_message: `" + each.get("error_message") + "`");
                 actualStatus.add(each.get("status").toString());
-                String incrementalIdleSeconds = each.get("incremental_idle_seconds").toString();
+                String incrementalIdleSeconds = (String) each.get("incremental_idle_seconds");
                 incrementalIdleSecondsList.add(Strings.isNullOrEmpty(incrementalIdleSeconds) ? 0 : Integer.parseInt(incrementalIdleSeconds));
             }
             if (Collections.min(incrementalIdleSecondsList) <= 5) {

Reply via email to