This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a05bdd872dc Improve MemoryPipelineChannelTest for nightly CI (#29677)
a05bdd872dc is described below
commit a05bdd872dc4092330e1861d3d9ebe37980cec3f
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Jan 8 19:34:58 2024 +0800
Improve MemoryPipelineChannelTest for nightly CI (#29677)
---
.../core/channel/memory/MemoryPipelineChannelTest.java | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
index 7899b852977..812250a5727 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
@@ -26,10 +26,13 @@ import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
class MemoryPipelineChannelTest {
@@ -38,10 +41,14 @@ class MemoryPipelineChannelTest {
void assertZeroQueueSizeWorks() {
MemoryPipelineChannel channel = new MemoryPipelineChannel(0, new InventoryTaskAckCallback(new AtomicReference<>()));
List<Record> records = Collections.singletonList(new PlaceholderRecord(new IngestFinishedPosition()));
- Thread thread = new Thread(() -> channel.push(records));
+ Semaphore semaphore = new Semaphore(1);
+ Thread thread = new Thread(() -> {
+ semaphore.release();
+ channel.push(records);
+ });
thread.start();
- assertThat(channel.fetch(1, 5L), is(records));
- thread.join();
+ assertTrue(semaphore.tryAcquire(1L, TimeUnit.SECONDS));
+ assertThat(channel.fetch(1, 50L), is(records));
}
@Test