This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c0c9717f617 Add rollback/commit streaming DistSQL implementation
(#25289)
c0c9717f617 is described below
commit c0c9717f61751a2a49636f47bb8c39ba163b7370
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Apr 26 19:09:01 2023 +0800
Add rollback/commit streaming DistSQL implementation (#25289)
* Add drop streaming DistSQL and implementation
* Add drop streaming test case
* Extraction methods for reuse
* Use awaitility fix occasional errors
* Rename drop to rollback of streaming
* Add commit DistSQL
* Improve comment
* Increment sleep seconds
* Revert "Increment sleep seconds"
This reverts commit ac168023a6961d12d3e37e1ce4ce43eb060bc5a9.
* Fix commit statement not match
* process commit streaming
---
.../ral/pipeline/cdc/UpdatableCDCRALStatement.java | 28 +++--------
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 55 ++++++++++++++++++--
.../data/pipeline/cdc/core/job/CDCJobId.java | 5 +-
.../pipeline/cdc/handler}/CDCBackendHandler.java | 58 ++++++++++------------
.../data/pipeline/cdc/core/job/CDCJobIdTest.java | 3 +-
.../src/main/proto/CDCRequestProtocol.proto | 12 +++--
.../pipeline/core/importer/DataSourceImporter.java | 1 -
.../handler/update/CommitStreamingUpdater.java} | 32 ++++++------
.../handler/update/RollbackStreamingUpdater.java} | 32 ++++++------
...ingsphere.distsql.handler.ral.update.RALUpdater | 2 +
.../distsql/parser/autogen/CDCDistSQLStatement.g4 | 2 +
.../parser/src/main/antlr4/imports/cdc/Keyword.g4 | 8 +++
.../src/main/antlr4/imports/cdc/RALStatement.g4 | 8 +++
.../parser/core/CDCDistSQLStatementVisitor.java | 14 ++++++
.../statement/CommitStreamingStatement.java} | 20 +++++---
.../statement/RollbackStreamingStatement.java} | 20 +++++---
.../frontend/netty/CDCChannelInboundHandler.java | 35 +++++++++----
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 2 +
.../general/PostgreSQLMigrationGeneralE2EIT.java | 2 +
.../UpdatablePipelineRALStatementAssert.java | 12 ++++-
.../cdc/CommitStreamingStatementAssert.java | 46 +++++++++++++++++
.../cdc/RollbackStreamingStatementAssert.java | 46 +++++++++++++++++
.../cases/parser/jaxb/RootSQLParserTestCases.java | 8 +++
.../ral/cdc/CommitStreamingStatementTestCase.java | 27 +++++-----
.../cdc/RollbackStreamingStatementTestCase.java | 27 +++++-----
test/it/parser/src/main/resources/case/ral/cdc.xml | 8 +++
.../src/main/resources/sql/supported/ral/cdc.xml | 2 +
27 files changed, 367 insertions(+), 148 deletions(-)
diff --git
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
b/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/parser/statement/ral/pipeline/cdc/UpdatableCDCRALStatement.java
similarity index 70%
copy from
kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
copy to
distsql/statement/src/main/java/org/apache/shardingsphere/distsql/parser/statement/ral/pipeline/cdc/UpdatableCDCRALStatement.java
index 4f9a2b84b3a..120cc16e584 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
+++
b/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/parser/statement/ral/pipeline/cdc/UpdatableCDCRALStatement.java
@@ -15,26 +15,12 @@
* limitations under the License.
*/
-lexer grammar Keyword;
+package org.apache.shardingsphere.distsql.parser.statement.ral.pipeline.cdc;
-import Alphabet;
+import
org.apache.shardingsphere.distsql.parser.statement.ral.pipeline.UpdatablePipelineRALStatement;
-WS
- : [ \t\r\n] + ->skip
- ;
-
-SHOW
- : S H O W
- ;
-
-STREAMING
- : S T R E A M I N G
- ;
-
-LIST
- : L I S T
- ;
-
-STATUS
- : S T A T U S
- ;
+/**
+ * Updatable CDC RAL statement.
+ */
+public abstract class UpdatableCDCRALStatement extends
UpdatablePipelineRALStatement implements CDCRALStatement {
+}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 2d11a35d2c7..4089bdfbbf3 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -65,14 +65,17 @@ import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManag
import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.core.prepare.PipelineJobPreparerUtils;
import
org.apache.shardingsphere.data.pipeline.core.sharding.ShardingColumnsExtractor;
import
org.apache.shardingsphere.data.pipeline.core.util.JobDataNodeLineConvertUtils;
+import
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import
org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
@@ -206,8 +209,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
}
private String generateJobId(final PipelineContextKey contextKey, final
YamlCDCJobConfiguration config) {
- // TODO generate parameter add sink type
- CDCJobId jobId = new CDCJobId(contextKey,
config.getSchemaTableNames(), config.isFull());
+ CDCJobId jobId = new CDCJobId(contextKey,
config.getSchemaTableNames(), config.isFull(),
config.getSinkConfig().getSinkType());
return marshalJobId(jobId);
}
@@ -218,6 +220,40 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
return DigestUtils.md5Hex(text.getBytes(StandardCharsets.UTF_8));
}
+ /**
+ * Start job.
+ *
+ * @param jobId job id
+ * @param importerConnector importer connector
+ */
+ public void startJob(final String jobId, final ImporterConnector
importerConnector) {
+ CDCJob job = new CDCJob(importerConnector);
+ PipelineJobCenter.addJob(jobId, job);
+ updateJobConfigurationDisabled(jobId, false);
+ JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+ OneOffJobBootstrap oneOffJobBootstrap = new
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)),
job, jobConfigPOJO.toJobConfiguration());
+ job.setJobBootstrap(oneOffJobBootstrap);
+ oneOffJobBootstrap.execute();
+ }
+
+ /**
+ * Update job configuration disabled.
+ *
+ * @param jobId job id
+ * @param disabled disabled
+ */
+ public void updateJobConfigurationDisabled(final String jobId, final
boolean disabled) {
+ JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+ jobConfigPOJO.setDisabled(disabled);
+ if (disabled) {
+ jobConfigPOJO.getProps().setProperty("stop_time_millis",
String.valueOf(System.currentTimeMillis()));
+ } else {
+ jobConfigPOJO.getProps().setProperty("start_time_millis",
String.valueOf(System.currentTimeMillis()));
+ jobConfigPOJO.getProps().remove("stop_time_millis");
+ }
+
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
+ }
+
@Override
public CDCTaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
CDCJobConfiguration jobConfig = (CDCJobConfiguration)
pipelineJobConfig;
@@ -298,13 +334,24 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
@Override
public void rollback(final String jobId) throws SQLException {
- stop(jobId);
+ CDCJobConfiguration jobConfig = getJobConfiguration(jobId);
+ if (CDCSinkType.SOCKET == jobConfig.getSinkConfig().getSinkType()) {
+ PipelineJobCenter.stop(jobId);
+ } else {
+ stop(jobId);
+ }
dropJob(jobId);
}
@Override
public void commit(final String jobId) {
- throw new UnsupportedOperationException();
+ CDCJobConfiguration jobConfig = getJobConfiguration(jobId);
+ if (CDCSinkType.SOCKET == jobConfig.getSinkConfig().getSinkType()) {
+ PipelineJobCenter.stop(jobId);
+ } else {
+ stop(jobId);
+ }
+ dropJob(jobId);
}
@Override
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
index ec62190399a..d05da53fe92 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
@@ -36,9 +36,12 @@ public final class CDCJobId extends AbstractPipelineJobId {
private final boolean full;
- public CDCJobId(final PipelineContextKey contextKey, final List<String>
schemaTableNames, final boolean full) {
+ private final String sinkType;
+
+ public CDCJobId(final PipelineContextKey contextKey, final List<String>
schemaTableNames, final boolean full, final String sinkType) {
super(new CDCJobType(), contextKey);
this.schemaTableNames = schemaTableNames;
this.full = full;
+ this.sinkType = sinkType;
}
}
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
similarity index 82%
rename from
proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
rename to
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 84747d34ac0..99766be2e85 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.proxy.backend.handler.cdc;
+package org.apache.shardingsphere.data.pipeline.cdc.handler;
import com.google.common.base.Strings;
import io.netty.channel.Channel;
@@ -28,7 +28,6 @@ import
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import
org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
import
org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
-import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
import
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
import
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException;
import
org.apache.shardingsphere.data.pipeline.cdc.exception.NotFindStreamDataSourceTableException;
@@ -41,14 +40,9 @@ import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtils;
import org.apache.shardingsphere.data.pipeline.cdc.util.CDCTableRuleUtils;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
import
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import org.apache.shardingsphere.infra.datanode.DataNode;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
@@ -75,6 +69,16 @@ public final class CDCBackendHandler {
private final CDCJobAPI jobAPI = new CDCJobAPI();
+ /**
+ * Get database name by job id.
+ *
+ * @param jobId job id
+ * @return database
+ */
+ public String getDatabaseNameByJobId(final String jobId) {
+ return jobAPI.getJobConfiguration(jobId).getDatabaseName();
+ }
+
/**
* Stream data.
*
@@ -115,16 +119,6 @@ public final class CDCBackendHandler {
return
CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
}
- /**
- * Get database name by job id.
- *
- * @param jobId job id
- * @return database
- */
- public String getDatabaseNameByJobId(final String jobId) {
- return jobAPI.getJobConfiguration(jobId).getDatabaseName();
- }
-
/**
* Start streaming.
*
@@ -138,20 +132,11 @@ public final class CDCBackendHandler {
if (PipelineJobCenter.isJobExisting(jobId)) {
PipelineJobCenter.stop(jobId);
}
- JobConfigurationAPI jobConfigAPI =
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId));
- JobConfigurationPOJO jobConfigPOJO =
jobConfigAPI.getJobConfiguration(jobId);
- // TODO, ensure that there is only one consumer at a time, job config
disable may not be updated when the program is forced to close
- jobConfigPOJO.setDisabled(false);
- jobConfigAPI.updateJobConfiguration(jobConfigPOJO);
ShardingSphereDatabase database =
PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
Comparator<DataRecord> dataRecordComparator =
cdcJobConfig.isDecodeWithTX()
?
DataRecordComparatorGenerator.generatorIncrementalComparator(database.getProtocolType())
: null;
- CDCJob job = new CDCJob(new SocketSinkImporterConnector(channel,
database, cdcJobConfig.getJobShardingCount(),
cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
- PipelineJobCenter.addJob(jobId, job);
- OneOffJobBootstrap oneOffJobBootstrap = new
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)),
job, jobConfigPOJO.toJobConfiguration());
- job.setJobBootstrap(oneOffJobBootstrap);
- oneOffJobBootstrap.execute();
+ jobAPI.startJob(jobId, new SocketSinkImporterConnector(channel,
database, cdcJobConfig.getJobShardingCount(),
cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
connectionContext.setJobId(jobId);
}
@@ -166,22 +151,29 @@ public final class CDCBackendHandler {
return;
}
PipelineJobCenter.stop(jobId);
- JobConfigurationAPI jobConfigAPI =
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId));
- JobConfigurationPOJO jobConfig =
jobConfigAPI.getJobConfiguration(jobId);
- jobConfig.setDisabled(true);
- jobConfigAPI.updateJobConfiguration(jobConfig);
+ jobAPI.updateJobConfigurationDisabled(jobId, true);
}
/**
- * Drop streaming.
+ * Rollback streaming.
*
* @param jobId job id.
* @throws SQLException sql exception
*/
- public void dropStreaming(final String jobId) throws SQLException {
+ public void rollbackStreaming(final String jobId) throws SQLException {
jobAPI.rollback(jobId);
}
+ /**
+ * Commit streaming.
+ *
+ * @param jobId job id.
+ * @throws SQLException sql exception
+ */
+ public void commitStreaming(final String jobId) throws SQLException {
+ jobAPI.commit(jobId);
+ }
+
/**
* Process ack.
*
diff --git
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
index 5255981c8b5..e16e17cdb64 100644
---
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
+++
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.job;
import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
@@ -34,7 +35,7 @@ class CDCJobIdTest {
@Test
void parseJobType() {
PipelineContextKey contextKey =
PipelineContextKey.build("sharding_db", InstanceType.PROXY);
- CDCJobId pipelineJobId = new CDCJobId(contextKey,
Arrays.asList("test", "t_order"), false);
+ CDCJobId pipelineJobId = new CDCJobId(contextKey,
Arrays.asList("test", "t_order"), false, CDCSinkType.SOCKET.name());
String jobId =
PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + "abcd";
JobType actualJobType = PipelineJobIdUtils.parseJobType(jobId);
assertThat(actualJobType, instanceOf(CDCJobType.class));
diff --git
a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
index 8e322293cfa..1ec532fa64e 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
@@ -31,7 +31,8 @@ message CDCRequest {
ACK_STREAMING = 3;
STOP_STREAMING = 4;
START_STREAMING = 5;
- DROP_STREAMING = 6;
+ ROLLBACK_STREAMING = 6;
+ COMMIT_STREAMING = 7;
}
Type type = 3;
oneof request_body {
@@ -40,7 +41,8 @@ message CDCRequest {
AckStreamingRequestBody ack_streaming_request_body = 6;
StopStreamingRequestBody stop_streaming_request_body = 7;
StartStreamingRequestBody start_streaming_request_body = 8;
- DropStreamingRequestBody drop_streaming_request_body = 9;
+ RollbackStreamingRequestBody rollback_streaming_request_body = 9;
+ CommitStreamingRequestBody commit_streaming_request_body = 10;
}
}
@@ -82,6 +84,10 @@ message StartStreamingRequestBody {
string streaming_id = 1;
}
-message DropStreamingRequestBody {
+message RollbackStreamingRequestBody {
+ string streaming_id = 1;
+}
+
+message CommitStreamingRequestBody {
string streaming_id = 1;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
index 1ba4b49f1f5..7e94bfc009f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
@@ -265,7 +265,6 @@ public final class DataSourceImporter extends
AbstractLifecycleExecutor implemen
batchDeleteStatement = preparedStatement;
preparedStatement.setQueryTimeout(30);
for (DataRecord each : dataRecords) {
- conditionColumns = RecordUtils.extractConditionColumns(each,
importerConfig.getShardingColumns(each.getTableName()));
for (int i = 0; i < conditionColumns.size(); i++) {
Object oldValue = conditionColumns.get(i).getOldValue();
if (null == oldValue) {
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/CommitStreamingUpdater.java
similarity index 51%
copy from
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
copy to
kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/CommitStreamingUpdater.java
index ec62190399a..ec8e7eddcc9 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/CommitStreamingUpdater.java
@@ -15,30 +15,28 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.cdc.core.job;
+package org.apache.shardingsphere.cdc.distsql.handler.update;
-import lombok.Getter;
-import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
+import
org.apache.shardingsphere.cdc.distsql.statement.CommitStreamingStatement;
+import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
+import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
-import java.util.List;
+import java.sql.SQLException;
/**
- * CDC job id.
+ * Commit streaming updater.
*/
-@Getter
-@ToString(callSuper = true)
-public final class CDCJobId extends AbstractPipelineJobId {
+public final class CommitStreamingUpdater implements
RALUpdater<CommitStreamingStatement> {
- private final List<String> schemaTableNames;
+ private final CDCJobAPI jobAPI = new CDCJobAPI();
- private final boolean full;
+ @Override
+ public void executeUpdate(final String databaseName, final
CommitStreamingStatement sqlStatement) throws SQLException {
+ jobAPI.commit(sqlStatement.getJobId());
+ }
- public CDCJobId(final PipelineContextKey contextKey, final List<String>
schemaTableNames, final boolean full) {
- super(new CDCJobType(), contextKey);
- this.schemaTableNames = schemaTableNames;
- this.full = full;
+ @Override
+ public String getType() {
+ return CommitStreamingStatement.class.getName();
}
}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/RollbackStreamingUpdater.java
similarity index 51%
copy from
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
copy to
kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/RollbackStreamingUpdater.java
index ec62190399a..a6a2ef579d4 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/RollbackStreamingUpdater.java
@@ -15,30 +15,28 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.cdc.core.job;
+package org.apache.shardingsphere.cdc.distsql.handler.update;
-import lombok.Getter;
-import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
+import
org.apache.shardingsphere.cdc.distsql.statement.RollbackStreamingStatement;
+import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
+import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
-import java.util.List;
+import java.sql.SQLException;
/**
- * CDC job id.
+ * Rollback streaming updater.
*/
-@Getter
-@ToString(callSuper = true)
-public final class CDCJobId extends AbstractPipelineJobId {
+public final class RollbackStreamingUpdater implements
RALUpdater<RollbackStreamingStatement> {
- private final List<String> schemaTableNames;
+ private final CDCJobAPI jobAPI = new CDCJobAPI();
- private final boolean full;
+ @Override
+ public void executeUpdate(final String databaseName, final
RollbackStreamingStatement sqlStatement) throws SQLException {
+ jobAPI.rollback(sqlStatement.getJobId());
+ }
- public CDCJobId(final PipelineContextKey contextKey, final List<String>
schemaTableNames, final boolean full) {
- super(new CDCJobType(), contextKey);
- this.schemaTableNames = schemaTableNames;
- this.full = full;
+ @Override
+ public String getType() {
+ return RollbackStreamingStatement.class.getName();
}
}
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater
b/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater
index 0ad61f97de2..63de7ed6cac 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater
+++
b/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater
@@ -26,3 +26,5 @@
org.apache.shardingsphere.migration.distsql.handler.update.CheckMigrationJobUpda
org.apache.shardingsphere.migration.distsql.handler.update.StartMigrationCheckUpdater
org.apache.shardingsphere.migration.distsql.handler.update.StopMigrationCheckUpdater
org.apache.shardingsphere.migration.distsql.handler.update.DropMigrationCheckUpdater
+org.apache.shardingsphere.cdc.distsql.handler.update.RollbackStreamingUpdater
+org.apache.shardingsphere.cdc.distsql.handler.update.CommitStreamingUpdater
diff --git
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
index a7738218450..4a459b2f38c 100644
---
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
+++
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
@@ -22,5 +22,7 @@ import Symbol, RALStatement;
execute
: (showStreamingList
| showStreamingStatus
+ | rollbackStreaming
+ | commitStreaming
) SEMI?
;
diff --git
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
index 4f9a2b84b3a..e5b28e62df8 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
@@ -38,3 +38,11 @@ LIST
STATUS
: S T A T U S
;
+
+ROLLBACK
+ : R O L L B A C K
+ ;
+
+COMMIT
+ : C O M M I T
+ ;
diff --git
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
index 32d299de91f..5f591718437 100644
---
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
+++
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
@@ -27,6 +27,14 @@ showStreamingStatus
: SHOW STREAMING STATUS jobId
;
+rollbackStreaming
+ : ROLLBACK STREAMING jobId
+ ;
+
+commitStreaming
+ : COMMIT STREAMING jobId
+ ;
+
jobId
: INT_ | IDENTIFIER_ | STRING_
;
diff --git
a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
index cf845b97fab..31f93c3f279 100644
---
a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
+++
b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
@@ -18,9 +18,13 @@
package org.apache.shardingsphere.cdc.distsql.parser.core;
import org.antlr.v4.runtime.tree.ParseTree;
+import
org.apache.shardingsphere.cdc.distsql.statement.CommitStreamingStatement;
+import
org.apache.shardingsphere.cdc.distsql.statement.RollbackStreamingStatement;
import
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
import
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
import
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementBaseVisitor;
+import
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.CommitStreamingContext;
+import
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.RollbackStreamingContext;
import
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingListContext;
import
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingStatusContext;
import org.apache.shardingsphere.sql.parser.api.ASTNode;
@@ -42,6 +46,16 @@ public final class CDCDistSQLStatementVisitor extends
CDCDistSQLStatementBaseVis
return new
ShowStreamingStatusStatement(getIdentifierValue(ctx.jobId()));
}
+ @Override
+ public ASTNode visitRollbackStreaming(final RollbackStreamingContext ctx) {
+ return new RollbackStreamingStatement(getIdentifierValue(ctx.jobId()));
+ }
+
+ @Override
+ public ASTNode visitCommitStreaming(final CommitStreamingContext ctx) {
+ return new CommitStreamingStatement(getIdentifierValue(ctx.jobId()));
+ }
+
private String getIdentifierValue(final ParseTree ctx) {
return null == ctx ? null : new
IdentifierValue(ctx.getText()).getValue();
}
diff --git
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/CommitStreamingStatement.java
similarity index 66%
copy from
kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
copy to
kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/CommitStreamingStatement.java
index a7738218450..212668c3354 100644
---
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
+++
b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/CommitStreamingStatement.java
@@ -15,12 +15,18 @@
* limitations under the License.
*/
-grammar CDCDistSQLStatement;
+package org.apache.shardingsphere.cdc.distsql.statement;
-import Symbol, RALStatement;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.distsql.parser.statement.ral.pipeline.cdc.UpdatableCDCRALStatement;
-execute
- : (showStreamingList
- | showStreamingStatus
- ) SEMI?
- ;
+/**
+ * Commit streaming statement.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class CommitStreamingStatement extends UpdatableCDCRALStatement {
+
+ private final String jobId;
+}
diff --git
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/RollbackStreamingStatement.java
similarity index 66%
copy from
kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
copy to
kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/RollbackStreamingStatement.java
index a7738218450..2879748b978 100644
---
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
+++
b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/RollbackStreamingStatement.java
@@ -15,12 +15,18 @@
* limitations under the License.
*/
-grammar CDCDistSQLStatement;
+package org.apache.shardingsphere.cdc.distsql.statement;
-import Symbol, RALStatement;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.distsql.parser.statement.ral.pipeline.cdc.UpdatableCDCRALStatement;
-execute
- : (showStreamingList
- | showStreamingStatus
- ) SEMI?
- ;
+/**
+ * Rollback streaming statement.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class RollbackStreamingStatement extends UpdatableCDCRALStatement
{
+
+ private final String jobId;
+}
diff --git
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 8e9abbbcfbb..54310daea95 100644
---
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -34,8 +34,9 @@ import
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException;
import
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.DropStreamingRequestBody;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CommitStreamingRequestBody;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.BasicBody;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.RollbackStreamingRequestBody;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartStreamingRequestBody;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopStreamingRequestBody;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
@@ -51,7 +52,7 @@ import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePrecondition
import
org.apache.shardingsphere.infra.util.exception.external.sql.ShardingSphereSQLException;
import
org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import org.apache.shardingsphere.proxy.backend.handler.cdc.CDCBackendHandler;
+import org.apache.shardingsphere.data.pipeline.cdc.handler.CDCBackendHandler;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -128,8 +129,11 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
case START_STREAMING:
processStartStreamingRequest(ctx, request, connectionContext);
break;
- case DROP_STREAMING:
- processDropStreamingRequest(ctx, request, connectionContext);
+ case ROLLBACK_STREAMING:
+ processRollbackStreamingRequest(ctx, request,
connectionContext);
+ break;
+ case COMMIT_STREAMING:
+ processCommitStreamingRequest(ctx, request, connectionContext);
break;
default:
log.warn("can't handle this type of request {}", request);
@@ -213,18 +217,27 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
String database =
backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
checkPrivileges(request.getRequestId(),
connectionContext.getCurrentUser().getGrantee(), database);
backendHandler.stopStreaming(connectionContext.getJobId());
- connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
connectionContext.setJobId(null);
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
}
- private void processDropStreamingRequest(final ChannelHandlerContext ctx,
final CDCRequest request, final CDCConnectionContext connectionContext) {
- DropStreamingRequestBody requestBody =
request.getDropStreamingRequestBody();
- String database =
backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
- checkPrivileges(request.getRequestId(),
connectionContext.getCurrentUser().getGrantee(), database);
+ private void processRollbackStreamingRequest(final ChannelHandlerContext
ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
+ RollbackStreamingRequestBody requestBody =
request.getRollbackStreamingRequestBody();
+ checkPrivileges(request.getRequestId(),
connectionContext.getCurrentUser().getGrantee(),
backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId()));
try {
- backendHandler.dropStreaming(connectionContext.getJobId());
- connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
+ backendHandler.rollbackStreaming(connectionContext.getJobId());
+ connectionContext.setJobId(null);
+
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+ } catch (final SQLException ex) {
+ throw new CDCExceptionWrapper(request.getRequestId(), new
CDCServerException(ex.getMessage()));
+ }
+ }
+
+ private void processCommitStreamingRequest(final ChannelHandlerContext
ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
+ CommitStreamingRequestBody requestBody =
request.getCommitStreamingRequestBody();
+ checkPrivileges(request.getRequestId(),
connectionContext.getCurrentUser().getGrantee(),
backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId()));
+ try {
+ backendHandler.commitStreaming(connectionContext.getJobId());
connectionContext.setJobId(null);
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
} catch (final SQLException ex) {
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 58361dd07c7..e93a4687ce9 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -149,6 +149,8 @@ class CDCE2EIT {
tableMetaData.getColumnNames(), primaryKeyMetaData, null,
progressContext);
DataConsistencyCheckResult checkResult = checker.check(new
DataMatchDataConsistencyCalculateAlgorithm());
assertTrue(checkResult.isMatched());
+ containerComposer.proxyExecuteWithLog(String.format("COMMIT
STREAMING '%s'", jobId), 0);
+ assertTrue(containerComposer.queryForListWithLog("SHOW STREAMING
LIST").isEmpty());
}
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index c2e631a16c0..ed078d92c19 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -103,6 +103,8 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
startMigrationByJobId(containerComposer, jobId);
// must refresh firstly, otherwise proxy can't get schema and table
info
containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).pollInterval(1,
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog(
+ String.format("SELECT * FROM %s WHERE order_id = %s",
String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME),
recordId)).isEmpty());
containerComposer.assertProxyOrderRecordExist(String.join(".",
PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), recordId);
assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
}
diff --git
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
index 97dadd8ae7d..7109e877017 100644
---
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
@@ -19,6 +19,8 @@ package
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import
org.apache.shardingsphere.cdc.distsql.statement.CommitStreamingStatement;
+import
org.apache.shardingsphere.cdc.distsql.statement.RollbackStreamingStatement;
import
org.apache.shardingsphere.distsql.parser.statement.ral.pipeline.UpdatablePipelineRALStatement;
import
org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
import
org.apache.shardingsphere.migration.distsql.statement.CommitMigrationStatement;
@@ -31,6 +33,8 @@ import
org.apache.shardingsphere.migration.distsql.statement.StopMigrationCheckS
import
org.apache.shardingsphere.migration.distsql.statement.StopMigrationStatement;
import
org.apache.shardingsphere.migration.distsql.statement.UnregisterMigrationSourceStorageUnitStatement;
import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.cdc.CommitStreamingStatementAssert;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.cdc.RollbackStreamingStatementAssert;
import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.CheckMigrationStatementAssert;
import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.CommitMigrationStatementAssert;
import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.MigrateTableStatementAssert;
@@ -42,6 +46,8 @@ import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.r
import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.StopMigrationStatementAssert;
import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.UnregisterMigrationSourceStorageUnitStatementAssert;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.CommitStreamingStatementTestCase;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.RollbackStreamingStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CheckMigrationStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CommitMigrationStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.MigrateTableStatementTestCase;
@@ -61,7 +67,7 @@ public final class UpdatablePipelineRALStatementAssert {
/**
* Assert updatable pipeline RAL statement is correct with expected parser
result.
- *
+ *
* @param assertContext assert context
* @param actual actual updatable pipeline RAL statement
* @param expected expected updatable pipeline RAL statement test case
@@ -90,6 +96,10 @@ public final class UpdatablePipelineRALStatementAssert {
StartMigrationCheckStatementAssert.assertIs(assertContext,
(StartMigrationCheckStatement) actual, (StartMigrationCheckStatementTestCase)
expected);
} else if (actual instanceof StopMigrationCheckStatement) {
StopMigrationCheckStatementAssert.assertIs(assertContext,
(StopMigrationCheckStatement) actual, (StopMigrationCheckStatementTestCase)
expected);
+ } else if (actual instanceof RollbackStreamingStatement) {
+ RollbackStreamingStatementAssert.assertIs(assertContext,
(RollbackStreamingStatement) actual, (RollbackStreamingStatementTestCase)
expected);
+ } else if (actual instanceof CommitStreamingStatement) {
+ CommitStreamingStatementAssert.assertIs(assertContext,
(CommitStreamingStatement) actual, (CommitStreamingStatementTestCase) expected);
}
}
}
diff --git
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/CommitStreamingStatementAssert.java
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/CommitStreamingStatementAssert.java
new file mode 100644
index 00000000000..00d4ee6f594
--- /dev/null
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/CommitStreamingStatementAssert.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.cdc;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import
org.apache.shardingsphere.cdc.distsql.statement.CommitStreamingStatement;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ExistingAssert;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.JobIdAssert;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.CommitStreamingStatementTestCase;
+
+/**
+ * Commit streaming statement assert.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class CommitStreamingStatementAssert {
+
+ /**
+ * Assert commit streaming statement is correct with expected parser
result.
+ *
+ * @param assertContext assert context
+ * @param actual actual
+ * @param expected expected
+ */
+ public static void assertIs(final SQLCaseAssertContext assertContext,
final CommitStreamingStatement actual, final CommitStreamingStatementTestCase
expected) {
+ if (ExistingAssert.assertIs(assertContext, actual, expected)) {
+ JobIdAssert.assertJobId(assertContext, actual.getJobId(),
expected.getJobId());
+ }
+ }
+}
diff --git
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/RollbackStreamingStatementAssert.java
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/RollbackStreamingStatementAssert.java
new file mode 100644
index 00000000000..7d964908698
--- /dev/null
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/cdc/RollbackStreamingStatementAssert.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.cdc;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import
org.apache.shardingsphere.cdc.distsql.statement.RollbackStreamingStatement;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ExistingAssert;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.JobIdAssert;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.RollbackStreamingStatementTestCase;
+
+/**
+ * Rollback streaming statement assert.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class RollbackStreamingStatementAssert {
+
+ /**
+ * Assert rollback streaming statement is correct with expected parser
result.
+ *
+ * @param assertContext assert context
+ * @param actual actual
+ * @param expected expected
+ */
+ public static void assertIs(final SQLCaseAssertContext assertContext,
final RollbackStreamingStatement actual, final
RollbackStreamingStatementTestCase expected) {
+ if (ExistingAssert.assertIs(assertContext, actual, expected)) {
+ JobIdAssert.assertJobId(assertContext, actual.getJobId(),
expected.getJobId());
+ }
+ }
+}
diff --git
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
index ab25dfa0abc..0fc44915ed5 100644
---
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
@@ -321,6 +321,8 @@ import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.ShowTrafficRulesStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.ShowTransactionRuleStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.UnlabelComputeNodeStatementTestCase;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.CommitStreamingStatementTestCase;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.RollbackStreamingStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingListStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingStatusStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CheckMigrationStatementTestCase;
@@ -1017,6 +1019,12 @@ public final class RootSQLParserTestCases {
@XmlElement(name = "show-streaming-status")
private final List<ShowStreamingStatusStatementTestCase>
showStreamingStatusTestCases = new LinkedList<>();
+ @XmlElement(name = "rollback-streaming")
+ private final List<RollbackStreamingStatementTestCase>
rollbackStreamingTestCases = new LinkedList<>();
+
+ @XmlElement(name = "commit-streaming")
+ private final List<CommitStreamingStatementTestCase>
commitStreamingTestCases = new LinkedList<>();
+
@XmlElement(name = "preview-sql")
private final List<PreviewStatementTestCase> previewTestCases = new
LinkedList<>();
diff --git
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/CommitStreamingStatementTestCase.java
similarity index 61%
copy from
kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
copy to
test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/CommitStreamingStatementTestCase.java
index 32d299de91f..695cfbd2198 100644
---
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/CommitStreamingStatementTestCase.java
@@ -15,18 +15,21 @@
* limitations under the License.
*/
-grammar RALStatement;
+package
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc;
-import BaseRule;
+import lombok.Getter;
+import lombok.Setter;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
-showStreamingList
- : SHOW STREAMING LIST
- ;
+import javax.xml.bind.annotation.XmlElement;
-showStreamingStatus
- : SHOW STREAMING STATUS jobId
- ;
-
-jobId
- : INT_ | IDENTIFIER_ | STRING_
- ;
+/**
+ * Commit streaming statement test case.
+ */
+@Getter
+@Setter
+public final class CommitStreamingStatementTestCase extends SQLParserTestCase {
+
+ @XmlElement(name = "job-id")
+ private String jobId;
+}
diff --git
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/RollbackStreamingStatementTestCase.java
similarity index 61%
copy from
kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
copy to
test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/RollbackStreamingStatementTestCase.java
index 32d299de91f..b4c20895934 100644
---
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/RollbackStreamingStatementTestCase.java
@@ -15,18 +15,21 @@
* limitations under the License.
*/
-grammar RALStatement;
+package
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc;
-import BaseRule;
+import lombok.Getter;
+import lombok.Setter;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
-showStreamingList
- : SHOW STREAMING LIST
- ;
+import javax.xml.bind.annotation.XmlElement;
-showStreamingStatus
- : SHOW STREAMING STATUS jobId
- ;
-
-jobId
- : INT_ | IDENTIFIER_ | STRING_
- ;
+/**
+ * Rollback streaming statement test case.
+ */
+@Getter
+@Setter
+public final class RollbackStreamingStatementTestCase extends
SQLParserTestCase {
+
+ @XmlElement(name = "job-id")
+ private String jobId;
+}
diff --git a/test/it/parser/src/main/resources/case/ral/cdc.xml
b/test/it/parser/src/main/resources/case/ral/cdc.xml
index c0b8e7620f5..932f5818411 100644
--- a/test/it/parser/src/main/resources/case/ral/cdc.xml
+++ b/test/it/parser/src/main/resources/case/ral/cdc.xml
@@ -22,4 +22,12 @@
<show-streaming-status sql-case-id="show-streaming-status">
<job-id>123</job-id>
</show-streaming-status>
+
+ <rollback-streaming sql-case-id="rollback-streaming">
+ <job-id>123</job-id>
+ </rollback-streaming>
+
+ <commit-streaming sql-case-id="commit-streaming">
+ <job-id>123</job-id>
+ </commit-streaming>
</sql-parser-test-cases>
diff --git a/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
b/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
index 31af22ca298..2190b38e51c 100644
--- a/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
+++ b/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
@@ -19,4 +19,6 @@
<sql-cases>
<sql-case id="show-streaming-list" value="SHOW STREAMING LIST;"
db-types="ShardingSphere"/>
<sql-case id="show-streaming-status" value="SHOW STREAMING STATUS 123;"
db-types="ShardingSphere"/>
+ <sql-case id="rollback-streaming" value="ROLLBACK STREAMING 123;"
db-types="ShardingSphere"/>
+ <sql-case id="commit-streaming" value="COMMIT STREAMING 123;"
db-types="ShardingSphere"/>
</sql-cases>