He-Pin commented on code in PR #2365:
URL: https://github.com/apache/pekko/pull/2365#discussion_r2462850577
##########
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala:
##########
@@ -2250,69 +2260,86 @@ private[pekko] final class StatefulMap[S, In,
Out](create: () => S, f: (S, In) =
new GraphStageLogic(shape) with InHandler with OutHandler {
lazy val decider: Decider =
inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- private var state: S = _
- private var needInvokeOnCompleteCallback: Boolean = false
+ private var state: OptionVal[S] = OptionVal.none
- override def preStart(): Unit = {
- state = create()
- needInvokeOnCompleteCallback = true
- }
+ override def preStart(): Unit =
+ createNewState()
- override def onPush(): Unit =
+ override def onPush(): Unit = {
try {
val elem = grab(in)
- val (newState, newElem) = f(state, elem)
- state = newState
+ val (newState, newElem) = f(state.get, elem)
+ state = OptionVal.Some(newState)
+ throwIfNoState()
push(out, newElem)
} catch {
- case NonFatal(ex) =>
+ case ex: NullStateException => throw ex // don't cover with
supervision
+ case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop => closeStateAndFail(ex)
case Supervision.Resume => pull(in)
- case Supervision.Restart => resetStateAndPull()
+ case Supervision.Restart => restartState()
}
}
+ }
- override def onUpstreamFinish(): Unit = closeStateAndComplete()
+ override def onUpstreamFinish(): Unit = {
+ completeStateIfNeeded() match {
+ case Some(elem) => emit(out, elem, () => completeStage())
+ case None => completeStage()
+ }
+ }
override def onUpstreamFailure(ex: Throwable): Unit =
closeStateAndFail(ex)
override def onDownstreamFinish(cause: Throwable): Unit = {
- needInvokeOnCompleteCallback = false
- onComplete(state)
+ completeStateIfNeeded()
super.onDownstreamFinish(cause)
}
- private def resetStateAndPull(): Unit = {
- needInvokeOnCompleteCallback = false
- onComplete(state)
- state = create()
- needInvokeOnCompleteCallback = true;
- pull(in)
+ private def createNewState(): Unit = {
+ state = OptionVal.Some(create())
+ throwIfNoState()
}
- private def closeStateAndComplete(): Unit = {
- needInvokeOnCompleteCallback = false
- onComplete(state) match {
- case Some(elem) => emit(out, elem, () => completeStage())
- case None => completeStage()
+ private def restartState(): Unit = {
+ completeStateIfNeeded() match {
+ case Some(elem) =>
+ push(out, elem)
+ createNewState()
+ case None =>
+ createNewState()
+ // should always happen here but for good measure
+ if (!hasBeenPulled(in)) pull(in)
}
}
- private def closeStateAndFail(ex: Throwable): Unit = {
- needInvokeOnCompleteCallback = false
- onComplete(state) match {
+ private def closeStateAndFail(ex: Throwable): Unit =
+ completeStateIfNeeded() match {
case Some(elem) => emit(out, elem, () => failStage(ex))
case None => failStage(ex)
}
+
+ private def completeStateIfNeeded(): Option[Out] = {
+ state match {
+ case OptionVal.Some(s) =>
+ state = OptionVal.none[S]
+ onComplete(s)
+ case _ => None
+ }
}
override def onPull(): Unit = pull(in)
override def postStop(): Unit = {
- if (needInvokeOnCompleteCallback) {
- onComplete(state)
- }
+ completeStateIfNeeded()
+ }
+
+ private def throwIfNoState(): Unit = {
Review Comment:
This seems to change the behavior. @pjfanning I have not tested it, but if
that's true, we should revert some changes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]