This is an automated email from the ASF dual-hosted git repository.

jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d164a4cfbeb Remove SimpleMemoryPipelineChannelTest.assertFetchRecordsTimeoutCorrectly (#29651)
d164a4cfbeb is described below

commit d164a4cfbebf6ba6a730b1a28d1acdadb6f551d1
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Jan 4 23:22:45 2024 +0800

    Remove SimpleMemoryPipelineChannelTest.assertFetchRecordsTimeoutCorrectly (#29651)
---
 .../channel/memory/MemoryPipelineChannelTest.java  | 22 +++-------------------
 1 file changed, 3 insertions(+), 19 deletions(-)

diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
index be6efd7ff45..78dcfa67241 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
@@ -21,42 +21,26 @@ import lombok.SneakyThrows;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
+import 
org.apache.shardingsphere.data.pipeline.core.task.InventoryTaskAckCallback;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class MemoryPipelineChannelTest {
     
     @SneakyThrows(InterruptedException.class)
     @Test
     void assertZeroQueueSizeWorks() {
-        MemoryPipelineChannel channel = new MemoryPipelineChannel(0, records 
-> {
-            
-        });
+        MemoryPipelineChannel channel = new MemoryPipelineChannel(0, new 
InventoryTaskAckCallback(new AtomicReference<>()));
         List<Record> records = Collections.singletonList(new 
PlaceholderRecord(new IngestFinishedPosition()));
         Thread thread = new Thread(() -> channel.push(records));
         thread.start();
         assertThat(channel.fetch(1, 500L), is(records));
         thread.join();
     }
-    
-    @Test
-    void assertFetchRecordsTimeoutCorrectly() {
-        MemoryPipelineChannel channel = new MemoryPipelineChannel(10, records 
-> {
-            
-        });
-        long startMillis = System.currentTimeMillis();
-        channel.fetch(1, 1L);
-        long delta = System.currentTimeMillis() - startMillis;
-        assertTrue(delta >= 1 && delta < 50, "Delta is not in [1,50) : " + 
delta);
-        startMillis = System.currentTimeMillis();
-        channel.fetch(1, 500L);
-        delta = System.currentTimeMillis() - startMillis;
-        assertTrue(delta >= 500 && delta < 750, "Delta is not in [500,750) : " 
+ delta);
-    }
 }

Reply via email to