This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7308e0c9b4f Improve drop streaming, not allow drop active job (#28992)
7308e0c9b4f is described below

commit 7308e0c9b4f4f4f1a260a4fe7d37cc43582c8f62
Author: Xinze Guo <[email protected]>
AuthorDate: Thu Nov 9 18:58:34 2023 +0800

    Improve drop streaming, not allow drop active job (#28992)
    
    * Improve drop streaming, not allow drop active job
    
    * Improve CDCE2EIT
---
 .../cdc/distsql/handler/update/DropStreamingUpdater.java |  2 +-
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java            | 14 ++++++--------
 .../data/pipeline/cdc/handler/CDCBackendHandler.java     |  2 +-
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java       | 16 ++++++++++------
 4 files changed, 18 insertions(+), 16 deletions(-)

diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
index 24e8d61da8c..47db9466af5 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
@@ -32,7 +32,7 @@ public final class DropStreamingUpdater implements RALUpdater<DropStreamingState
     
     @Override
     public void executeUpdate(final String databaseName, final DropStreamingStatement sqlStatement) throws SQLException {
-        jobAPI.stopAndDrop(sqlStatement.getJobId());
+        jobAPI.dropStreaming(sqlStatement.getJobId());
     }
     
     @Override
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index ec1bd493e28..15b5f7cc3f5 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -63,6 +63,7 @@ import org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalT
 import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
@@ -328,17 +329,14 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
     }
     
     /**
-     * Stop and drop job.
+     * Drop streaming job.
      *
      * @param jobId job id
      */
-    public void stopAndDrop(final String jobId) {
-        CDCJobConfiguration jobConfig = getJobConfiguration(jobId);
-        if (CDCSinkType.SOCKET == jobConfig.getSinkConfig().getSinkType()) {
-            PipelineJobCenter.stop(jobId);
-        } else {
-            stop(jobId);
-        }
+    public void dropStreaming(final String jobId) {
+        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+        CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
+        ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineInternalException("Can't drop streaming job which is active"));
         dropJob(jobId);
         cleanup(jobConfig);
     }
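
Note the contract change: dropStreaming no longer stops the job itself. The job must
already be disabled before it can be dropped (the E2E change below closes the CDC
client and waits for the job to go inactive); otherwise the precondition above throws
PipelineInternalException. A minimal caller sketch, assuming the stop(jobId) call
removed above is still available on the API (the job id is hypothetical):

    CDCJobAPI jobAPI = new CDCJobAPI();
    String jobId = "j0302...";    // hypothetical job id
    jobAPI.stop(jobId);           // disable the job first; dropping an active job now fails
    jobAPI.dropStreaming(jobId);  // passes the isDisabled() check, then drops and cleans up
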
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 2a3a39d82d5..ad7f7b77410 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -164,7 +164,7 @@ public final class CDCBackendHandler {
      * @param jobId job ID
      */
     public void dropStreaming(final String jobId) {
-        jobAPI.stopAndDrop(jobId);
+        jobAPI.dropStreaming(jobId);
     }
     
     /**
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 70da4a2663a..5d0d7dea417 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -118,7 +118,7 @@ class CDCE2EIT {
                 initSchemaAndTable(containerComposer, connection, 0);
             }
             DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
-            startCDCClient(containerComposer, dialectDatabaseMetaData);
+            final CDCClient cdcClient = buildCDCClientAndStart(containerComposer, dialectDatabaseMetaData);
             Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
             String jobId = containerComposer.queryForListWithLog("SHOW STREAMING LIST").get(0).get("id").toString();
             containerComposer.waitIncrementTaskFinished(String.format("SHOW STREAMING STATUS '%s'", jobId));
@@ -140,6 +140,9 @@ class CDCE2EIT {
                     containerComposer.getDatabaseType());
             assertDataMatched(sourceDataSource, targetDataSource, orderSchemaTableName);
             assertDataMatched(sourceDataSource, targetDataSource, new CaseInsensitiveQualifiedTable(null, "t_address"));
+            cdcClient.close();
+            Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS).until(() -> containerComposer.queryForListWithLog("SHOW STREAMING LIST")
+                    .stream().noneMatch(each -> Boolean.parseBoolean(each.get("active").toString())));
             containerComposer.proxyExecuteWithLog(String.format("DROP STREAMING '%s'", jobId), 0);
             assertTrue(containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
         }
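
The test change mirrors the new rule: close the CDC client, poll SHOW STREAMING LIST
until no job reports active=true, and only then issue DROP STREAMING. Condensed, the
wait-then-drop pattern reads (queryForListWithLog and proxyExecuteWithLog are the test
helpers already used throughout this file):

    cdcClient.close();
    Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS)
            .until(() -> containerComposer.queryForListWithLog("SHOW STREAMING LIST").stream()
                    .noneMatch(each -> Boolean.parseBoolean(each.get("active").toString())));
    containerComposer.proxyExecuteWithLog(String.format("DROP STREAMING '%s'", jobId), 0);
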
@@ -166,16 +169,17 @@ class CDCE2EIT {
                 containerComposer.getUsername(), containerComposer.getPassword()));
     }
     
-    private void startCDCClient(final PipelineContainerComposer containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) {
+    private CDCClient buildCDCClientAndStart(final PipelineContainerComposer containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) {
         DataSource dataSource = createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4);
         DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
         String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" : "";
-        CDCClient cdcClient = new CDCClient(new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), 5000));
-        cdcClient.connect(recordConsumer, new RetryStreamingExceptionHandler(cdcClient, 5, 5000), (ctx, result) -> log.error("Server error: {}", result.getErrorMessage()));
-        cdcClient.login(new CDCLoginParameter(ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD));
+        CDCClient result = new CDCClient(new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), 5000));
+        result.connect(recordConsumer, new RetryStreamingExceptionHandler(result, 5, 5000), (ctx, serverErrorResult) -> log.error("Server error: {}", serverErrorResult.getErrorMessage()));
+        result.login(new CDCLoginParameter(ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD));
         // TODO add full=false test case later
-        cdcClient.startStreaming(new StartStreamingParameter("sharding_db", new HashSet<>(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(),
+        result.startStreaming(new StartStreamingParameter("sharding_db", new HashSet<>(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(),
                 SchemaTable.newBuilder().setTable("t_address").build())), true));
+        return result;
     }
     
     private List<Map<String, Object>> listOrderRecords(final PipelineContainerComposer containerComposer, final String tableNameWithSchema) throws SQLException {
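
Pulling the client-side pieces together, the lifecycle the test now exercises looks
roughly like the following sketch (signatures as they appear in the diff; the port,
credentials, table name, and recordConsumer wiring are illustrative stand-ins):

    // Build, connect, authenticate, and start streaming, then close; closing the
    // connection is what lets the server mark the job inactive, so a subsequent
    // DROP STREAMING passes the new precondition.
    CDCClient client = new CDCClient(new CDCClientConfiguration("localhost", 33071, 5000));
    client.connect(recordConsumer, new RetryStreamingExceptionHandler(client, 5, 5000),
            (ctx, result) -> log.error("Server error: {}", result.getErrorMessage()));
    client.login(new CDCLoginParameter("root", "root"));
    client.startStreaming(new StartStreamingParameter("sharding_db",
            new HashSet<>(Arrays.asList(SchemaTable.newBuilder().setTable("t_order").build())), true));
    // ... consume change records via recordConsumer ...
    client.close();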
