This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8f552ade946 Add alter streaming rule DistSQL and implementation (#28898)
8f552ade946 is described below
commit 8f552ade946f80d97dfa98abfe08f5ad064bf90a
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Nov 15 15:36:58 2023 +0800
Add alter streaming rule DistSQL and implementation (#28898)
* Add alter streaming rule DistSQL and implementation
* Update E2E
* Add visitWriteDefinition
---
.../distsql/parser/autogen/CDCDistSQLStatement.g4 | 1 +
.../parser/src/main/antlr4/imports/cdc/BaseRule.g4 | 36 ++++++++++
.../parser/src/main/antlr4/imports/cdc/Keyword.g4 | 48 +++++++++++++
.../src/main/antlr4/imports/cdc/RALStatement.g4 | 36 ++++++++++
.../parser/core/CDCDistSQLStatementVisitor.java | 84 ++++++++++++++++++++++
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 2 +
6 files changed, 207 insertions(+)
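
For orientation, the statement form introduced by this commit (as exercised by the E2E change at the end of this diff) looks like the following; the option values shown are illustrative only:

    ALTER STREAMING RULE (
        READ(WORKER_THREAD=64, BATCH_SIZE=1000, SHARDING_SIZE=10000000,
            RATE_LIMITER(TYPE(NAME='QPS', PROPERTIES('qps'='10000')))),
        STREAM_CHANNEL(TYPE(NAME='MEMORY', PROPERTIES('block-queue-size'='2000')))
    );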
diff --git a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4 b/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
index 0395586766a..d45e8e86cf5 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
@@ -23,5 +23,6 @@ execute
: (showStreamingList
| showStreamingStatus
| dropStreaming
+ | alterStreamingRule
) SEMI_? EOF
;
diff --git a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/BaseRule.g4 b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/BaseRule.g4
index 42a04424348..b7285f768da 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/BaseRule.g4
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/BaseRule.g4
@@ -18,3 +18,39 @@
grammar BaseRule;
import Symbol, Keyword, Literals;
+
+algorithmDefinition
+ : TYPE LP_ NAME EQ_ algorithmTypeName (COMMA_ propertiesDefinition)? RP_
+ ;
+
+algorithmTypeName
+ : STRING_
+ ;
+
+propertiesDefinition
+ : PROPERTIES LP_ properties? RP_
+ ;
+
+properties
+ : property (COMMA_ property)*
+ ;
+
+property
+ : key=STRING_ EQ_ value=literal
+ ;
+
+literal
+ : STRING_ | (MINUS_)? INT_ | TRUE | FALSE
+ ;
+
+TRUE
+ : T R U E
+ ;
+
+FALSE
+ : F A L S E
+ ;
+
+intValue
+ : INT_
+ ;
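
The shared rules above describe the algorithm configuration fragment reused by the CDC statements below; a value such as the following (taken from the E2E test at the end of this diff) matches algorithmDefinition:

    TYPE(NAME='QPS', PROPERTIES('qps'='10000'))

Per the literal rule, property values may also be signed integers or TRUE/FALSE rather than quoted strings.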
diff --git a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4 b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
index 9eb03da7e4d..68bd46e9b4a 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
@@ -42,3 +42,51 @@ STATUS
DROP
: D R O P
;
+
+TYPE
+ : T Y P E
+ ;
+
+NAME
+ : N A M E
+ ;
+
+PROPERTIES
+ : P R O P E R T I E S
+ ;
+
+ALTER
+ : A L T E R
+ ;
+
+RULE
+ : R U L E
+ ;
+
+READ
+ : R E A D
+ ;
+
+WORKER_THREAD
+ : W O R K E R UL_ T H R E A D
+ ;
+
+BATCH_SIZE
+ : B A T C H UL_ S I Z E
+ ;
+
+SHARDING_SIZE
+ : S H A R D I N G UL_ S I Z E
+ ;
+
+RATE_LIMITER
+ : R A T E UL_ L I M I T E R
+ ;
+
+STREAM_CHANNEL
+ : S T R E A M UL_ C H A N N E L
+ ;
+
+WRITE
+ : W R I T E
+ ;
diff --git a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4 b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
index 03ca7573ace..39a1a28c3b6 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
@@ -19,6 +19,42 @@ grammar RALStatement;
import BaseRule;
+alterStreamingRule
+ : ALTER STREAMING RULE inventoryIncrementalRule?
+ ;
+
+inventoryIncrementalRule
+ : LP_ readDefinition? (COMMA_? streamChannel)? RP_
+ ;
+
+readDefinition
+ : READ LP_ workerThread? (COMMA_? batchSize)? (COMMA_? shardingSize)? (COMMA_? rateLimiter)? RP_
+ ;
+
+workerThread
+ : WORKER_THREAD EQ_ intValue
+ ;
+
+batchSize
+ : BATCH_SIZE EQ_ intValue
+ ;
+
+shardingSize
+ : SHARDING_SIZE EQ_ intValue
+ ;
+
+rateLimiter
+ : RATE_LIMITER LP_ algorithmDefinition RP_
+ ;
+
+writeDefinition
+ : WRITE LP_ workerThread? (COMMA_? batchSize)? (COMMA_? rateLimiter)? RP_
+ ;
+
+streamChannel
+ : STREAM_CHANNEL LP_ algorithmDefinition RP_
+ ;
+
showStreamingList
: SHOW STREAMING LIST
;
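
Note that every clause in alterStreamingRule, inventoryIncrementalRule and readDefinition is optional, so a partial statement such as the following hypothetical example is also accepted by the grammar (the value 40 is illustrative only):

    ALTER STREAMING RULE (READ(WORKER_THREAD=40));

writeDefinition is declared here but not yet referenced by inventoryIncrementalRule; it is only wired into the visitor (visitWriteDefinition) below.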
diff --git a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
index 37db49fd40c..c79d9db596e 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
+++ b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
@@ -22,13 +22,31 @@ import org.apache.shardingsphere.cdc.distsql.statement.DropStreamingStatement;
import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementBaseVisitor;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.AlgorithmDefinitionContext;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.AlterStreamingRuleContext;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.BatchSizeContext;
import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.DropStreamingContext;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.InventoryIncrementalRuleContext;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.PropertiesDefinitionContext;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.PropertyContext;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.RateLimiterContext;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ReadDefinitionContext;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShardingSizeContext;
import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingListContext;
import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingStatusContext;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.StreamChannelContext;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.WorkerThreadContext;
+import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.WriteDefinitionContext;
+import org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
+import org.apache.shardingsphere.distsql.segment.InventoryIncrementalRuleSegment;
+import org.apache.shardingsphere.distsql.segment.ReadOrWriteSegment;
+import org.apache.shardingsphere.distsql.statement.ral.updatable.AlterInventoryIncrementalRuleStatement;
import org.apache.shardingsphere.sql.parser.api.ASTNode;
import org.apache.shardingsphere.sql.parser.api.visitor.SQLVisitor;
import org.apache.shardingsphere.sql.parser.sql.common.value.identifier.IdentifierValue;
+import java.util.Properties;
+
/**
* SQL statement visitor for CDC DistSQL.
*/
@@ -52,4 +70,70 @@ public final class CDCDistSQLStatementVisitor extends CDCDistSQLStatementBaseVis
private String getIdentifierValue(final ParseTree ctx) {
return null == ctx ? null : new IdentifierValue(ctx.getText()).getValue();
}
+
+ @Override
+ public ASTNode visitAlterStreamingRule(final AlterStreamingRuleContext ctx) {
+ InventoryIncrementalRuleSegment segment = null == ctx.inventoryIncrementalRule() ? null
+ : (InventoryIncrementalRuleSegment) visit(ctx.inventoryIncrementalRule());
+ return new AlterInventoryIncrementalRuleStatement("STREAMING", segment);
+ }
+
+ @Override
+ public ASTNode visitInventoryIncrementalRule(final InventoryIncrementalRuleContext ctx) {
+ InventoryIncrementalRuleSegment result = new InventoryIncrementalRuleSegment();
+ if (null != ctx.readDefinition()) {
+ result.setReadSegment((ReadOrWriteSegment) visit(ctx.readDefinition()));
+ }
+ if (null != ctx.streamChannel()) {
+ result.setStreamChannel((AlgorithmSegment) visit(ctx.streamChannel()));
+ }
+ return result;
+ }
+
+ @Override
+ public ASTNode visitReadDefinition(final ReadDefinitionContext ctx) {
+ return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getShardingSize(ctx.shardingSize()), getAlgorithmSegment(ctx.rateLimiter()));
+ }
+
+ private Integer getWorkerThread(final WorkerThreadContext ctx) {
+ return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
+ }
+
+ private Integer getBatchSize(final BatchSizeContext ctx) {
+ return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
+ }
+
+ private Integer getShardingSize(final ShardingSizeContext ctx) {
+ return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
+ }
+
+ private AlgorithmSegment getAlgorithmSegment(final RateLimiterContext ctx) {
+ return null == ctx ? null : (AlgorithmSegment) visit(ctx);
+ }
+
+ @Override
+ public ASTNode visitWriteDefinition(final WriteDefinitionContext ctx) {
+ return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getAlgorithmSegment(ctx.rateLimiter()));
+ }
+
+ @Override
+ public ASTNode visitStreamChannel(final StreamChannelContext ctx) {
+ return visit(ctx.algorithmDefinition());
+ }
+
+ @Override
+ public ASTNode visitAlgorithmDefinition(final AlgorithmDefinitionContext ctx) {
+ return new AlgorithmSegment(getIdentifierValue(ctx.algorithmTypeName()), buildProperties(ctx.propertiesDefinition()));
+ }
+
+ private Properties buildProperties(final PropertiesDefinitionContext ctx) {
+ Properties result = new Properties();
+ if (null == ctx) {
+ return result;
+ }
+ for (PropertyContext each : ctx.properties().property()) {
+ result.setProperty(IdentifierValue.getQuotedContent(each.key.getText()), IdentifierValue.getQuotedContent(each.value.getText()));
+ }
+ return result;
+ }
}
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 86f7bc9b658..e5b0a12d08d 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -98,6 +98,8 @@ class CDCE2EIT {
void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQLException, InterruptedException {
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new CDCJobType())) {
+ containerComposer.proxyExecuteWithLog("ALTER STREAMING RULE (READ(WORKER_THREAD=64,BATCH_SIZE=1000,SHARDING_SIZE=10000000,RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='10000')))),"
+ + "STREAM_CHANNEL (TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000'))));", 0);
for (String each : Arrays.asList(PipelineContainerComposer.DS_0, PipelineContainerComposer.DS_1)) {
containerComposer.registerStorageUnit(each);
}