This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7308e0c9b4f Improve drop streaming, not allow drop active job (#28992)
7308e0c9b4f is described below
commit 7308e0c9b4f4f4f1a260a4fe7d37cc43582c8f62
Author: Xinze Guo <[email protected]>
AuthorDate: Thu Nov 9 18:58:34 2023 +0800
Improve drop streaming, not allow drop active job (#28992)
* Improve drop streaming, not allow drop active job
* Improve CDCE2EIT
---
.../cdc/distsql/handler/update/DropStreamingUpdater.java | 2 +-
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 14 ++++++--------
.../data/pipeline/cdc/handler/CDCBackendHandler.java | 2 +-
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 16 ++++++++++------
4 files changed, 18 insertions(+), 16 deletions(-)
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
index 24e8d61da8c..47db9466af5 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
@@ -32,7 +32,7 @@ public final class DropStreamingUpdater implements
RALUpdater<DropStreamingState
@Override
public void executeUpdate(final String databaseName, final
DropStreamingStatement sqlStatement) throws SQLException {
- jobAPI.stopAndDrop(sqlStatement.getJobId());
+ jobAPI.dropStreaming(sqlStatement.getJobId());
}
@Override
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index ec1bd493e28..15b5f7cc3f5 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -63,6 +63,7 @@ import
org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalT
import
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
@@ -328,17 +329,14 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
}
/**
- * Stop and drop job.
+ * Drop streaming job.
*
* @param jobId job id
*/
- public void stopAndDrop(final String jobId) {
- CDCJobConfiguration jobConfig = getJobConfiguration(jobId);
- if (CDCSinkType.SOCKET == jobConfig.getSinkConfig().getSinkType()) {
- PipelineJobCenter.stop(jobId);
- } else {
- stop(jobId);
- }
+ public void dropStreaming(final String jobId) {
+ JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+ CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
+ ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineInternalException("Can't drop streaming job which is active"));
dropJob(jobId);
cleanup(jobConfig);
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 2a3a39d82d5..ad7f7b77410 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -164,7 +164,7 @@ public final class CDCBackendHandler {
* @param jobId job ID
*/
public void dropStreaming(final String jobId) {
- jobAPI.stopAndDrop(jobId);
+ jobAPI.dropStreaming(jobId);
}
/**
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 70da4a2663a..5d0d7dea417 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -118,7 +118,7 @@ class CDCE2EIT {
initSchemaAndTable(containerComposer, connection, 0);
}
DialectDatabaseMetaData dialectDatabaseMetaData = new
DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
- startCDCClient(containerComposer, dialectDatabaseMetaData);
+ final CDCClient cdcClient =
buildCDCClientAndStart(containerComposer, dialectDatabaseMetaData);
Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW
STREAMING LIST").isEmpty());
String jobId = containerComposer.queryForListWithLog("SHOW
STREAMING LIST").get(0).get("id").toString();
containerComposer.waitIncrementTaskFinished(String.format("SHOW
STREAMING STATUS '%s'", jobId));
@@ -140,6 +140,9 @@ class CDCE2EIT {
containerComposer.getDatabaseType());
assertDataMatched(sourceDataSource, targetDataSource,
orderSchemaTableName);
assertDataMatched(sourceDataSource, targetDataSource, new
CaseInsensitiveQualifiedTable(null, "t_address"));
+ cdcClient.close();
+ Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS).until(() -> containerComposer.queryForListWithLog("SHOW STREAMING LIST")
+ .stream().noneMatch(each -> Boolean.parseBoolean(each.get("active").toString())));
containerComposer.proxyExecuteWithLog(String.format("DROP
STREAMING '%s'", jobId), 0);
assertTrue(containerComposer.queryForListWithLog("SHOW STREAMING
LIST").isEmpty());
}
@@ -166,16 +169,17 @@ class CDCE2EIT {
containerComposer.getUsername(),
containerComposer.getPassword()));
}
- private void startCDCClient(final PipelineContainerComposer
containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) {
+ private CDCClient buildCDCClientAndStart(final PipelineContainerComposer
containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) {
DataSource dataSource = createStandardDataSource(containerComposer,
PipelineContainerComposer.DS_4);
DataSourceRecordConsumer recordConsumer = new
DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" :
"";
- CDCClient cdcClient = new CDCClient(new
CDCClientConfiguration("localhost",
containerComposer.getContainerComposer().getProxyCDCPort(), 5000));
- cdcClient.connect(recordConsumer, new
RetryStreamingExceptionHandler(cdcClient, 5, 5000), (ctx, result) ->
log.error("Server error: {}", result.getErrorMessage()));
- cdcClient.login(new
CDCLoginParameter(ProxyContainerConstants.USERNAME,
ProxyContainerConstants.PASSWORD));
+ CDCClient result = new CDCClient(new
CDCClientConfiguration("localhost",
containerComposer.getContainerComposer().getProxyCDCPort(), 5000));
+ result.connect(recordConsumer, new
RetryStreamingExceptionHandler(result, 5, 5000), (ctx, serverErrorResult) ->
log.error("Server error: {}", serverErrorResult.getErrorMessage()));
+ result.login(new CDCLoginParameter(ProxyContainerConstants.USERNAME,
ProxyContainerConstants.PASSWORD));
// TODO add full=false test case later
- cdcClient.startStreaming(new StartStreamingParameter("sharding_db",
new
HashSet<>(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(),
+ result.startStreaming(new StartStreamingParameter("sharding_db", new
HashSet<>(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(),
SchemaTable.newBuilder().setTable("t_address").build())),
true));
+ return result;
}
private List<Map<String, Object>> listOrderRecords(final
PipelineContainerComposer containerComposer, final String tableNameWithSchema)
throws SQLException {