This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit a78464039155607e89297360776daeca909fab9f Author: LanKhuat <[email protected]> AuthorDate: Mon Feb 8 17:17:33 2021 +0700 JAMES-3491 WebSocket PUSH should support pushState --- .../jmap/rfc8621/contract/WebSocketContract.scala | 95 +++++++++++++++++----- .../org/apache/james/jmap/change/StateChange.scala | 8 +- .../james/jmap/core/WebSocketTransport.scala | 25 +++++- .../james/jmap/json/ResponseSerializer.scala | 14 +++- .../apache/james/jmap/routes/WebSocketRoutes.scala | 35 +++++++- .../jmap/change/StateChangeListenerTest.scala | 9 +- 6 files changed, 150 insertions(+), 36 deletions(-) diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala index b80740d..c92112b 100644 --- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala +++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala @@ -23,7 +23,9 @@ import java.nio.charset.StandardCharsets import net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson import org.apache.james.GuiceJamesServer +import org.apache.james.jmap.api.change.State import org.apache.james.jmap.api.model.AccountId +import org.apache.james.jmap.core.PushState import org.apache.james.jmap.draft.JmapGuiceProbe import org.apache.james.jmap.rfc8621.contract.Fixture._ import org.apache.james.mailbox.MessageManager.AppendCommand @@ -504,11 +506,13 @@ trait WebSocketContract { Thread.sleep(100) val jmapGuiceProbe: JmapGuiceProbe = server.getProbe(classOf[JmapGuiceProbe]) - val emailState: String = jmapGuiceProbe.getLatestEmailState(accountId).getValue.toString - val mailboxState: String = jmapGuiceProbe.getLatestMailboxState(accountId).getValue.toString + val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId) + val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId) - val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"$mailboxState"}}}""" - val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"$emailState"}}}""" + val globalState1: String = PushState.fromOption(Some(mailboxState), None).get.value + val globalState2: String = PushState.fromOption(None, Some(emailState)).get.value + val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState1"}""" + val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}"}},"pushState":"$globalState2"}""" assertThat(response.toOption.get.asJava) .hasSize(3) // email notification + mailbox notification + API response @@ -630,11 +634,13 @@ trait WebSocketContract { Thread.sleep(100) val jmapGuiceProbe: JmapGuiceProbe = server.getProbe(classOf[JmapGuiceProbe]) - val emailState: String = jmapGuiceProbe.getLatestEmailState(accountId).getValue.toString - val mailboxState: String = jmapGuiceProbe.getLatestMailboxState(accountId).getValue.toString + val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId) + val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId) - val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"$mailboxState"}}}""" - val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"EmailDelivery":"$emailState","Email":"$emailState"}}}""" + val globalState1: String = PushState.fromOption(Some(mailboxState), None).get.value + val globalState2: String = PushState.fromOption(None, Some(emailState)).get.value + val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState1"}""" + val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"EmailDelivery":"${emailState.getValue}","Email":"${emailState.getValue}"}},"pushState":"$globalState2"}""" assertThat(response.toOption.get.asJava) .hasSize(3) // email notification + mailbox notification + API response @@ -646,7 +652,6 @@ trait WebSocketContract { // For client compatibility purposes def emailDeliveryShouldNotIncludeFlagUpdatesAndDeletes(server: GuiceJamesServer): Unit = { val bobPath = MailboxPath.inbox(BOB) - val accountId: AccountId = AccountId.fromUsername(BOB) val mailboxId = server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath) Thread.sleep(100) @@ -824,11 +829,13 @@ trait WebSocketContract { Thread.sleep(100) val jmapGuiceProbe: JmapGuiceProbe = server.getProbe(classOf[JmapGuiceProbe]) - val emailState: String = jmapGuiceProbe.getLatestEmailState(accountId).getValue.toString - val mailboxState: String = jmapGuiceProbe.getLatestMailboxState(accountId).getValue.toString + val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId) + val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId) - val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"$mailboxState"}}}""" - val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"EmailDelivery":"$emailState","Email":"$emailState"}}}""" + val globalState1: String = PushState.fromOption(Some(mailboxState), None).get.value + val globalState2: String = PushState.fromOption(None, Some(emailState)).get.value + val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState1"}""" + val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"EmailDelivery":"${emailState.getValue}","Email":"${emailState.getValue}"}},"pushState":"$globalState2"}""" assertThat(response.toOption.get.asJava) .hasSize(3) // email notification + mailbox notification + API response @@ -895,11 +902,13 @@ trait WebSocketContract { Thread.sleep(100) val jmapGuiceProbe: JmapGuiceProbe = server.getProbe(classOf[JmapGuiceProbe]) - val emailState: String = jmapGuiceProbe.getLatestEmailState(accountId).getValue.toString - val mailboxState: String = jmapGuiceProbe.getLatestMailboxState(accountId).getValue.toString + val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId) + val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId) - val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"$mailboxState"}}}""" - val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"$emailState"}}}""" + val globalState1: String = PushState.fromOption(Some(mailboxState), None).get.value + val globalState2: String = PushState.fromOption(None, Some(emailState)).get.value + val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState1"}""" + val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}"}},"pushState":"$globalState2"}""" assertThat(response.toOption.get.asJava) .hasSize(2) // No Email notification @@ -957,11 +966,13 @@ trait WebSocketContract { val jmapGuiceProbe: JmapGuiceProbe = server.getProbe(classOf[JmapGuiceProbe]) val accountId: AccountId = AccountId.fromUsername(BOB) - val emailState: String = jmapGuiceProbe.getLatestEmailStateWithDelegation(accountId).getValue.toString - val mailboxState: String = jmapGuiceProbe.getLatestMailboxStateWithDelegation(accountId).getValue.toString + val emailState: State = jmapGuiceProbe.getLatestEmailStateWithDelegation(accountId) + val mailboxState: State = jmapGuiceProbe.getLatestMailboxStateWithDelegation(accountId) - val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"$mailboxState"}}}""" - val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"$emailState"}}}""" + val globalState1: String = PushState.fromOption(Some(mailboxState), None).get.value + val globalState2: String = PushState.fromOption(None, Some(emailState)).get.value + val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState1"}""" + val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}"}},"pushState":"$globalState2"}""" assertThat(response.toOption.get.asJava) .hasSize(2) // email notification + mailbox notification @@ -1068,6 +1079,48 @@ trait WebSocketContract { .body } + @Test + @Timeout(180) + def pushEnableRequestWithPushStateShouldReturnServerState(server: GuiceJamesServer): Unit = { + val bobPath = MailboxPath.inbox(BOB) + val accountId: AccountId = AccountId.fromUsername(BOB) + val mailboxId = server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath) + + Thread.sleep(100) + + val response: Either[String, String] = + authenticatedRequest(server) + .response(asWebSocket[Identity, String] { + ws => + ws.send(WebSocketFrame.text( + """{ + | "@type": "WebSocketPushEnable", + | "dataTypes": ["Mailbox", "Email"], + | "pushState": "aaa" + |}""".stripMargin)) + + Thread.sleep(100) + + ws.receive() + .map { case t: Text => + t.payload + } + }) + .send(backend) + .body + + Thread.sleep(100) + + val jmapGuiceProbe: JmapGuiceProbe = server.getProbe(classOf[JmapGuiceProbe]) + val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId) + val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId) + val globalState: PushState = PushState.from(mailboxState, emailState) + val pushEnableResponse: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"${mailboxState.getValue}","Email":"${emailState.getValue}"}},"pushState":"${globalState.value}"}""" + + assertThat(response.toOption.get) + .isEqualTo(pushEnableResponse) + } + private def authenticatedRequest(server: GuiceJamesServer): RequestT[Identity, Either[String, String], Any] = { val port = server.getProbe(classOf[JmapGuiceProbe]) .getJmapPort diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala index 25ea386..0d556a9 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala @@ -22,7 +22,8 @@ package org.apache.james.jmap.change import org.apache.james.core.Username import org.apache.james.events.Event import org.apache.james.events.Event.EventId -import org.apache.james.jmap.core.{AccountId, State, StateChange} +import org.apache.james.jmap.api.change.{State => JavaState} +import org.apache.james.jmap.core.{AccountId, PushState, State, StateChange} object TypeName { val ALL: Set[TypeName] = Set(EmailTypeName, MailboxTypeName, ThreadTypeName, IdentityTypeName, EmailSubmissionTypeName, EmailDeliveryTypeName) @@ -91,7 +92,10 @@ case class StateChangeEvent(eventId: EventId, VacationResponseTypeName.asMap(vacationResponseState) ++ MailboxTypeName.asMap(mailboxState) ++ EmailDeliveryTypeName.asMap(emailDeliveryState) ++ - EmailTypeName.asMap(emailState)))) + EmailTypeName.asMap(emailState))), + PushState.fromOption( + mailboxState.map(state => JavaState.of(state.value)), + emailState.map(state => JavaState.of(state.value)))) override val getUsername: Username = username diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala index 5597901..196497d 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala @@ -19,6 +19,10 @@ package org.apache.james.jmap.core +import java.nio.charset.StandardCharsets + +import com.google.common.hash.Hashing +import org.apache.james.jmap.api.change.State import org.apache.james.jmap.change.{TypeName, TypeState} import org.apache.james.jmap.routes.PingPolicy.Interval @@ -36,15 +40,30 @@ case class WebSocketResponse(requestId: Option[RequestId], responseObject: Respo case class WebSocketError(requestId: Option[RequestId], problemDetails: ProblemDetails) extends OutboundMessage -case class StateChange(changes: Map[AccountId, TypeState]) extends OutboundMessage { +object PushState { + def from(mailboxState: State, emailState: State): PushState = + PushState(hashStates(List(mailboxState, emailState))) + + def fromOption(mailboxState: Option[State], emailState: Option[State]): Option[PushState] = + List(mailboxState, emailState).flatten match { + case Nil => None + case states => Some(PushState(hashStates(states))) + } + + private def hashStates(states: List[State]): String = Hashing.sha256().hashString(states.mkString("_"), StandardCharsets.UTF_8).toString +} + +case class PushState(value: String) + +case class StateChange(changes: Map[AccountId, TypeState], pushState: Option[PushState]) extends OutboundMessage { def filter(types: Set[TypeName]): Option[StateChange] = Option(changes.flatMap { case (accountId, typeState) => typeState.filter(types).map(typeState => (accountId, typeState)) }) .filter(_.nonEmpty) - .map(StateChange) + .map(changes => StateChange(changes, pushState)) } -case class WebSocketPushEnable(dataTypes: Option[Set[TypeName]]) extends WebSocketInboundMessage +case class WebSocketPushEnable(dataTypes: Option[Set[TypeName]], pushState: Option[PushState]) extends WebSocketInboundMessage case object WebSocketPushDisable extends WebSocketInboundMessage diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala index 847acab..89d4305 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala @@ -191,6 +191,7 @@ object ResponseSerializer { .fold(errorMessage => JsError(errorMessage), JsSuccess(_)) case _ => JsError("Expecting a JsString as typeName") } + private implicit val pushStateReads: Reads[PushState] = Json.valueReads[PushState] private implicit val webSocketPushEnableReads: Reads[WebSocketPushEnable] = Json.reads[WebSocketPushEnable] private implicit val webSocketInboundReads: Reads[WebSocketInboundMessage] = { case json: JsObject => @@ -208,10 +209,17 @@ object ResponseSerializer { private implicit val typeStateMapWrites: Writes[Map[TypeName, State]] = mapWrites[TypeName, State](_.asString(), stateWrites) private implicit val typeStateWrites: Writes[TypeState] = Json.valueWrites[TypeState] private implicit val changeWrites: OWrites[Map[AccountId, TypeState]] = mapWrites[AccountId, TypeState](_.id.value, typeStateWrites) + private implicit val pushStateWrites: Writes[PushState] = Json.valueWrites[PushState] private implicit val stateChangeWrites: Writes[StateChange] = stateChange => - JsObject(Map( - "@type" -> JsString("StateChange"), - "changed" -> changeWrites.writes(stateChange.changes))) + stateChange.pushState.map(pushState => + JsObject(Map( + "@type" -> JsString("StateChange"), + "changed" -> changeWrites.writes(stateChange.changes), + "pushState" -> pushStateWrites.writes(pushState)))) + .getOrElse( + JsObject(Map( + "@type" -> JsString("StateChange"), + "changed" -> changeWrites.writes(stateChange.changes)))) private implicit val webSocketResponseWrites: Writes[WebSocketResponse] = response => { val apiResponseJson: JsObject = responseObjectFormat.writes(response.responseObject) diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala index 5f556a1..360e1bc 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala @@ -27,11 +27,14 @@ import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE import io.netty.handler.codec.http.HttpMethod import io.netty.handler.codec.http.websocketx.WebSocketFrame import javax.inject.{Inject, Named} +import org.apache.james.core.Username import org.apache.james.events.{EventBus, Registration} import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE import org.apache.james.jmap.JMAPUrls.JMAP_WS -import org.apache.james.jmap.change.{AccountIdRegistrationKey, StateChangeListener, TypeName} -import org.apache.james.jmap.core.{OutboundMessage, ProblemDetails, RequestId, WebSocketError, WebSocketPushDisable, WebSocketPushEnable, WebSocketRequest, WebSocketResponse} +import org.apache.james.jmap.api.change.{EmailChangeRepository, MailboxChangeRepository} +import org.apache.james.jmap.api.model.{AccountId => JavaAccountId} +import org.apache.james.jmap.change.{AccountIdRegistrationKey, StateChangeListener, TypeName, _} +import org.apache.james.jmap.core.{OutboundMessage, ProblemDetails, RequestId, WebSocketError, WebSocketPushDisable, WebSocketPushEnable, WebSocketRequest, WebSocketResponse, _} import org.apache.james.jmap.http.rfc8621.InjectionKeys import org.apache.james.jmap.http.{Authenticator, UserProvisioning} import org.apache.james.jmap.json.ResponseSerializer @@ -64,7 +67,9 @@ case class ClientContext(outbound: Sinks.Many[OutboundMessage], pushRegistration class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticator: Authenticator, userProvisioner: UserProvisioning, @Named(JMAPInjectionKeys.JMAP) eventBus: EventBus, - jmapApi: JMAPApi) extends JMAPRoutes { + jmapApi: JMAPApi, + mailboxChangeRepository: MailboxChangeRepository, + emailChangeRepository: EmailChangeRepository) extends JMAPRoutes { override def routes(): stream.Stream[JMAPRoute] = stream.Stream.of( JMAPRoute.builder @@ -131,10 +136,32 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato StateChangeListener(pushEnable.dataTypes.getOrElse(TypeName.ALL), clientContext.outbound), AccountIdRegistrationKey.of(clientContext.session.getUser))) .doOnNext(newRegistration => clientContext.withRegistration(newRegistration)) - .`then`() + .`then`(sendPushStateIfRequested(pushEnable, clientContext)) case WebSocketPushDisable => SMono.fromCallable(() => clientContext.clean()) }) + private def sendPushStateIfRequested(pushEnable: WebSocketPushEnable, clientContext: ClientContext): SMono[Unit] = + pushEnable.pushState + .map(_ => sendPushState(clientContext)) + .getOrElse(SMono.empty) + + private def sendPushState(clientContext: ClientContext): SMono[Unit] = { + val username: Username = clientContext.session.getUser + val accountId: AccountId = AccountId.from(username).fold( + failure => throw new IllegalArgumentException(failure), + success => success) + SMono( + for { + mailboxState <- mailboxChangeRepository.getLatestStateWithDelegation(JavaAccountId.fromUsername(username)) + emailState <- emailChangeRepository.getLatestStateWithDelegation(JavaAccountId.fromUsername(username)) + } yield { + clientContext.outbound.emitNext(StateChange(Map(accountId -> TypeState( + MailboxTypeName.asMap(Some(State.fromJava(mailboxState))) ++ + EmailTypeName.asMap(Some(State.fromJava(emailState))))), + Some(PushState.from(mailboxState, emailState))), FAIL_FAST) + }) + } + private def handleHttpHandshakeError(throwable: Throwable, response: HttpServerResponse): SMono[Void] = respondDetails(response, ProblemDetails.forThrowable(throwable)) diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala index d2d717a..e9b318c 100644 --- a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala +++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala @@ -21,7 +21,8 @@ package org.apache.james.jmap.change import org.apache.james.core.Username import org.apache.james.events.Event.EventId -import org.apache.james.jmap.core.{AccountId, OutboundMessage, State, StateChange} +import org.apache.james.jmap.api.change.{State => JavaState} +import org.apache.james.jmap.core.{AccountId, OutboundMessage, PushState, State, StateChange} import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test import reactor.core.publisher.Sinks @@ -48,10 +49,11 @@ class StateChangeListenerTest { SMono(listener.reactiveEvent(event)).subscribeOn(Schedulers.elastic()).block() sink.emitComplete(EmitFailureHandler.FAIL_FAST) + val globalState = PushState.from(JavaState.of(mailboxState.value), JavaState.of(emailState.value)) assertThat(sink.asFlux().collectList().block()) .containsExactly(StateChange(Map(AccountId.from(Username.of("bob")).toOption.get -> TypeState(Map( MailboxTypeName -> mailboxState, - EmailTypeName -> emailState))))) + EmailTypeName -> emailState))), Some(globalState))) } @Test @@ -68,9 +70,10 @@ class StateChangeListenerTest { SMono(listener.reactiveEvent(event)).subscribeOn(Schedulers.elastic()).block() sink.emitComplete(EmitFailureHandler.FAIL_FAST) + val globalState = PushState.from(JavaState.of(mailboxState.value), JavaState.of(emailState.value)) assertThat(sink.asFlux().collectList().block()) .containsExactly(StateChange(Map(AccountId.from(Username.of("bob")).toOption.get -> TypeState(Map( - MailboxTypeName -> mailboxState))))) + MailboxTypeName -> mailboxState))), Some(globalState))) } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
