He-Pin commented on code in PR #1611:
URL: https://github.com/apache/pekko/pull/1611#discussion_r1889022056


##########
stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala:
##########
@@ -57,36 +57,44 @@ object SourceWithContext {
     SourceWithContext.fromTuples(Source.fromGraph(GraphDSL.createGraph(source, 
viaFlow)(combine) {
       implicit b => (s, viaF) =>
         import GraphDSL.Implicits._
-        val broadcast = b.add(Broadcast[(Option[SOut], Ctx)](2))
-        val merge = b.add(Merge[(Option[FOut], Ctx)](2))
 
-        val unzip = b.add(Unzip[SOut, Ctx]())
-        val zipper = b.add(Zip[FOut, Ctx]())
-
-        val filterAvailable = Flow[(Option[SOut], Ctx)].collect {
-          case (Some(f), ctx) => (f, ctx)
-        }
-
-        val filterUnavailable = Flow[(Option[SOut], Ctx)].collect {
-          case (None, ctx) => (Option.empty[FOut], ctx)
+        case class IndexedCtx(idx: Long, ctx: Ctx)
+        val partition = b.add(Partition[(Option[SOut], IndexedCtx)](2,
+          {
+            case (None, _)    => 0
+            case (Some(_), _) => 1
+          }))
+
+        val sequence = Flow[(Option[SOut], Ctx)].zipWithIndex
+          .map {
+            case ((opt, ctx), idx) => (opt, IndexedCtx(idx, ctx))
+          }
+
+        val unzip = b.add(Unzip[Option[SOut], IndexedCtx]())
+        val zipper = b.add(Zip[FOut, IndexedCtx]())
+        val mergeSequence = b.add(MergeSequence[(Option[FOut], 
IndexedCtx)](2)(_._2.idx))
+        val unwrapSome = b.add(Flow[Option[SOut]].map {
+          case Some(elem) => elem
+          case _          => throw new IllegalStateException("Only expects 
Some")
+        })
+        val unwrap = Flow[(Option[FOut], IndexedCtx)].map {
+          case (opt, indexedCtx) => (opt, indexedCtx.ctx)
         }
 
-        val mapIntoOption = Flow[(FOut, Ctx)].map {
-          case (f, ctx) => (Some(f), ctx)
+        val mapIntoOption = Flow[(FOut, IndexedCtx)].map {
+          case (elem, indexedCtx) => (Some(elem), indexedCtx)
         }
 
-        s ~> broadcast.in
-
-        broadcast.out(0) ~> filterAvailable ~> unzip.in
-
-        unzip.out0 ~> viaF ~> zipper.in0
-        unzip.out1 ~> zipper.in1
-
-        zipper.out ~> mapIntoOption ~> merge.in(0)
-
-        broadcast.out(1) ~> filterUnavailable ~> merge.in(1)
+        //format: off
+        s ~> sequence ~> partition.in
+        partition.out(0).asInstanceOf[Outlet[(Option[FOut], IndexedCtx)]] ~> 
mergeSequence.in(0)

Review Comment:
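   The cast here avoids an extra allocation: the partition routes only `(None, _)` elements to outlet 0, so the option on this outlet is always `None` and re-typing the outlet from `Option[SOut]` to `Option[FOut]` is safe.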



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org
For additional commands, e-mail: notifications-h...@pekko.apache.org

Reply via email to