This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f552ade946 Add alter streaming rule DistSQL and implementation 
(#28898)
8f552ade946 is described below

commit 8f552ade946f80d97dfa98abfe08f5ad064bf90a
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Nov 15 15:36:58 2023 +0800

    Add alter streaming rule DistSQL and implementation (#28898)
    
    * Add alter streaming rule DistSQL and implementation
    
    * Update E2E
    
    * Add visitWriteDefinition
---
 .../distsql/parser/autogen/CDCDistSQLStatement.g4  |  1 +
 .../parser/src/main/antlr4/imports/cdc/BaseRule.g4 | 36 ++++++++++
 .../parser/src/main/antlr4/imports/cdc/Keyword.g4  | 48 +++++++++++++
 .../src/main/antlr4/imports/cdc/RALStatement.g4    | 36 ++++++++++
 .../parser/core/CDCDistSQLStatementVisitor.java    | 84 ++++++++++++++++++++++
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java |  2 +
 6 files changed, 207 insertions(+)

diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
 
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
index 0395586766a..d45e8e86cf5 100644
--- 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
+++ 
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
@@ -23,5 +23,6 @@ execute
     : (showStreamingList
     | showStreamingStatus
     | dropStreaming
+    | alterStreamingRule
     ) SEMI_? EOF
     ;
diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/BaseRule.g4 
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/BaseRule.g4
index 42a04424348..b7285f768da 100644
--- 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/BaseRule.g4
+++ 
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/BaseRule.g4
@@ -18,3 +18,39 @@
 grammar BaseRule;
 
 import Symbol, Keyword, Literals;
+
+// Algorithm specification: TYPE(NAME=<algorithm type>[, PROPERTIES(...)]).
+// The PROPERTIES clause is optional; when present it must follow a comma.
+algorithmDefinition
+    : TYPE LP_ NAME EQ_ algorithmTypeName (COMMA_ propertiesDefinition)? RP_
+    ;
+
+// The algorithm type is written as a string token.
+algorithmTypeName
+    : STRING_
+    ;
+
+// PROPERTIES(...) wrapper. `properties` is optional, so an empty PROPERTIES() also parses
+// and produces a context without a `properties` child.
+propertiesDefinition
+    : PROPERTIES LP_ properties? RP_
+    ;
+
+// One or more comma-separated key/value pairs.
+properties
+    : property (COMMA_ property)*
+    ;
+
+// A single pair; the labels `key` and `value` are what the visitor reads.
+property
+    : key=STRING_ EQ_ value=literal
+    ;
+
+// Property value: a string, an integer with an optional leading minus, or TRUE / FALSE.
+literal
+    : STRING_ | (MINUS_)? INT_ | TRUE | FALSE
+    ;
+
+// Boolean keyword tokens, spelled letter by letter using single-letter rules that are
+// defined outside this hunk (presumably case-insensitive fragments -- confirm).
+TRUE
+    : T R U E
+    ;
+
+FALSE
+    : F A L S E
+    ;
+
+// Integer wrapper used by the numeric settings; no sign is accepted here.
+intValue
+    : INT_
+    ;
diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4 
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
index 9eb03da7e4d..68bd46e9b4a 100644
--- a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
+++ b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/Keyword.g4
@@ -42,3 +42,51 @@ STATUS
 DROP
     : D R O P
     ;
+
+// Keywords added for the ALTER STREAMING RULE statement and its sub-clauses.
+// TYPE / NAME / PROPERTIES are consumed by the algorithm definition rules.
+TYPE
+    : T Y P E
+    ;
+
+NAME
+    : N A M E
+    ;
+
+PROPERTIES
+    : P R O P E R T I E S
+    ;
+
+ALTER
+    : A L T E R
+    ;
+
+RULE
+    :  R U L E
+    ;
+
+READ
+    : R E A D
+    ;
+
+// Compound keywords below are single tokens: the words are joined by UL_
+// (defined outside this hunk; presumably the underscore symbol -- confirm).
+WORKER_THREAD
+    : W O R K E R UL_ T H R E A D
+    ;
+
+BATCH_SIZE
+    : B A T C H UL_ S I Z E
+    ;
+
+SHARDING_SIZE
+    : S H A R D I N G UL_ S I Z E
+    ;
+
+RATE_LIMITER
+    : R A T E UL_ L I M I T E R
+    ;
+
+STREAM_CHANNEL
+    : S T R E A M UL_ C H A N N E L
+    ;
+
+WRITE
+    : W R I T E
+    ;
diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
 
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
index 03ca7573ace..39a1a28c3b6 100644
--- 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
+++ 
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
@@ -19,6 +19,42 @@ grammar RALStatement;
 
 import BaseRule;
 
+// ALTER STREAMING RULE [ ( ... ) ] -- the parenthesized rule body is optional.
+alterStreamingRule
+    : ALTER STREAMING RULE inventoryIncrementalRule?
+    ;
+
+// Rule body: an optional READ(...) section followed by an optional STREAM_CHANNEL(...)
+// section. The comma between them is optional. writeDefinition is not referenced here.
+inventoryIncrementalRule
+    : LP_ readDefinition? (COMMA_? streamChannel)? RP_
+    ;
+
+// READ(...) settings. Every item is optional, the order is fixed
+// (worker thread, batch size, sharding size, rate limiter) and separating commas are optional.
+readDefinition
+    : READ LP_ workerThread? (COMMA_? batchSize)? (COMMA_? shardingSize)? 
(COMMA_? rateLimiter)? RP_
+    ;
+
+// WORKER_THREAD=<int>
+workerThread
+    : WORKER_THREAD EQ_ intValue
+    ;
+
+// BATCH_SIZE=<int>
+batchSize
+    : BATCH_SIZE EQ_ intValue
+    ;
+
+// SHARDING_SIZE=<int>
+shardingSize
+    : SHARDING_SIZE EQ_ intValue
+    ;
+
+// RATE_LIMITER(TYPE(...)) -- wraps an algorithm definition.
+rateLimiter
+    : RATE_LIMITER LP_ algorithmDefinition RP_
+    ;
+
+// WRITE(...) settings: same shape as READ(...) but without a sharding size.
+// NOTE(review): no rule visible in this hunk references writeDefinition -- confirm this is intentional.
+writeDefinition
+    : WRITE LP_ workerThread? (COMMA_? batchSize)? (COMMA_? rateLimiter)? RP_
+    ;
+
+// STREAM_CHANNEL(TYPE(...)) -- wraps an algorithm definition.
+streamChannel
+    : STREAM_CHANNEL LP_ algorithmDefinition RP_
+    ;
+
 showStreamingList
     : SHOW STREAMING LIST
     ;
diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
 
b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
index 37db49fd40c..c79d9db596e 100644
--- 
a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
+++ 
b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
@@ -22,13 +22,31 @@ import 
org.apache.shardingsphere.cdc.distsql.statement.DropStreamingStatement;
 import 
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
 import 
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
 import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementBaseVisitor;
+import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.AlgorithmDefinitionContext;
+import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.AlterStreamingRuleContext;
+import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.BatchSizeContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.DropStreamingContext;
+import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.InventoryIncrementalRuleContext;
+import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.PropertiesDefinitionContext;
+import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.PropertyContext;
+import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.RateLimiterContext;
+import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ReadDefinitionContext;
+import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShardingSizeContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingListContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingStatusContext;
+import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.StreamChannelContext;
+import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.WorkerThreadContext;
+import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.WriteDefinitionContext;
+import org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
+import 
org.apache.shardingsphere.distsql.segment.InventoryIncrementalRuleSegment;
+import org.apache.shardingsphere.distsql.segment.ReadOrWriteSegment;
+import 
org.apache.shardingsphere.distsql.statement.ral.updatable.AlterInventoryIncrementalRuleStatement;
 import org.apache.shardingsphere.sql.parser.api.ASTNode;
 import org.apache.shardingsphere.sql.parser.api.visitor.SQLVisitor;
 import 
org.apache.shardingsphere.sql.parser.sql.common.value.identifier.IdentifierValue;
 
+import java.util.Properties;
+
 /**
  * SQL statement visitor for CDC DistSQL.
  */
@@ -52,4 +70,70 @@ public final class CDCDistSQLStatementVisitor extends 
CDCDistSQLStatementBaseVis
     private String getIdentifierValue(final ParseTree ctx) {
         return null == ctx ? null : new 
IdentifierValue(ctx.getText()).getValue();
     }
+    
+    /**
+     * Visit an {@code ALTER STREAMING RULE} statement.
+     *
+     * @param ctx alter streaming rule context
+     * @return alter statement of type {@code "STREAMING"}; its segment is null when the rule body is omitted
+     */
+    @Override
+    public ASTNode visitAlterStreamingRule(final AlterStreamingRuleContext ctx) {
+        InventoryIncrementalRuleContext ruleContext = ctx.inventoryIncrementalRule();
+        InventoryIncrementalRuleSegment segment = null;
+        if (null != ruleContext) {
+            segment = (InventoryIncrementalRuleSegment) visit(ruleContext);
+        }
+        return new AlterInventoryIncrementalRuleStatement("STREAMING", segment);
+    }
+    
+    /**
+     * Visit the parenthesized rule body and collect the sections that are present.
+     *
+     * @param ctx inventory incremental rule context
+     * @return rule segment; the read segment and stream channel are set only when their clauses appear
+     */
+    @Override
+    public ASTNode visitInventoryIncrementalRule(final InventoryIncrementalRuleContext ctx) {
+        ReadDefinitionContext readContext = ctx.readDefinition();
+        StreamChannelContext channelContext = ctx.streamChannel();
+        InventoryIncrementalRuleSegment result = new InventoryIncrementalRuleSegment();
+        if (null != readContext) {
+            result.setReadSegment((ReadOrWriteSegment) visit(readContext));
+        }
+        if (null != channelContext) {
+            result.setStreamChannel((AlgorithmSegment) visit(channelContext));
+        }
+        return result;
+    }
+    
+    @Override
+    public ASTNode visitReadDefinition(final ReadDefinitionContext ctx) {
+        return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), 
getBatchSize(ctx.batchSize()), getShardingSize(ctx.shardingSize()), 
getAlgorithmSegment(ctx.rateLimiter()));
+    }
+    
+    // Returns the WORKER_THREAD value, or null when the clause is absent.
+    private Integer getWorkerThread(final WorkerThreadContext ctx) {
+        if (null == ctx) {
+            return null;
+        }
+        return Integer.parseInt(ctx.intValue().getText());
+    }
+    
+    // Returns the BATCH_SIZE value, or null when the clause is absent.
+    private Integer getBatchSize(final BatchSizeContext ctx) {
+        if (null == ctx) {
+            return null;
+        }
+        return Integer.parseInt(ctx.intValue().getText());
+    }
+    
+    // Returns the SHARDING_SIZE value, or null when the clause is absent.
+    private Integer getShardingSize(final ShardingSizeContext ctx) {
+        if (null == ctx) {
+            return null;
+        }
+        return Integer.parseInt(ctx.intValue().getText());
+    }
+    
+    // Visits the RATE_LIMITER clause and returns its result as an algorithm segment, or null when the clause is absent.
+    private AlgorithmSegment getAlgorithmSegment(final RateLimiterContext ctx) {
+        if (null == ctx) {
+            return null;
+        }
+        return (AlgorithmSegment) visit(ctx);
+    }
+    
+    @Override
+    public ASTNode visitWriteDefinition(final WriteDefinitionContext ctx) {
+        return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), 
getBatchSize(ctx.batchSize()), getAlgorithmSegment(ctx.rateLimiter()));
+    }
+    
+    @Override
+    public ASTNode visitStreamChannel(final StreamChannelContext ctx) {
+        return visit(ctx.algorithmDefinition());
+    }
+    
+    @Override
+    public ASTNode visitAlgorithmDefinition(final AlgorithmDefinitionContext 
ctx) {
+        return new 
AlgorithmSegment(getIdentifierValue(ctx.algorithmTypeName()), 
buildProperties(ctx.propertiesDefinition()));
+    }
+    
+    /**
+     * Build properties from an optional {@code PROPERTIES(...)} clause.
+     *
+     * @param ctx properties definition context, null when the clause is omitted
+     * @return parsed key/value pairs; empty when the clause is omitted or written as {@code PROPERTIES()}
+     */
+    private Properties buildProperties(final PropertiesDefinitionContext ctx) {
+        Properties result = new Properties();
+        // The grammar declares `properties` as optional, so PROPERTIES() yields a context
+        // whose properties() is null; guard it to avoid a NullPointerException.
+        if (null == ctx || null == ctx.properties()) {
+            return result;
+        }
+        for (PropertyContext each : ctx.properties().property()) {
+            result.setProperty(IdentifierValue.getQuotedContent(each.key.getText()), IdentifierValue.getQuotedContent(each.value.getText()));
+        }
+        return result;
+    }
 }
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 86f7bc9b658..e5b0a12d08d 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -98,6 +98,8 @@ class CDCE2EIT {
     void assertCDCDataImportSuccess(final PipelineTestParameter testParam) 
throws SQLException, InterruptedException {
         TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
         try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new CDCJobType())) {
+            containerComposer.proxyExecuteWithLog("ALTER STREAMING RULE 
(READ(WORKER_THREAD=64,BATCH_SIZE=1000,SHARDING_SIZE=10000000,RATE_LIMITER 
(TYPE(NAME='QPS',PROPERTIES('qps'='10000')))),"
+                    + "STREAM_CHANNEL 
(TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000'))));", 0);
             for (String each : Arrays.asList(PipelineContainerComposer.DS_0, 
PipelineContainerComposer.DS_1)) {
                 containerComposer.registerStorageUnit(each);
             }

Reply via email to