This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d49595d9f75 Add AlterTransmissionRuleExecutorTest (#37093)
d49595d9f75 is described below

commit d49595d9f757f1fb9238e821250a320914dab2fe
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Nov 14 11:59:56 2025 +0800

    Add AlterTransmissionRuleExecutorTest (#37093)
---
 AGENTS.md                                          |   3 +
 .../update/AlterTransmissionRuleExecutorTest.java  | 130 +++++++++++++++++++++
 2 files changed, 133 insertions(+)

diff --git a/AGENTS.md b/AGENTS.md
index d469bced1df..c8664c9e18c 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -6,6 +6,7 @@ This guide is written **for AI coding agents only**. Follow it literally; improv
 - `CODE_OF_CONDUCT.md` is the binding “law” for any generated artifact. Review it once per session and refuse to keep code that conflicts with it (copyright, inclusivity, licensing, etc.).
 - Technical choices must honor ASF expectations: license headers, transparent intent, explicit rationale in user-facing notes.
 - Instruction precedence: `CODE_OF_CONDUCT.md` > user directive > this guide > other repository documents.
+- Coding style (assertions, formatting, etc.) always follows the standards referenced by `CODE_OF_CONDUCT.md`; this guide only adds on top of them.
 
 ## Team Signals
 - **Release tempo:** expect monthly feature trains plus weekly patch windows. Default to smallest safe change unless explicitly asked for broader refactors.
@@ -67,6 +68,7 @@ Mention which topology you target, the registry used, and any compatibility cons
 ## Testing Expectations
 - Use JUnit 5 + Mockito; tests mirror production packages, are named `ClassNameTest`, and assert via `assertXxxCondition`. Keep Arrange–Act–Assert, adding separators only when clarity demands.
 - Mock databases/time/network; instantiate simple POJOs. Reset static caches/guards between cases if production code retains global state.
+- Keep static mocks minimal—only stub SPI/static calls actually reached by the 
scenario to avoid `UnnecessaryStubbingException`.
 - Jacoco workflow: `./mvnw -pl {module} -am -Djacoco.skip=false test jacoco:report`, then inspect `{module}/target/site/jacoco/index.html`. Aggregator modules require testing concrete submodules before running `jacoco:report`. When Jacoco fails, describe uncovered branches and the new tests that cover them.
 - Static / constructor mocking: prefer `@ExtendWith(AutoMockExtension.class)` with `@StaticMockSettings`/`@ConstructionMockSettings`; avoid manual `mockStatic`/`mockConstruction`. Ensure the module `pom.xml` has the `shardingsphere-test-infra-framework` test dependency before using these annotations.
 - For coverage gating, run `./mvnw test jacoco:check@jacoco-check -Pcoverage-check` and report results. If code is truly unreachable, cite file/line and explain why, noting whether cleanup is recommended.
@@ -151,6 +153,7 @@ When a task requires tests, automatically:
 - When only a few properties of a complex object are used, mock it rather than assembling the full graph.
 - Do not mock simple objects that can be instantiated directly with `new`.
 - Do not enable Mockito’s `RETURNS_DEEP_STUBS` unless unavoidable chained interactions make explicit stubs impractical; if you must enable it, mention the justification in the test description.
+- If a production class hides dependencies behind private finals (no constructor/setter), you may instantiate it directly and use reflection to inject mocks, but document why SPI creation is bypassed and still mock the SPI interactions the code triggers.
 
 ## SPI Loader Usage
 - Cache services obtained through SPI loaders (`OrderedSPILoader`, `TypedSPILoader`, `DatabaseTypedSPILoader`, etc.) at the test-class or suite level whenever the same type is reused, so repeated lookups do not slow tests or introduce ordering surprises.
diff --git a/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/data/pipeline/distsql/handler/transmission/update/AlterTransmissionRuleExecutorTest.java b/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/data/pipeline/distsql/handler/transmission/update/AlterTransmissionRuleExecutorTest.java
new file mode 100644
index 00000000000..1653f2c66ee
--- /dev/null
+++ b/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/data/pipeline/distsql/handler/transmission/update/AlterTransmissionRuleExecutorTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.distsql.handler.transmission.update;
+
+import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator;
+import org.apache.shardingsphere.data.pipeline.core.channel.memory.MemoryPipelineChannelCreator;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
+import org.apache.shardingsphere.data.pipeline.distsql.statement.updatable.AlterTransmissionRuleStatement;
+import org.apache.shardingsphere.distsql.handler.engine.update.DistSQLUpdateExecutor;
+import org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
+import org.apache.shardingsphere.distsql.segment.ReadOrWriteSegment;
+import org.apache.shardingsphere.distsql.segment.TransmissionRuleSegment;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.util.props.PropertiesBuilder;
+import org.apache.shardingsphere.infra.util.props.PropertiesBuilder.Property;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.internal.configuration.plugins.Plugins;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class AlterTransmissionRuleExecutorTest {
+    
+    private static final String JOB_TYPE = "MIGRATION";
+    
+    private final AlterTransmissionRuleExecutor executor = (AlterTransmissionRuleExecutor) TypedSPILoader.getService(DistSQLUpdateExecutor.class, AlterTransmissionRuleStatement.class);
+    
+    @Mock
+    private PipelineProcessConfigurationPersistService persistService;
+    
+    @BeforeEach
+    void setUp() throws ReflectiveOperationException {
+        Plugins.getMemberAccessor().set(AlterTransmissionRuleExecutor.class.getDeclaredField("processConfigPersistService"), executor, persistService);
+    }
+    
+    @Test
+    void assertExecuteUpdate() {
+        PipelineJobType jobType = mock(PipelineJobType.class);
+        when(jobType.getType()).thenReturn(JOB_TYPE);
+        TransmissionRuleSegment segment = new TransmissionRuleSegment();
+        segment.setReadSegment(new ReadOrWriteSegment(5, 1000, 200, new AlgorithmSegment("READ_LIMITER", PropertiesBuilder.build(new Property("qps", "50")))));
+        segment.setWriteSegment(new ReadOrWriteSegment(3, 500, new AlgorithmSegment("WRITE_LIMITER", PropertiesBuilder.build(new Property("qps", "20")))));
+        segment.setStreamChannel(new AlgorithmSegment("MEMORY", PropertiesBuilder.build(new Property("block-queue-size", "1024"))));
+        AlterTransmissionRuleStatement sqlStatement = new AlterTransmissionRuleStatement(JOB_TYPE, segment);
+        try (MockedStatic<TypedSPILoader> mockedStatic = mockStatic(TypedSPILoader.class)) {
+            mockedStatic.when(() -> TypedSPILoader.getService(PipelineJobType.class, JOB_TYPE)).thenReturn(jobType);
+            mockedStatic.when(() -> TypedSPILoader.findService(PipelineChannelCreator.class, "MEMORY")).thenReturn(Optional.of(new MemoryPipelineChannelCreator()));
+            executor.executeUpdate(sqlStatement, mock());
+            ArgumentCaptor<PipelineProcessConfiguration> configCaptor = ArgumentCaptor.forClass(PipelineProcessConfiguration.class);
+            verify(persistService).persist(any(PipelineContextKey.class), eq(JOB_TYPE), configCaptor.capture());
+            PipelineProcessConfiguration actual = configCaptor.getValue();
+            assertThat(actual.getRead().getWorkerThread(), is(5));
+            assertThat(actual.getRead().getBatchSize(), is(1000));
+            assertThat(actual.getRead().getShardingSize(), is(200));
+            assertThat(actual.getRead().getRateLimiter().getType(), is("READ_LIMITER"));
+            assertThat(actual.getRead().getRateLimiter().getProps().getProperty("qps"), is("50"));
+            assertThat(actual.getWrite().getWorkerThread(), is(3));
+            assertThat(actual.getWrite().getBatchSize(), is(500));
+            assertThat(actual.getWrite().getRateLimiter().getType(), is("WRITE_LIMITER"));
+            assertThat(actual.getWrite().getRateLimiter().getProps().getProperty("qps"), is("20"));
+            assertThat(actual.getStreamChannel().getType(), is("MEMORY"));
+            assertThat(actual.getStreamChannel().getProps().getProperty("block-queue-size"), is("1024"));
+        }
+    }
+    
+    @Test
+    void assertExecuteUpdatePersistWhenStreamChannelIsNull() {
+        PipelineJobType jobType = mock(PipelineJobType.class);
+        when(jobType.getType()).thenReturn(JOB_TYPE);
+        AlterTransmissionRuleStatement sqlStatement = new AlterTransmissionRuleStatement(JOB_TYPE, new TransmissionRuleSegment());
+        try (MockedStatic<TypedSPILoader> mockedStatic = mockStatic(TypedSPILoader.class)) {
+            mockedStatic.when(() -> TypedSPILoader.getService(PipelineJobType.class, JOB_TYPE)).thenReturn(jobType);
+            executor.executeUpdate(sqlStatement, mock());
+            ArgumentCaptor<PipelineProcessConfiguration> configCaptor = ArgumentCaptor.forClass(PipelineProcessConfiguration.class);
+            verify(persistService).persist(any(PipelineContextKey.class), eq(JOB_TYPE), configCaptor.capture());
+            PipelineProcessConfiguration actual = configCaptor.getValue();
+            assertNull(actual.getRead());
+            assertNull(actual.getWrite());
+            assertNull(actual.getStreamChannel());
+        }
+    }
+    
+    @Test
+    void assertExecuteUpdateThrowWhenStreamChannelTypeIsUnknown() {
+        TransmissionRuleSegment segment = new TransmissionRuleSegment();
+        segment.setStreamChannel(new AlgorithmSegment("UNKNOWN", new Properties()));
+        AlterTransmissionRuleStatement sqlStatement = new AlterTransmissionRuleStatement(JOB_TYPE, segment);
+        try (MockedStatic<TypedSPILoader> mockedStatic = mockStatic(TypedSPILoader.class)) {
+            mockedStatic.when(() -> TypedSPILoader.findService(PipelineChannelCreator.class, "UNKNOWN")).thenReturn(Optional.empty());
+            assertThrows(PipelineInvalidParameterException.class, () -> executor.executeUpdate(sqlStatement, mock()));
+        }
+    }
+}
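
Note on the reflection-injection pattern used in `setUp()` above: the test replaces the executor's private `processConfigPersistService` field with a mock through Mockito's member accessor. The same idea can be sketched with plain `java.lang.reflect`, which is a swapped-in technique for illustration and not what the commit itself uses. Every name in the sketch (`ReflectionInjectionSketch`, `Greeter`, `GreetingService`, `inject`) is hypothetical and does not exist in ShardingSphere. It assumes an ordinary class with a non-static final field whose initializer is not a compile-time constant.

```java
import java.lang.reflect.Field;

// All names below are hypothetical; none of them exist in ShardingSphere.
class ReflectionInjectionSketch {

    /** Hypothetical collaborator that the class under test creates internally. */
    interface GreetingService {
        String greet(String name);
    }

    /** Hypothetical class under test: the dependency is a private final field with no constructor or setter. */
    static final class Greeter {

        private final GreetingService service = name -> "Hello, " + name;

        String run(String name) {
            return service.greet(name);
        }
    }

    /** Overwrites a private, non-static field declared on the target's own class. */
    static void inject(Object target, String fieldName, Object value) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            // Required before writing to a private (and final) instance field.
            field.setAccessible(true);
            field.set(target, value);
        } catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("Cannot inject field " + fieldName, ex);
        }
    }

    public static void main(String[] args) {
        Greeter greeter = new Greeter();
        GreetingService stub = name -> "stub:" + name;
        inject(greeter, "service", stub);
        // The call is now routed to the stub instead of the built-in service.
        System.out.println(greeter.run("x"));
    }
}
```

In a real test the stub would normally be a Mockito mock, as in the commit, so that interactions can be verified with `verify(...)`.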
