gmethvin commented on code in PR #2684:
URL: https://github.com/apache/pekko/pull/2684#discussion_r2867146347
##########
stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala:
##########
@@ -661,3 +662,173 @@ import org.reactivestreams.Subscriber
(stageLogic, promise.future)
}
}
+
+/**
+ * INTERNAL API
+ *
+ * Dedicated stage for [[pekko.stream.scaladsl.Sink.eagerFutureSink]] that
materializes the inner sink
+ * when the future completes rather than waiting for the first element. Unlike
[[LazySink]], this
+ * correctly handles empty streams by materializing the inner sink and
completing it normally.
+ */
+@InternalApi final private[stream] class EagerFutureSink[T, M](future:
Future[Sink[T, M]])
+ extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] {
+ val in = Inlet[T]("eagerFutureSink.in")
+ override def initialAttributes = DefaultAttributes.eagerFutureSink
+ override val shape: SinkShape[T] = SinkShape.of(in)
+
+ override def toString: String = "EagerFutureSink"
+
+ override def createLogicAndMaterializedValue(inheritedAttributes:
Attributes): (GraphStageLogic, Future[M]) = {
+ val promise = Promise[M]()
+ val stageLogic = new GraphStageLogic(shape) with InHandler {
+ private var sinkReady: OptionVal[Sink[T, M]] = OptionVal.none
+ private var bufferedElement: OptionVal[T] = OptionVal.none
+ private var upstreamClosed = false
+ private var upstreamFailed: OptionVal[Throwable] = OptionVal.none
+
+ override def preStart(): Unit = {
+ pull(in)
+ val cb: AsyncCallback[Try[Sink[T, M]]] =
+ getAsyncCallback {
+ case Success(sink) => onSinkReady(sink)
+ case Failure(e) =>
+ promise.tryFailure(e)
+ failStage(e)
+ }
+ try {
+ future.onComplete(cb.invoke)(ExecutionContext.parasitic)
+ } catch {
+ case NonFatal(e) =>
+ promise.tryFailure(e)
+ failStage(e)
+ }
+ }
+
+ override def onPush(): Unit = {
+ sinkReady match {
+ case OptionVal.Some(sink) =>
+ switchTo(sink, OptionVal.Some(grab(in)))
+ case OptionVal.None =>
+ bufferedElement = OptionVal.Some(grab(in))
+ }
+ }
+
+ override def onUpstreamFinish(): Unit = {
+ upstreamClosed = true
+ sinkReady match {
+ case OptionVal.Some(sink) =>
+ switchTo(sink, OptionVal.none)
+ case OptionVal.None =>
+ setKeepGoing(true)
+ }
+ }
+
+ override def onUpstreamFailure(ex: Throwable): Unit = {
+ upstreamFailed = OptionVal.Some(ex)
+ upstreamClosed = true
+ sinkReady match {
+ case OptionVal.Some(_) =>
+ promise.tryFailure(ex)
+ failStage(ex)
+ case OptionVal.None =>
+ setKeepGoing(true)
Review Comment:
Refactored so this is no longer an issue. The implementation now follows the
`FutureFlow` pattern: before the future completes, the initial `InHandler`
simply records upstream state. When the future completes, `onSinkReady`
materializes the inner sink immediately and propagates any recorded upstream
failure to the sub-outlet. The `postStop` hook handles the edge case where the
future never completes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]