This is an automated email from the ASF dual-hosted git repository.
jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d164a4cfbeb Remove SimpleMemoryPipelineChannelTest.assertFetchRecordsTimeoutCorrectly (#29651)
d164a4cfbeb is described below
commit d164a4cfbebf6ba6a730b1a28d1acdadb6f551d1
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Jan 4 23:22:45 2024 +0800
Remove SimpleMemoryPipelineChannelTest.assertFetchRecordsTimeoutCorrectly (#29651)
---
.../channel/memory/MemoryPipelineChannelTest.java | 22 +++-------------------
1 file changed, 3 insertions(+), 19 deletions(-)
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
index be6efd7ff45..78dcfa67241 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java
@@ -21,42 +21,26 @@ import lombok.SneakyThrows;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
+import
org.apache.shardingsphere.data.pipeline.core.task.InventoryTaskAckCallback;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertTrue;
class MemoryPipelineChannelTest {
@SneakyThrows(InterruptedException.class)
@Test
void assertZeroQueueSizeWorks() {
- MemoryPipelineChannel channel = new MemoryPipelineChannel(0, records
-> {
-
- });
+ MemoryPipelineChannel channel = new MemoryPipelineChannel(0, new
InventoryTaskAckCallback(new AtomicReference<>()));
List<Record> records = Collections.singletonList(new
PlaceholderRecord(new IngestFinishedPosition()));
Thread thread = new Thread(() -> channel.push(records));
thread.start();
assertThat(channel.fetch(1, 500L), is(records));
thread.join();
}
-
- @Test
- void assertFetchRecordsTimeoutCorrectly() {
- MemoryPipelineChannel channel = new MemoryPipelineChannel(10, records
-> {
-
- });
- long startMillis = System.currentTimeMillis();
- channel.fetch(1, 1L);
- long delta = System.currentTimeMillis() - startMillis;
- assertTrue(delta >= 1 && delta < 50, "Delta is not in [1,50) : " +
delta);
- startMillis = System.currentTimeMillis();
- channel.fetch(1, 500L);
- delta = System.currentTimeMillis() - startMillis;
- assertTrue(delta >= 500 && delta < 750, "Delta is not in [500,750) : "
+ delta);
- }
}