This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 03df179 JAMES-2813 make TerminationSubscriberContract less
time-sensitive
new 6eb6796 Merge remote-tracking branch
'mbaechler/fix-termination-subscriber-contract'
03df179 is described below
commit 03df179112e4deef2ec716517383a5f04f7fadb8
Author: Matthieu Baechler <[email protected]>
AuthorDate: Mon Oct 7 17:18:57 2019 +0200
JAMES-2813 make TerminationSubscriberContract less time-sensitive
---
.../RabbitMQTerminationSubscriberTest.java | 28 +++++++++++--------
.../task/eventsourcing/TerminationSubscriber.scala | 2 +-
.../TerminationSubscriberContract.java | 32 ++++++++++++++--------
3 files changed, 38 insertions(+), 24 deletions(-)
diff --git
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
index a6a9ab9..ce0991d 100644
---
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
+++
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
@@ -21,7 +21,9 @@
package org.apache.james.task.eventsourcing.distributed;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Duration.ONE_MINUTE;
+import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@@ -32,12 +34,12 @@ import
org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
import org.apache.james.server.task.json.JsonTaskSerializer;
import org.apache.james.task.eventsourcing.TerminationSubscriber;
import org.apache.james.task.eventsourcing.TerminationSubscriberContract;
-
-import com.github.steveash.guavate.Guavate;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.github.steveash.guavate.Guavate;
import reactor.core.publisher.Flux;
-import reactor.core.scheduler.Schedulers;
class RabbitMQTerminationSubscriberTest implements
TerminationSubscriberContract {
private static final JsonTaskSerializer TASK_SERIALIZER = new
JsonTaskSerializer();
@@ -59,15 +61,19 @@ class RabbitMQTerminationSubscriberTest implements
TerminationSubscriberContract
TerminationSubscriber subscriber1 = subscriber();
TerminationSubscriber subscriber2 = subscriber();
+ Flux<Event> firstListener = Flux.from(subscriber1.listenEvents());
+ Flux<Event> secondListener = Flux.from(subscriber2.listenEvents());
+
sendEvents(subscriber1, COMPLETED_EVENT);
- List<List<Event>> listenedEvents = Flux.just(subscriber1, subscriber2)
- .subscribeOn(Schedulers.boundedElastic())
- .flatMap(this::collectEvents)
- .collectList()
- .block();
- assertThat(listenedEvents).hasSize(2);
- assertThat(listenedEvents.get(0)).containsExactly(COMPLETED_EVENT);
- assertThat(listenedEvents.get(1)).containsExactly(COMPLETED_EVENT);
+ List<Event> receivedEventsFirst = new ArrayList<>();
+ firstListener.subscribe(receivedEventsFirst::add);
+ List<Event> receivedEventsSecond = new ArrayList<>();
+ secondListener.subscribe(receivedEventsSecond::add);
+
+ Awaitility.await().atMost(ONE_MINUTE).until(() ->
receivedEventsFirst.size() == 1 && receivedEventsSecond.size() == 1);
+
+ assertThat(receivedEventsFirst).containsExactly(COMPLETED_EVENT);
+ assertThat(receivedEventsSecond).containsExactly(COMPLETED_EVENT);
}
}
\ No newline at end of file
diff --git
a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala
b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala
index af23af4..b0c705b 100644
---
a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala
+++
b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala
@@ -36,7 +36,7 @@ trait TerminationSubscriber extends Subscriber {
}
class MemoryTerminationSubscriber extends TerminationSubscriber {
- private val events = DirectProcessor.create[Event]()
+ private val events: DirectProcessor[Event] = DirectProcessor.create[Event]()
override def addEvent(event: Event): Unit =
events.onNext(event)
diff --git
a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
index 705b291..af7fda8 100644
---
a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
+++
b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
@@ -21,8 +21,10 @@
package org.apache.james.task.eventsourcing;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Duration.ONE_MINUTE;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.List;
import org.apache.james.eventsourcing.Event;
@@ -31,7 +33,9 @@ import org.apache.james.task.Hostname;
import org.apache.james.task.Task;
import org.apache.james.task.TaskId;
import org.assertj.core.api.ListAssert;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
+import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -97,16 +101,20 @@ public interface TerminationSubscriberContract {
default void multipleListeningEventsShouldShareEvents() {
TerminationSubscriber subscriber = subscriber();
+ Flux<Event> firstListener = Flux.from(subscriber.listenEvents());
+ Flux<Event> secondListener = Flux.from(subscriber.listenEvents());
+
sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
- List<List<Event>> listenedEvents = Flux.range(0, 2)
- .subscribeOn(Schedulers.boundedElastic())
- .flatMap(ignored -> collectEvents(subscriber))
- .collectList()
- .block();
- assertThat(listenedEvents).hasSize(2);
- assertThat(listenedEvents.get(0)).containsExactly(COMPLETED_EVENT,
FAILED_EVENT, CANCELLED_EVENT);
- assertThat(listenedEvents.get(1)).isEqualTo(listenedEvents.get(0));
+ List<Event> receivedEventsFirst = new ArrayList<>();
+ firstListener.subscribe(receivedEventsFirst::add);
+ List<Event> receivedEventsSecond = new ArrayList<>();
+ secondListener.subscribe(receivedEventsSecond::add);
+
+ Awaitility.await().atMost(ONE_MINUTE).until(() ->
receivedEventsFirst.size() == 3 && receivedEventsSecond.size() == 3);
+
+ assertThat(receivedEventsFirst).containsExactly(COMPLETED_EVENT,
FAILED_EVENT, CANCELLED_EVENT);
+ assertThat(receivedEventsSecond).containsExactly(COMPLETED_EVENT,
FAILED_EVENT, CANCELLED_EVENT);
}
@Test
@@ -116,19 +124,19 @@ public interface TerminationSubscriberContract {
sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT);
List<Event> listenedEvents =
Mono.delay(DELAY_BEFORE_PUBLISHING.plus(DELAY_BETWEEN_EVENTS.multipliedBy(3).dividedBy(2)))
- .then(Mono.defer(() -> collectEvents(subscriber)))
+ .then(Mono.defer(() -> collectEvents(subscriber.listenEvents())))
.subscribeOn(Schedulers.boundedElastic())
.block();
assertThat(listenedEvents).containsExactly(FAILED_EVENT,
CANCELLED_EVENT);
}
default ListAssert<Event> assertEvents(TerminationSubscriber subscriber) {
- return assertThat(collectEvents(subscriber)
+ return assertThat(collectEvents(subscriber.listenEvents())
.block());
}
- default Mono<List<Event>> collectEvents(TerminationSubscriber subscriber) {
- return Flux.from(subscriber.listenEvents())
+ default Mono<List<Event>> collectEvents(Publisher<Event> listener) {
+ return Flux.from(listener)
.subscribeOn(Schedulers.boundedElastic())
.take(DELAY_BEFORE_PUBLISHING.plus(DELAY_BETWEEN_EVENTS.multipliedBy(7)))
.collectList();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]