This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1aacef7 [DistSQL] Make workerThread, batchSize and rateLimiter
optional in sharding scaling rule. (#15456)
1aacef7 is described below
commit 1aacef7ff91d5ebc84481a69a158ca2418352b88
Author: Raigor <[email protected]>
AuthorDate: Thu Feb 17 09:56:29 2022 +0800
[DistSQL] Make workerThread, batchSize and rateLimiter optional in sharding
scaling rule. (#15456)
---
.../syntax/rdl/rule-definition/sharding.cn.md | 4 +--
.../syntax/rdl/rule-definition/sharding.en.md | 4 +--
.../CreateShardingScalingRuleStatementUpdater.java | 18 +++++++-----
.../main/antlr4/imports/scaling/RDLStatement.g4 | 4 +--
.../parser/core/ScalingSQLStatementVisitor.java | 34 ++++++++++++++++++----
.../statement/segment/InputOrOutputSegment.java | 4 +--
6 files changed, 46 insertions(+), 22 deletions(-)
diff --git
a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.cn.md
b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.cn.md
index 7ab94b4..03b306f 100644
---
a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.cn.md
+++
b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.cn.md
@@ -154,10 +154,10 @@ scalingRuleDefinition:
[inputDefinition] [, outputDefinition] [, streamChannel] [,
completionDetector] [, dataConsistencyChecker]
inputDefinition:
- INPUT (workerThread, batchSize, rateLimiter)
+ INPUT ([workerThread] [, batchSize] [, rateLimiter])
outputDefinition:
- INPUT (workerThread, batchSize, rateLimiter)
+ OUTPUT ([workerThread] [, batchSize] [, rateLimiter])
completionDetector:
COMPLETION_DETECTOR (algorithmDefinition)
diff --git
a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.en.md
b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.en.md
index 7e20f93..e7e586b 100644
---
a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.en.md
+++
b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.en.md
@@ -154,10 +154,10 @@ scalingRuleDefinition:
[inputDefinition] [, outputDefinition] [, streamChannel] [,
completionDetector] [, dataConsistencyChecker]
inputDefinition:
- INPUT (workerThread, batchSize, rateLimiter)
+ INPUT ([workerThread] [, batchSize] [, rateLimiter])
outputDefinition:
- INPUT (workerThread, batchSize, rateLimiter)
+ OUTPUT ([workerThread] [, batchSize] [, rateLimiter])
completionDetector:
COMPLETION_DETECTOR (algorithmDefinition)
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdater.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdater.java
index 20dc278..4f9e0b0 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdater.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdater.java
@@ -61,7 +61,7 @@ public final class CreateShardingScalingRuleStatementUpdater
implements RuleDefi
String schemaName = shardingSphereMetaData.getName();
checkCurrentRuleConfiguration(schemaName, currentRuleConfig);
checkDuplicate(schemaName, sqlStatement, currentRuleConfig);
- checkAlgorithms(sqlStatement);
+ checkAlgorithms(sqlStatement.getConfigurationSegment());
}
private void checkCurrentRuleConfiguration(final String schemaName, final
ShardingRuleConfiguration currentRuleConfig) throws RequiredRuleMissedException
{
@@ -76,14 +76,14 @@ public final class
CreateShardingScalingRuleStatementUpdater implements RuleDefi
}
}
- private void checkAlgorithms(final CreateShardingScalingRuleStatement
sqlStatement) throws DistSQLException {
- if (null == sqlStatement.getConfigurationSegment()) {
+ private void checkAlgorithms(final ShardingScalingRuleConfigurationSegment
segment) throws DistSQLException {
+ if (null == segment) {
return;
}
- checkRateLimiterExist(sqlStatement.getConfigurationSegment());
- checkStreamChannelExist(sqlStatement.getConfigurationSegment());
- checkCompletionDetectorExist(sqlStatement.getConfigurationSegment());
-
checkDataConsistencyCheckerExist(sqlStatement.getConfigurationSegment());
+ checkRateLimiterExist(segment);
+ checkStreamChannelExist(segment);
+ checkCompletionDetectorExist(segment);
+ checkDataConsistencyCheckerExist(segment);
}
private void checkRateLimiterExist(final
ShardingScalingRuleConfigurationSegment segment) throws DistSQLException {
@@ -96,7 +96,9 @@ public final class CreateShardingScalingRuleStatementUpdater
implements RuleDefi
}
private void checkRateLimiterAlgorithm(final AlgorithmSegment rateLimiter)
throws DistSQLException {
- checkAlgorithm(RATE_LIMIT_ALGORITHM_HOLDER, "rate limiter",
rateLimiter);
+ if (null != rateLimiter) {
+ checkAlgorithm(RATE_LIMIT_ALGORITHM_HOLDER, "rate limiter",
rateLimiter);
+ }
}
private void checkStreamChannelExist(final
ShardingScalingRuleConfigurationSegment segment) throws DistSQLException {
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/scaling/RDLStatement.g4
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/scaling/RDLStatement.g4
index b022a6d..6ed1bf8 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/scaling/RDLStatement.g4
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/scaling/RDLStatement.g4
@@ -44,11 +44,11 @@ scalingRuleDefinition
;
inputDefinition
- : INPUT LP workerThread COMMA batchSize COMMA rateLimiter RP
+ : INPUT LP workerThread? (COMMA? batchSize)? (COMMA? rateLimiter)? RP
;
outputDefinition
- : OUTPUT LP workerThread COMMA batchSize COMMA rateLimiter RP
+ : OUTPUT LP workerThread? (COMMA? batchSize)? (COMMA? rateLimiter)? RP
;
completionDetector
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/scaling/distsql/parser/core/ScalingSQLStatementVisitor.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/scaling/distsql/parser/core/ScalingSQLStatementVisitor.java
index 0f804cb..bef5785 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/scaling/distsql/parser/core/ScalingSQLStatementVisitor.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/scaling/distsql/parser/core/ScalingSQLStatementVisitor.java
@@ -21,6 +21,7 @@ import
org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementBaseVisi
import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser;
import
org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.AlgorithmDefinitionContext;
import
org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.ApplyScalingContext;
+import
org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.BatchSizeContext;
import
org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.CheckScalingContext;
import
org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.CompletionDetectorContext;
import
org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.CreateShardingScalingRuleContext;
@@ -43,6 +44,7 @@ import
org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.S
import
org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.StopScalingContext;
import
org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.StopScalingSourceWritingContext;
import
org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.StreamChannelContext;
+import
org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.WorkerThreadContext;
import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
import
org.apache.shardingsphere.scaling.distsql.statement.ApplyScalingStatement;
import
org.apache.shardingsphere.scaling.distsql.statement.CheckScalingStatement;
@@ -159,20 +161,40 @@ public final class ScalingSQLStatementVisitor extends
ScalingStatementBaseVisito
@Override
public ASTNode visitInputDefinition(final InputDefinitionContext ctx) {
- int workerThread =
Integer.parseInt(ctx.workerThread().intValue().getText());
- int batchSize = Integer.parseInt(ctx.batchSize().intValue().getText());
- AlgorithmSegment rateLimiter = (AlgorithmSegment)
visit(ctx.rateLimiter());
+ Integer workerThread = getWorkerThread(ctx.workerThread());
+ Integer batchSize = getBatchSize(ctx.batchSize());
+ AlgorithmSegment rateLimiter = null;
+ if (null != ctx.rateLimiter()) {
+ rateLimiter = (AlgorithmSegment) visit(ctx.rateLimiter());
+ }
return new InputOrOutputSegment(workerThread, batchSize, rateLimiter);
}
@Override
public ASTNode visitOutputDefinition(final OutputDefinitionContext ctx) {
- int workerThread =
Integer.parseInt(ctx.workerThread().intValue().getText());
- int batchSize = Integer.parseInt(ctx.batchSize().intValue().getText());
- AlgorithmSegment rateLimiter = (AlgorithmSegment)
visit(ctx.rateLimiter());
+ Integer workerThread = getWorkerThread(ctx.workerThread());
+ Integer batchSize = getBatchSize(ctx.batchSize());
+ AlgorithmSegment rateLimiter = null;
+ if (null != ctx.rateLimiter()) {
+ rateLimiter = (AlgorithmSegment) visit(ctx.rateLimiter());
+ }
return new InputOrOutputSegment(workerThread, batchSize, rateLimiter);
}
+ private Integer getWorkerThread(final WorkerThreadContext ctx) {
+ if (null == ctx) {
+ return null;
+ }
+ return Integer.parseInt(ctx.intValue().getText());
+ }
+
+ private Integer getBatchSize(final BatchSizeContext ctx) {
+ if (null == ctx) {
+ return null;
+ }
+ return Integer.parseInt(ctx.intValue().getText());
+ }
+
@Override
public ASTNode visitRateLimiter(final RateLimiterContext ctx) {
return visit(ctx.algorithmDefinition());
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/scaling/distsql/statement/segment/InputOrOutputSegment.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/scaling/distsql/statement/segment/InputOrOutputSegment.java
index 35d5c92..67c3d4b 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/scaling/distsql/statement/segment/InputOrOutputSegment.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/scaling/distsql/statement/segment/InputOrOutputSegment.java
@@ -29,9 +29,9 @@ import
org.apache.shardingsphere.sql.parser.api.visitor.ASTNode;
@Getter
public final class InputOrOutputSegment implements ASTNode {
- private final int workerThread;
+ private final Integer workerThread;
- private final int batchSize;
+ private final Integer batchSize;
private final AlgorithmSegment rateLimiter;
}