This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a05bdd872dc Improve MemoryPipelineChannelTest for nightly CI (#29677)
a05bdd872dc is described below

commit a05bdd872dc4092330e1861d3d9ebe37980cec3f
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Jan 8 19:34:58 2024 +0800

    Improve MemoryPipelineChannelTest for nightly CI (#29677)
---
 .../core/channel/memory/MemoryPipelineChannelTest.java      | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
index 7899b852977..812250a5727 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
@@ -26,10 +26,13 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class MemoryPipelineChannelTest {
     
@@ -38,10 +41,14 @@ class MemoryPipelineChannelTest {
     void assertZeroQueueSizeWorks() {
         MemoryPipelineChannel channel = new MemoryPipelineChannel(0, new InventoryTaskAckCallback(new AtomicReference<>()));
         List<Record> records = Collections.singletonList(new PlaceholderRecord(new IngestFinishedPosition()));
-        Thread thread = new Thread(() -> channel.push(records));
+        Semaphore semaphore = new Semaphore(1);
+        Thread thread = new Thread(() -> {
+            semaphore.release();
+            channel.push(records);
+        });
         thread.start();
-        assertThat(channel.fetch(1, 5L), is(records));
-        thread.join();
+        assertTrue(semaphore.tryAcquire(1L, TimeUnit.SECONDS));
+        assertThat(channel.fetch(1, 50L), is(records));
     }
     
     @Test

Reply via email to