This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ef175cb5737 Allow drop active CDC jobs via DistSQL (#29965)
ef175cb5737 is described below
commit ef175cb57370e9a7adc326e8d002e859f00e1e0b
Author: Xinze Guo <[email protected]>
AuthorDate: Sun Feb 4 19:29:15 2024 +0800
Allow drop active CDC jobs via DistSQL (#29965)
---
.../org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java | 1 -
.../shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java | 4 ++++
.../test/e2e/data/pipeline/cases/PipelineContainerComposer.java | 2 +-
3 files changed, 5 insertions(+), 2 deletions(-)
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index d5e5a0da150..29e65fbfce1 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -260,7 +260,6 @@ public final class CDCJobAPI implements TransmissionJobAPI {
*/
public void drop(final String jobId) {
CDCJobConfiguration jobConfig =
jobConfigManager.getJobConfiguration(jobId);
-
ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(),
() -> new PipelineInternalException("Can't drop streaming job which is
active"));
jobManager.drop(jobId);
cleanup(jobConfig);
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 31ce2d58429..03976216667 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -45,10 +45,12 @@ import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataR
import org.apache.shardingsphere.data.pipeline.cdc.util.CDCDataNodeUtils;
import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtils;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
+import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
@@ -169,6 +171,8 @@ public final class CDCBackendHandler {
* @param jobId job ID
*/
public void dropStreaming(final String jobId) {
+
ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(),
+ () -> new PipelineInternalException("Can't drop streaming job
which is active"));
jobAPI.drop(jobId);
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index d3ba2ec2a00..8adcabadf1c 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -479,7 +479,7 @@ public final class PipelineContainerComposer implements
AutoCloseable {
for (Map<String, Object> each : listJobStatus) {
assertTrue(Strings.isNullOrEmpty((String)
each.get("error_message")), "error_message: `" + each.get("error_message") +
"`");
actualStatus.add(each.get("status").toString());
- String incrementalIdleSeconds =
each.get("incremental_idle_seconds").toString();
+ String incrementalIdleSeconds = (String)
each.get("incremental_idle_seconds");
incrementalIdleSecondsList.add(Strings.isNullOrEmpty(incrementalIdleSeconds) ?
0 : Integer.parseInt(incrementalIdleSeconds));
}
if (Collections.min(incrementalIdleSecondsList) <= 5) {