This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 19911e40186 Add more test cases on PipelineJobProgressPersistServiceTest (#37121)
19911e40186 is described below

commit 19911e401860f1d0eae3d023579098836216eeb1
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Nov 16 02:01:15 2025 +0800

    Add more test cases on PipelineJobProgressPersistServiceTest (#37121)
    
    * Add more test cases on PipelineJobProgressPersistServiceTest
    
    * Add more test cases on PipelineJobProgressPersistServiceTest
---
 .../PipelineJobProgressPersistServiceTest.java     | 176 ++++++++++++++++++++-
 1 file changed, 168 insertions(+), 8 deletions(-)

diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistServiceTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistServiceTest.java
index 416f2c583a7..9fe05da62a7 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistServiceTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistServiceTest.java
@@ -18,19 +18,60 @@
 package org.apache.shardingsphere.data.pipeline.core.job.progress.persist;
 
 import lombok.SneakyThrows;
+import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import 
org.apache.shardingsphere.test.infra.framework.extension.mock.AutoMockExtension;
+import 
org.apache.shardingsphere.test.infra.framework.extension.mock.StaticMockSettings;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedConstruction;
+import org.mockito.MockedStatic;
 import org.mockito.internal.configuration.plugins.Plugins;
 
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ThreadLocalRandom;
 
-import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+@ExtendWith(AutoMockExtension.class)
+@StaticMockSettings({PipelineJobRegistry.class, ThreadLocalRandom.class})
 class PipelineJobProgressPersistServiceTest {
     
+    @SuppressWarnings("unchecked")
+    @AfterEach
+    void tearDown() {
+        getJobProgressPersistMap().clear();
+        clearInvocations(PipelineJobRegistry.class);
+    }
+    
+    @SuppressWarnings("unchecked")
+    @SneakyThrows(ReflectiveOperationException.class)
+    private Map<String, Map<Integer, PipelineJobProgressPersistContext>> 
getJobProgressPersistMap() {
+        Field field = 
PipelineJobProgressPersistService.class.getDeclaredField("JOB_PROGRESS_PERSIST_MAP");
+        return (Map<String, Map<Integer, PipelineJobProgressPersistContext>>) 
Plugins.getMemberAccessor().get(field, PipelineJobProgressPersistService.class);
+    }
+    
     @Test
     void assertAdd() {
         PipelineJobProgressPersistService.add("foo_id", 1);
@@ -54,16 +95,135 @@ class PipelineJobProgressPersistServiceTest {
         
assertThat(jobProgressPersistMap.get("foo_id").get(1).getUnhandledEventCount().get(),
 is(1L));
     }
     
-    @SuppressWarnings("unchecked")
+    @Test
+    void assertPersistNowSkipsWhenUnhandledCountIsZero() {
+        PipelineJobProgressPersistService.add("foo_id", 1);
+        assertDoesNotThrow(() -> 
PipelineJobProgressPersistService.persistNow("foo_id", 1));
+    }
+    
+    @SuppressWarnings("rawtypes")
     @SneakyThrows(ReflectiveOperationException.class)
-    private Map<String, Map<Integer, PipelineJobProgressPersistContext>> 
getJobProgressPersistMap() {
-        Field field = 
PipelineJobProgressPersistService.class.getDeclaredField("JOB_PROGRESS_PERSIST_MAP");
-        return (Map<String, Map<Integer, PipelineJobProgressPersistContext>>) 
Plugins.getMemberAccessor().get(field, PipelineJobProgressPersistService.class);
+    @Test
+    void assertPersistJobContextRunnableIteratesEntries() {
+        String jobId = "foo_scheduler_job";
+        int shardingItem = 2;
+        PipelineJobItemContext jobItemContext = 
mock(PipelineJobItemContext.class);
+        when(PipelineJobRegistry.getItemContext(jobId, 
shardingItem)).thenReturn(Optional.of(jobItemContext));
+        PipelineJobType<?> jobType = mock(PipelineJobType.class);
+        PipelineJobOption jobOption = mock(PipelineJobOption.class);
+        when(jobOption.getYamlJobItemProgressSwapper()).thenReturn(null);
+        when(jobType.getOption()).thenReturn(jobOption);
+        when(jobType.getType()).thenReturn("TEST");
+        ThreadLocalRandom randomMock = mock(ThreadLocalRandom.class);
+        when(ThreadLocalRandom.current()).thenReturn(randomMock);
+        when(randomMock.nextInt(100)).thenReturn(0);
+        try (
+                MockedStatic<PipelineJobIdUtils> jobIdUtilsMock = 
mockStatic(PipelineJobIdUtils.class);
+                MockedStatic<TypedSPILoader> typedSpiLoaderStatic = 
mockStatic(TypedSPILoader.class);
+                MockedConstruction<PipelineJobItemManager> mockedConstruction 
= mockConstruction(PipelineJobItemManager.class,
+                        (mock, context) -> 
doNothing().when(mock).updateProgress(jobItemContext))) {
+            jobIdUtilsMock.when(() -> 
PipelineJobIdUtils.parseJobType(jobId)).thenReturn(jobType);
+            typedSpiLoaderStatic.when(() -> 
TypedSPILoader.getService(PipelineJobType.class, "TEST")).thenReturn(jobType);
+            PipelineJobProgressPersistService.add(jobId, shardingItem);
+            PipelineJobProgressPersistService.notifyPersist(jobId, 
shardingItem);
+            Class<?> runnableClass = 
Class.forName(PipelineJobProgressPersistService.class.getName() + 
"$PersistJobContextRunnable");
+            Constructor<?> constructor = 
runnableClass.getDeclaredConstructor();
+            constructor.setAccessible(true);
+            Runnable runnable = (Runnable) constructor.newInstance();
+            assertDoesNotThrow(runnable::run);
+            
assertThat(getJobProgressPersistMap().get(jobId).get(shardingItem).getUnhandledEventCount().get(),
 is(0L));
+            
verify(mockedConstruction.constructed().get(0)).updateProgress(jobItemContext);
+        } finally {
+            PipelineJobProgressPersistService.remove(jobId);
+        }
     }
     
+    @SuppressWarnings("rawtypes")
     @Test
-    void assertPersistNow() {
-        PipelineJobProgressPersistService.add("foo_id", 1);
-        assertDoesNotThrow(() -> 
PipelineJobProgressPersistService.persistNow("foo_id", 1));
+    void 
assertPersistNowUpdatesProgressWhenJobItemContextPresentWithoutLogging() {
+        String jobId = "foo_id_success_no_log";
+        int shardingItem = 1;
+        PipelineJobProgressPersistService.add(jobId, shardingItem);
+        PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
+        PipelineJobItemContext jobItemContext = 
mock(PipelineJobItemContext.class);
+        when(PipelineJobRegistry.getItemContext(jobId, 
shardingItem)).thenReturn(Optional.of(jobItemContext));
+        PipelineJobType<?> jobType = mock(PipelineJobType.class);
+        PipelineJobOption jobOption = mock(PipelineJobOption.class);
+        when(jobOption.getYamlJobItemProgressSwapper()).thenReturn(null);
+        when(jobType.getOption()).thenReturn(jobOption);
+        when(jobType.getType()).thenReturn("TEST");
+        ThreadLocalRandom randomMock = mock(ThreadLocalRandom.class);
+        when(ThreadLocalRandom.current()).thenReturn(randomMock);
+        when(randomMock.nextInt(100)).thenReturn(0);
+        try (
+                MockedStatic<PipelineJobIdUtils> jobIdUtilsMock = 
mockStatic(PipelineJobIdUtils.class);
+                MockedStatic<TypedSPILoader> typedSpiLoaderStatic = 
mockStatic(TypedSPILoader.class);
+                MockedConstruction<PipelineJobItemManager> mockedConstruction 
= mockConstruction(PipelineJobItemManager.class,
+                        (mock, context) -> 
doNothing().when(mock).updateProgress(jobItemContext))) {
+            jobIdUtilsMock.when(() -> 
PipelineJobIdUtils.parseJobType(jobId)).thenReturn(jobType);
+            typedSpiLoaderStatic.when(() -> 
TypedSPILoader.getService(PipelineJobType.class, "TEST")).thenReturn(jobType);
+            PipelineJobProgressPersistService.persistNow(jobId, shardingItem);
+            
assertThat(getJobProgressPersistMap().get(jobId).get(shardingItem).getUnhandledEventCount().get(),
 is(0L));
+            
verify(mockedConstruction.constructed().get(0)).updateProgress(jobItemContext);
+        }
+    }
+    
+    @SuppressWarnings("rawtypes")
+    @Test
+    void assertPersistNowUpdatesProgressWhenLoggingEnabled() {
+        String jobId = "foo_id_success_log";
+        int shardingItem = 1;
+        PipelineJobProgressPersistService.add(jobId, shardingItem);
+        PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
+        PipelineJobItemContext jobItemContext = 
mock(PipelineJobItemContext.class);
+        when(PipelineJobRegistry.getItemContext(jobId, 
shardingItem)).thenReturn(Optional.of(jobItemContext));
+        PipelineJobType<?> jobType = mock(PipelineJobType.class);
+        PipelineJobOption jobOption = mock(PipelineJobOption.class);
+        when(jobOption.getYamlJobItemProgressSwapper()).thenReturn(null);
+        when(jobType.getOption()).thenReturn(jobOption);
+        when(jobType.getType()).thenReturn("TEST");
+        ThreadLocalRandom randomMock = mock(ThreadLocalRandom.class);
+        when(ThreadLocalRandom.current()).thenReturn(randomMock);
+        when(randomMock.nextInt(100)).thenReturn(6);
+        try (
+                MockedStatic<PipelineJobIdUtils> jobIdUtilsMock = 
mockStatic(PipelineJobIdUtils.class);
+                MockedStatic<TypedSPILoader> typedSpiLoaderStatic = 
mockStatic(TypedSPILoader.class);
+                MockedConstruction<PipelineJobItemManager> mockedConstruction 
= mockConstruction(PipelineJobItemManager.class,
+                        (mock, context) -> 
doNothing().when(mock).updateProgress(jobItemContext))) {
+            jobIdUtilsMock.when(() -> 
PipelineJobIdUtils.parseJobType(jobId)).thenReturn(jobType);
+            typedSpiLoaderStatic.when(() -> 
TypedSPILoader.getService(PipelineJobType.class, "TEST")).thenReturn(jobType);
+            PipelineJobProgressPersistService.persistNow(jobId, shardingItem);
+            
assertThat(getJobProgressPersistMap().get(jobId).get(shardingItem).getUnhandledEventCount().get(),
 is(0L));
+            
verify(mockedConstruction.constructed().get(0)).updateProgress(jobItemContext);
+        }
+    }
+    
+    @Test
+    void assertPersistNowSkipsWhenJobItemContextMissing() {
+        String jobId = "foo_id_missing_context";
+        int shardingItem = 1;
+        PipelineJobProgressPersistService.add(jobId, shardingItem);
+        PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
+        when(PipelineJobRegistry.getItemContext(jobId, 
shardingItem)).thenReturn(Optional.empty());
+        PipelineJobProgressPersistService.persistNow(jobId, shardingItem);
+        
assertThat(getJobProgressPersistMap().get(jobId).get(shardingItem).getUnhandledEventCount().get(),
 is(1L));
+    }
+    
+    @Test
+    void assertPersistNowHandlesSubsequentExceptionLoggingBranches() {
+        String jobId = "foo_id_exception_follow";
+        int shardingItem = 1;
+        PipelineJobProgressPersistService.add(jobId, shardingItem);
+        PipelineJobProgressPersistContext persistContext = 
getJobProgressPersistMap().get(jobId).get(shardingItem);
+        persistContext.getUnhandledEventCount().set(-1L);
+        when(PipelineJobRegistry.getItemContext(jobId, 
shardingItem)).thenReturn(Optional.empty());
+        ThreadLocalRandom randomMock = mock(ThreadLocalRandom.class);
+        when(ThreadLocalRandom.current()).thenReturn(randomMock);
+        when(randomMock.nextInt(60)).thenReturn(5, 4);
+        assertDoesNotThrow(() -> 
PipelineJobProgressPersistService.persistNow(jobId, shardingItem));
+        assertDoesNotThrow(() -> 
PipelineJobProgressPersistService.persistNow(jobId, shardingItem));
+        assertDoesNotThrow(() -> 
PipelineJobProgressPersistService.persistNow(jobId, shardingItem));
+        verify(randomMock, times(2)).nextInt(60);
+        assertTrue(persistContext.getFirstExceptionLogged().get());
     }
 }

Reply via email to