This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d49595d9f75 Add AlterTransmissionRuleExecutorTest (#37093)
d49595d9f75 is described below
commit d49595d9f757f1fb9238e821250a320914dab2fe
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Nov 14 11:59:56 2025 +0800
Add AlterTransmissionRuleExecutorTest (#37093)
---
AGENTS.md | 3 +
.../update/AlterTransmissionRuleExecutorTest.java | 130 +++++++++++++++++++++
2 files changed, 133 insertions(+)
diff --git a/AGENTS.md b/AGENTS.md
index d469bced1df..c8664c9e18c 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -6,6 +6,7 @@ This guide is written **for AI coding agents only**. Follow it
literally; improv
- `CODE_OF_CONDUCT.md` is the binding “law” for any generated artifact. Review
it once per session and refuse to keep code that conflicts with it (copyright,
inclusivity, licensing, etc.).
- Technical choices must honor ASF expectations: license headers, transparent
intent, explicit rationale in user-facing notes.
- Instruction precedence: `CODE_OF_CONDUCT.md` > user directive > this guide >
other repository documents.
+- Coding style (assertions, formatting, etc.) always follows the standards
referenced by `CODE_OF_CONDUCT.md`; this guide only adds on top of them.
## Team Signals
- **Release tempo:** expect monthly feature trains plus weekly patch windows.
Default to smallest safe change unless explicitly asked for broader refactors.
@@ -67,6 +68,7 @@ Mention which topology you target, the registry used, and any
compatibility cons
## Testing Expectations
- Use JUnit 5 + Mockito; tests mirror production packages, are named
`ClassNameTest`, and assert via `assertXxxCondition`. Keep Arrange–Act–Assert,
adding separators only when clarity demands.
- Mock databases/time/network; instantiate simple POJOs. Reset static
caches/guards between cases if production code retains global state.
+- Keep static mocks minimal—only stub SPI/static calls actually reached by the
scenario to avoid `UnnecessaryStubbingException`.
- Jacoco workflow: `./mvnw -pl {module} -am -Djacoco.skip=false test
jacoco:report`, then inspect `{module}/target/site/jacoco/index.html`.
Aggregator modules require testing concrete submodules before running
`jacoco:report`. When Jacoco fails, describe uncovered branches and the new
tests that cover them.
- Static / constructor mocking: prefer `@ExtendWith(AutoMockExtension.class)`
with `@StaticMockSettings`/`@ConstructionMockSettings`; avoid manual
`mockStatic`/`mockConstruction`. Ensure the module `pom.xml` has the
`shardingsphere-test-infra-framework` test dependency before using these
annotations.
- For coverage gating, run `./mvnw test jacoco:check@jacoco-check
-Pcoverage-check` and report results. If code is truly unreachable, cite
file/line and explain why, noting whether cleanup is recommended.
@@ -151,6 +153,7 @@ When a task requires tests, automatically:
- When only a few properties of a complex object are used, mock it rather than
assembling the full graph.
- Do not mock simple objects that can be instantiated directly with `new`.
- Do not enable Mockito’s `RETURNS_DEEP_STUBS` unless unavoidable chained
interactions make explicit stubs impractical; if you must enable it, mention
the justification in the test description.
+- If a production class hides dependencies behind private finals (no
constructor/setter), you may instantiate it directly and use reflection to
inject mocks, but document why SPI creation is bypassed and still mock the SPI
interactions the code triggers.
## SPI Loader Usage
- Cache services obtained through SPI loaders (`OrderedSPILoader`,
`TypedSPILoader`, `DatabaseTypedSPILoader`, etc.) at the test-class or suite
level whenever the same type is reused, so repeated lookups do not slow tests
or introduce ordering surprises.
diff --git
a/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/data/pipeline/distsql/handler/transmission/update/AlterTransmissionRuleExecutorTest.java
b/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/data/pipeline/distsql/handler/transmission/update/AlterTransmissionRuleExecutorTest.java
new file mode 100644
index 00000000000..1653f2c66ee
--- /dev/null
+++
b/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/data/pipeline/distsql/handler/transmission/update/AlterTransmissionRuleExecutorTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.distsql.handler.transmission.update;
+
+import
org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator;
+import
org.apache.shardingsphere.data.pipeline.core.channel.memory.MemoryPipelineChannelCreator;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
+import
org.apache.shardingsphere.data.pipeline.distsql.statement.updatable.AlterTransmissionRuleStatement;
+import
org.apache.shardingsphere.distsql.handler.engine.update.DistSQLUpdateExecutor;
+import org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
+import org.apache.shardingsphere.distsql.segment.ReadOrWriteSegment;
+import org.apache.shardingsphere.distsql.segment.TransmissionRuleSegment;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.util.props.PropertiesBuilder;
+import org.apache.shardingsphere.infra.util.props.PropertiesBuilder.Property;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.internal.configuration.plugins.Plugins;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class AlterTransmissionRuleExecutorTest {
+
+ private static final String JOB_TYPE = "MIGRATION";
+
+ private final AlterTransmissionRuleExecutor executor =
(AlterTransmissionRuleExecutor)
TypedSPILoader.getService(DistSQLUpdateExecutor.class,
AlterTransmissionRuleStatement.class);
+
+ @Mock
+ private PipelineProcessConfigurationPersistService persistService;
+
+ @BeforeEach
+ void setUp() throws ReflectiveOperationException {
+
Plugins.getMemberAccessor().set(AlterTransmissionRuleExecutor.class.getDeclaredField("processConfigPersistService"),
executor, persistService);
+ }
+
+ @Test
+ void assertExecuteUpdate() {
+ PipelineJobType jobType = mock(PipelineJobType.class);
+ when(jobType.getType()).thenReturn(JOB_TYPE);
+ TransmissionRuleSegment segment = new TransmissionRuleSegment();
+ segment.setReadSegment(new ReadOrWriteSegment(5, 1000, 200, new
AlgorithmSegment("READ_LIMITER", PropertiesBuilder.build(new Property("qps",
"50")))));
+ segment.setWriteSegment(new ReadOrWriteSegment(3, 500, new
AlgorithmSegment("WRITE_LIMITER", PropertiesBuilder.build(new Property("qps",
"20")))));
+ segment.setStreamChannel(new AlgorithmSegment("MEMORY",
PropertiesBuilder.build(new Property("block-queue-size", "1024"))));
+ AlterTransmissionRuleStatement sqlStatement = new
AlterTransmissionRuleStatement(JOB_TYPE, segment);
+ try (MockedStatic<TypedSPILoader> mockedStatic =
mockStatic(TypedSPILoader.class)) {
+ mockedStatic.when(() ->
TypedSPILoader.getService(PipelineJobType.class, JOB_TYPE)).thenReturn(jobType);
+ mockedStatic.when(() ->
TypedSPILoader.findService(PipelineChannelCreator.class,
"MEMORY")).thenReturn(Optional.of(new MemoryPipelineChannelCreator()));
+ executor.executeUpdate(sqlStatement, mock());
+ ArgumentCaptor<PipelineProcessConfiguration> configCaptor =
ArgumentCaptor.forClass(PipelineProcessConfiguration.class);
+ verify(persistService).persist(any(PipelineContextKey.class),
eq(JOB_TYPE), configCaptor.capture());
+ PipelineProcessConfiguration actual = configCaptor.getValue();
+ assertThat(actual.getRead().getWorkerThread(), is(5));
+ assertThat(actual.getRead().getBatchSize(), is(1000));
+ assertThat(actual.getRead().getShardingSize(), is(200));
+ assertThat(actual.getRead().getRateLimiter().getType(),
is("READ_LIMITER"));
+
assertThat(actual.getRead().getRateLimiter().getProps().getProperty("qps"),
is("50"));
+ assertThat(actual.getWrite().getWorkerThread(), is(3));
+ assertThat(actual.getWrite().getBatchSize(), is(500));
+ assertThat(actual.getWrite().getRateLimiter().getType(),
is("WRITE_LIMITER"));
+
assertThat(actual.getWrite().getRateLimiter().getProps().getProperty("qps"),
is("20"));
+ assertThat(actual.getStreamChannel().getType(), is("MEMORY"));
+
assertThat(actual.getStreamChannel().getProps().getProperty("block-queue-size"),
is("1024"));
+ }
+ }
+
+ @Test
+ void assertExecuteUpdatePersistWhenStreamChannelIsNull() {
+ PipelineJobType jobType = mock(PipelineJobType.class);
+ when(jobType.getType()).thenReturn(JOB_TYPE);
+ AlterTransmissionRuleStatement sqlStatement = new
AlterTransmissionRuleStatement(JOB_TYPE, new TransmissionRuleSegment());
+ try (MockedStatic<TypedSPILoader> mockedStatic =
mockStatic(TypedSPILoader.class)) {
+ mockedStatic.when(() ->
TypedSPILoader.getService(PipelineJobType.class, JOB_TYPE)).thenReturn(jobType);
+ executor.executeUpdate(sqlStatement, mock());
+ ArgumentCaptor<PipelineProcessConfiguration> configCaptor =
ArgumentCaptor.forClass(PipelineProcessConfiguration.class);
+ verify(persistService).persist(any(PipelineContextKey.class),
eq(JOB_TYPE), configCaptor.capture());
+ PipelineProcessConfiguration actual = configCaptor.getValue();
+ assertNull(actual.getRead());
+ assertNull(actual.getWrite());
+ assertNull(actual.getStreamChannel());
+ }
+ }
+
+ @Test
+ void assertExecuteUpdateThrowWhenStreamChannelTypeIsUnknown() {
+ TransmissionRuleSegment segment = new TransmissionRuleSegment();
+ segment.setStreamChannel(new AlgorithmSegment("UNKNOWN", new
Properties()));
+ AlterTransmissionRuleStatement sqlStatement = new
AlterTransmissionRuleStatement(JOB_TYPE, segment);
+ try (MockedStatic<TypedSPILoader> mockedStatic =
mockStatic(TypedSPILoader.class)) {
+ mockedStatic.when(() ->
TypedSPILoader.findService(PipelineChannelCreator.class,
"UNKNOWN")).thenReturn(Optional.empty());
+ assertThrows(PipelineInvalidParameterException.class, () ->
executor.executeUpdate(sqlStatement, mock()));
+ }
+ }
+}