He-Pin commented on code in PR #561:
URL: https://github.com/apache/incubator-pekko/pull/561#discussion_r1320933234


##########
stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.util.control.{ NoStackTrace, NonFatal }
+import scala.util.{ Failure, Success, Try }
+
+import org.apache.pekko
+import pekko.dispatch.ExecutionContexts
+import pekko.stream.ActorAttributes.SupervisionStrategy
+import pekko.stream.Attributes.{ Name, SourceLocation }
+import pekko.stream.MapAsyncPartitioned._
+import pekko.stream.scaladsl.{ Flow, FlowWithContext, Source, 
SourceWithContext }
+import pekko.stream.stage._
+
+private[stream] object MapAsyncPartitioned {
+
+  private def extractPartitionWithCtx[In, Ctx, Partition](extract: In => 
Partition)(tuple: (In, Ctx)): Partition =
+    extract(tuple._1)
+
+  private def fWithCtx[In, Out, Ctx, Partition](f: (In, Partition) => 
Future[Out])(tuple: (In, Ctx),
+      partition: Partition): Future[(Out, Ctx)] =
+    f(tuple._1, partition).map(_ -> tuple._2)(ExecutionContexts.parasitic)
+
+  def mapSourceOrdered[In, Out, Partition, Mat](source: Source[In, Mat], 
parallelism: Int)(
+      extractPartition: In => Partition)(
+      f: (In, Partition) => Future[Out]): Source[Out, Mat] =
+    source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput = 
true, parallelism, extractPartition, f))
+
+  def mapSourceUnordered[In, Out, Partition, Mat](source: Source[In, Mat], 
parallelism: Int)(
+      extractPartition: In => Partition)(
+      f: (In, Partition) => Future[Out]): Source[Out, Mat] =
+    source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput = 
false, parallelism, extractPartition, f))
+
+  def mapSourceWithContextOrdered[In, Ctx, T, Partition, Mat](flow: 
SourceWithContext[In, Ctx, Mat], parallelism: Int)(
+      extractPartition: In => Partition)(
+      f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] =
+    flow.via(
+      new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition](
+        orderedOutput = true,
+        parallelism,
+        extractPartitionWithCtx(extractPartition),
+        fWithCtx[In, T, Ctx, Partition](f)))
+
+  def mapSourceWithContextUnordered[In, Ctx, T, Partition, Mat](flow: 
SourceWithContext[In, Ctx, Mat],
+      parallelism: Int)(extractPartition: In => Partition)(
+      f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] =
+    flow.via(
+      new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition](
+        orderedOutput = false,
+        parallelism,
+        extractPartitionWithCtx(extractPartition),
+        fWithCtx[In, T, Ctx, Partition](f)))
+
+  def mapFlowOrdered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat], 
parallelism: Int)(
+      extractPartition: Out => Partition)(
+      f: (Out, Partition) => Future[T]): Flow[In, T, Mat] =
+    flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = true, 
parallelism, extractPartition,
+      f))
+
+  def mapFlowUnordered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat], 
parallelism: Int)(
+      extractPartition: Out => Partition)(
+      f: (Out, Partition) => Future[T]): Flow[In, T, Mat] =
+    flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = false, 
parallelism,
+      extractPartition, f))
+
+  def mapFlowWithContextOrdered[In, Out, CtxIn, CtxOut, T, Partition, Mat](
+      flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism: Int)(
+      extractPartition: Out => Partition)(
+      f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, 
Mat] =
+    flow.via(
+      new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition](
+        orderedOutput = true,
+        parallelism,
+        extractPartitionWithCtx(extractPartition),
+        fWithCtx[Out, T, CtxOut, Partition](f)))
+
+  def mapFlowWithContextUnordered[In, Out, CtxIn, CtxOut, T, Partition, Mat](
+      flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism: 
Int)(extractPartition: Out => Partition)(
+      f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, 
Mat] =
+    flow.via(
+      new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition](
+        orderedOutput = false,
+        parallelism,
+        extractPartitionWithCtx(extractPartition),
+        fWithCtx[Out, T, CtxOut, Partition](f)))
+
+  private[stream] val NotYetThere: Failure[Nothing] = Failure(new Exception 
with NoStackTrace)
+
+  private[stream] final class Holder[In, Out](
+      val in: In,
+      var out: Try[Out],
+      callback: AsyncCallback[Holder[In, Out]]) extends (Try[Out] => Unit) {
+
+    // To support both fail-fast when the supervision directive is Stop
+    // and not calling the decider multiple times (#23888) we need to cache 
the decider result and re-use that
+    private var cachedSupervisionDirective: Option[Supervision.Directive] = 
None
+
+    def supervisionDirectiveFor(decider: Supervision.Decider, ex: Throwable): 
Supervision.Directive = {
+      cachedSupervisionDirective match {
+        case Some(d) => d
+        case _ =>
+          val d = decider(ex)
+          cachedSupervisionDirective = Some(d)
+          d
+      }
+    }
+
+    def setOut(t: Try[Out]): Unit =
+      out = t
+
+    override def apply(t: Try[Out]): Unit = {
+      setOut(t)
+      callback.invoke(this)
+    }
+  }
+}
+
+private[stream] class MapAsyncPartitioned[In, Out, Partition](
+    orderedOutput: Boolean,
+    parallelism: Int,
+    extractPartition: In => Partition,
+    f: (In, Partition) => Future[Out]) extends GraphStage[FlowShape[In, Out]] {
+
+  if (parallelism < 1) throw new IllegalArgumentException("parallelism must be 
at least 1")

Review Comment:
   Use `require`?



##########
stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.util.control.{ NoStackTrace, NonFatal }
+import scala.util.{ Failure, Success, Try }
+
+import org.apache.pekko
+import pekko.dispatch.ExecutionContexts
+import pekko.stream.ActorAttributes.SupervisionStrategy
+import pekko.stream.Attributes.{ Name, SourceLocation }
+import pekko.stream.MapAsyncPartitioned._
+import pekko.stream.scaladsl.{ Flow, FlowWithContext, Source, 
SourceWithContext }
+import pekko.stream.stage._
+
+private[stream] object MapAsyncPartitioned {
+
+  private def extractPartitionWithCtx[In, Ctx, Partition](extract: In => 
Partition)(tuple: (In, Ctx)): Partition =
+    extract(tuple._1)
+
+  private def fWithCtx[In, Out, Ctx, Partition](f: (In, Partition) => 
Future[Out])(tuple: (In, Ctx),
+      partition: Partition): Future[(Out, Ctx)] =
+    f(tuple._1, partition).map(_ -> tuple._2)(ExecutionContexts.parasitic)
+
+  def mapSourceOrdered[In, Out, Partition, Mat](source: Source[In, Mat], 
parallelism: Int)(
+      extractPartition: In => Partition)(
+      f: (In, Partition) => Future[Out]): Source[Out, Mat] =
+    source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput = 
true, parallelism, extractPartition, f))
+
+  def mapSourceUnordered[In, Out, Partition, Mat](source: Source[In, Mat], 
parallelism: Int)(
+      extractPartition: In => Partition)(
+      f: (In, Partition) => Future[Out]): Source[Out, Mat] =
+    source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput = 
false, parallelism, extractPartition, f))
+
+  def mapSourceWithContextOrdered[In, Ctx, T, Partition, Mat](flow: 
SourceWithContext[In, Ctx, Mat], parallelism: Int)(
+      extractPartition: In => Partition)(
+      f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] =
+    flow.via(
+      new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition](
+        orderedOutput = true,
+        parallelism,
+        extractPartitionWithCtx(extractPartition),
+        fWithCtx[In, T, Ctx, Partition](f)))
+
+  def mapSourceWithContextUnordered[In, Ctx, T, Partition, Mat](flow: 
SourceWithContext[In, Ctx, Mat],
+      parallelism: Int)(extractPartition: In => Partition)(
+      f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] =
+    flow.via(
+      new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition](
+        orderedOutput = false,
+        parallelism,
+        extractPartitionWithCtx(extractPartition),
+        fWithCtx[In, T, Ctx, Partition](f)))
+
+  def mapFlowOrdered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat], 
parallelism: Int)(
+      extractPartition: Out => Partition)(
+      f: (Out, Partition) => Future[T]): Flow[In, T, Mat] =
+    flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = true, 
parallelism, extractPartition,
+      f))
+
+  def mapFlowUnordered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat], 
parallelism: Int)(
+      extractPartition: Out => Partition)(
+      f: (Out, Partition) => Future[T]): Flow[In, T, Mat] =
+    flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = false, 
parallelism,
+      extractPartition, f))
+
+  def mapFlowWithContextOrdered[In, Out, CtxIn, CtxOut, T, Partition, Mat](
+      flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism: Int)(
+      extractPartition: Out => Partition)(
+      f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, 
Mat] =
+    flow.via(
+      new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition](
+        orderedOutput = true,
+        parallelism,
+        extractPartitionWithCtx(extractPartition),
+        fWithCtx[Out, T, CtxOut, Partition](f)))
+
+  def mapFlowWithContextUnordered[In, Out, CtxIn, CtxOut, T, Partition, Mat](
+      flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism: 
Int)(extractPartition: Out => Partition)(
+      f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, 
Mat] =
+    flow.via(
+      new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition](
+        orderedOutput = false,
+        parallelism,
+        extractPartitionWithCtx(extractPartition),
+        fWithCtx[Out, T, CtxOut, Partition](f)))
+
+  private[stream] val NotYetThere: Failure[Nothing] = Failure(new Exception 
with NoStackTrace)
+
+  private[stream] final class Holder[In, Out](
+      val in: In,
+      var out: Try[Out],
+      callback: AsyncCallback[Holder[In, Out]]) extends (Try[Out] => Unit) {
+
+    // To support both fail-fast when the supervision directive is Stop
+    // and not calling the decider multiple times (#23888) we need to cache 
the decider result and re-use that
+    private var cachedSupervisionDirective: Option[Supervision.Directive] = 
None
+
+    def supervisionDirectiveFor(decider: Supervision.Decider, ex: Throwable): 
Supervision.Directive = {
+      cachedSupervisionDirective match {
+        case Some(d) => d
+        case _ =>
+          val d = decider(ex)
+          cachedSupervisionDirective = Some(d)
+          d
+      }
+    }
+
+    def setOut(t: Try[Out]): Unit =
+      out = t
+
+    override def apply(t: Try[Out]): Unit = {
+      setOut(t)
+      callback.invoke(this)
+    }
+  }
+}
+
+private[stream] class MapAsyncPartitioned[In, Out, Partition](
+    orderedOutput: Boolean,
+    parallelism: Int,
+    extractPartition: In => Partition,
+    f: (In, Partition) => Future[Out]) extends GraphStage[FlowShape[In, Out]] {
+
+  if (parallelism < 1) throw new IllegalArgumentException("parallelism must be 
at least 1")
+
+  private val in = Inlet[In]("MapAsyncPartitionOrdered.in")
+  private val out = Outlet[Out]("MapAsyncPartitionOrdered.out")
+
+  override val shape: FlowShape[In, Out] = FlowShape(in, out)
+
+  override def initialAttributes: Attributes =
+    Attributes(Name("MapAsyncPartitionOrdered")) and 
SourceLocation.forLambda(f)

Review Comment:
   Move this to `DefaultAttributes`



##########
stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.util.control.{ NoStackTrace, NonFatal }
+import scala.util.{ Failure, Success, Try }
+
+import org.apache.pekko
+import pekko.dispatch.ExecutionContexts
+import pekko.stream.ActorAttributes.SupervisionStrategy
+import pekko.stream.Attributes.{ Name, SourceLocation }
+import pekko.stream.MapAsyncPartitioned._
+import pekko.stream.scaladsl.{ Flow, FlowWithContext, Source, 
SourceWithContext }
+import pekko.stream.stage._
+
+private[stream] object MapAsyncPartitioned {
+
+  private def extractPartitionWithCtx[In, Ctx, Partition](extract: In => 
Partition)(tuple: (In, Ctx)): Partition =
+    extract(tuple._1)
+
+  private def fWithCtx[In, Out, Ctx, Partition](f: (In, Partition) => 
Future[Out])(tuple: (In, Ctx),
+      partition: Partition): Future[(Out, Ctx)] =
+    f(tuple._1, partition).map(_ -> tuple._2)(ExecutionContexts.parasitic)
+
+  def mapSourceOrdered[In, Out, Partition, Mat](source: Source[In, Mat], 
parallelism: Int)(
+      extractPartition: In => Partition)(
+      f: (In, Partition) => Future[Out]): Source[Out, Mat] =
+    source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput = 
true, parallelism, extractPartition, f))
+
+  def mapSourceUnordered[In, Out, Partition, Mat](source: Source[In, Mat], 
parallelism: Int)(
+      extractPartition: In => Partition)(
+      f: (In, Partition) => Future[Out]): Source[Out, Mat] =
+    source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput = 
false, parallelism, extractPartition, f))
+
+  def mapSourceWithContextOrdered[In, Ctx, T, Partition, Mat](flow: 
SourceWithContext[In, Ctx, Mat], parallelism: Int)(
+      extractPartition: In => Partition)(
+      f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] =
+    flow.via(
+      new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition](
+        orderedOutput = true,
+        parallelism,
+        extractPartitionWithCtx(extractPartition),
+        fWithCtx[In, T, Ctx, Partition](f)))
+
+  def mapSourceWithContextUnordered[In, Ctx, T, Partition, Mat](flow: 
SourceWithContext[In, Ctx, Mat],
+      parallelism: Int)(extractPartition: In => Partition)(
+      f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] =
+    flow.via(
+      new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition](
+        orderedOutput = false,
+        parallelism,
+        extractPartitionWithCtx(extractPartition),
+        fWithCtx[In, T, Ctx, Partition](f)))
+
+  def mapFlowOrdered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat], 
parallelism: Int)(
+      extractPartition: Out => Partition)(
+      f: (Out, Partition) => Future[T]): Flow[In, T, Mat] =
+    flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = true, 
parallelism, extractPartition,
+      f))
+
+  def mapFlowUnordered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat], 
parallelism: Int)(
+      extractPartition: Out => Partition)(
+      f: (Out, Partition) => Future[T]): Flow[In, T, Mat] =
+    flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = false, 
parallelism,
+      extractPartition, f))
+
+  def mapFlowWithContextOrdered[In, Out, CtxIn, CtxOut, T, Partition, Mat](
+      flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism: Int)(
+      extractPartition: Out => Partition)(
+      f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, 
Mat] =
+    flow.via(
+      new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition](
+        orderedOutput = true,
+        parallelism,
+        extractPartitionWithCtx(extractPartition),
+        fWithCtx[Out, T, CtxOut, Partition](f)))
+
+  def mapFlowWithContextUnordered[In, Out, CtxIn, CtxOut, T, Partition, Mat](
+      flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism: 
Int)(extractPartition: Out => Partition)(
+      f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, 
Mat] =
+    flow.via(
+      new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition](
+        orderedOutput = false,
+        parallelism,
+        extractPartitionWithCtx(extractPartition),
+        fWithCtx[Out, T, CtxOut, Partition](f)))
+
+  private[stream] val NotYetThere: Failure[Nothing] = Failure(new Exception 
with NoStackTrace)
+
+  private[stream] final class Holder[In, Out](
+      val in: In,
+      var out: Try[Out],
+      callback: AsyncCallback[Holder[In, Out]]) extends (Try[Out] => Unit) {
+
+    // To support both fail-fast when the supervision directive is Stop
+    // and not calling the decider multiple times (#23888) we need to cache 
the decider result and re-use that
+    private var cachedSupervisionDirective: Option[Supervision.Directive] = 
None
+
+    def supervisionDirectiveFor(decider: Supervision.Decider, ex: Throwable): 
Supervision.Directive = {
+      cachedSupervisionDirective match {
+        case Some(d) => d
+        case _ =>
+          val d = decider(ex)
+          cachedSupervisionDirective = Some(d)
+          d
+      }
+    }
+
+    def setOut(t: Try[Out]): Unit =
+      out = t
+
+    override def apply(t: Try[Out]): Unit = {
+      setOut(t)
+      callback.invoke(this)
+    }
+  }
+}
+
+private[stream] class MapAsyncPartitioned[In, Out, Partition](
+    orderedOutput: Boolean,
+    parallelism: Int,
+    extractPartition: In => Partition,
+    f: (In, Partition) => Future[Out]) extends GraphStage[FlowShape[In, Out]] {
+
+  if (parallelism < 1) throw new IllegalArgumentException("parallelism must be 
at least 1")
+
+  private val in = Inlet[In]("MapAsyncPartitionOrdered.in")
+  private val out = Outlet[Out]("MapAsyncPartitionOrdered.out")
+
+  override val shape: FlowShape[In, Out] = FlowShape(in, out)
+
+  override def initialAttributes: Attributes =
+    Attributes(Name("MapAsyncPartitionOrdered")) and 
SourceLocation.forLambda(f)
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogic(shape) with InHandler with OutHandler {
+      private val contextPropagation = pekko.stream.impl.ContextPropagation()
+
+      private final class Contextual[T](context: AnyRef, val element: T) {
+        private var suspended = false
+
+        def suspend(): Unit =
+          if (!suspended) {
+            suspended = true
+            contextPropagation.suspendContext()
+          }
+
+        def resume(): Unit =
+          if (suspended) {
+            suspended = false
+            contextPropagation.resumeContext(context)
+          }
+
+      }
+
+      private lazy val decider = 
inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
+
+      private var partitionsInProgress: mutable.Set[Partition] = _
+      private var buffer: mutable.Queue[(Partition, Contextual[Holder[In, 
Out]])] = _

Review Comment:
   Use `org.apache.pekko.stream.impl.Buffer`, like `mapAsync` does?



##########
stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala:
##########
@@ -163,6 +163,36 @@ final class Flow[-In, +Out, +Mat](
   override def mapMaterializedValue[Mat2](f: Mat => Mat2): ReprMat[Out, Mat2] =
     new Flow(traversalBuilder.transformMat(f), shape)
 
+  /**
+   * Transforms this stream. Works very similarly to [[#mapAsync]] but with an 
additional
+   * partition step before the transform step. The transform function receives 
the an individual
+   * stream entry and the calculated partition value for that entry.
+   *
+   * @since 1.1.0
+   * @see [[#mapAsync]]
+   * @see [[#mapAsyncPartitionedUnordered]]
+   */

Review Comment:
   This Scaladoc needs to explain the behavior in more detail, including the
backpressure semantics.



##########
stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.util.control.{ NoStackTrace, NonFatal }
+import scala.util.{ Failure, Success, Try }
+
+import org.apache.pekko
+import pekko.dispatch.ExecutionContexts
+import pekko.stream.ActorAttributes.SupervisionStrategy
+import pekko.stream.Attributes.{ Name, SourceLocation }
+import pekko.stream.MapAsyncPartitioned._
+import pekko.stream.scaladsl.{ Flow, FlowWithContext, Source, 
SourceWithContext }
+import pekko.stream.stage._
+
+private[stream] object MapAsyncPartitioned {
+
+  private def extractPartitionWithCtx[In, Ctx, Partition](extract: In => 
Partition)(tuple: (In, Ctx)): Partition =
+    extract(tuple._1)
+
+  private def fWithCtx[In, Out, Ctx, Partition](f: (In, Partition) => 
Future[Out])(tuple: (In, Ctx),
+      partition: Partition): Future[(Out, Ctx)] =
+    f(tuple._1, partition).map(_ -> tuple._2)(ExecutionContexts.parasitic)

Review Comment:
   Could we use a better name than `fWithCtx`?



##########
stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.util.control.{ NoStackTrace, NonFatal }
+import scala.util.{ Failure, Success, Try }
+
+import org.apache.pekko
+import pekko.dispatch.ExecutionContexts
+import pekko.stream.ActorAttributes.SupervisionStrategy
+import pekko.stream.Attributes.{ Name, SourceLocation }
+import pekko.stream.MapAsyncPartitioned._
+import pekko.stream.scaladsl.{ Flow, FlowWithContext, Source, 
SourceWithContext }
+import pekko.stream.stage._
+
+private[stream] object MapAsyncPartitioned {
+
+  private def extractPartitionWithCtx[In, Ctx, Partition](extract: In => 
Partition)(tuple: (In, Ctx)): Partition =
+    extract(tuple._1)

Review Comment:
   `tuple` is not a good name; use `indexWithCtx`?



##########
stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.util.control.{ NoStackTrace, NonFatal }
+import scala.util.{ Failure, Success, Try }
+
+import org.apache.pekko
+import pekko.dispatch.ExecutionContexts
+import pekko.stream.ActorAttributes.SupervisionStrategy
+import pekko.stream.Attributes.{ Name, SourceLocation }
+import pekko.stream.MapAsyncPartitioned._
+import pekko.stream.scaladsl.{ Flow, FlowWithContext, Source, 
SourceWithContext }
+import pekko.stream.stage._
+
+private[stream] object MapAsyncPartitioned {
+
+  private def extractPartitionWithCtx[In, Ctx, Partition](extract: In => 
Partition)(tuple: (In, Ctx)): Partition =
+    extract(tuple._1)
+
+  private def fWithCtx[In, Out, Ctx, Partition](f: (In, Partition) => 
Future[Out])(tuple: (In, Ctx),
+      partition: Partition): Future[(Out, Ctx)] =
+    f(tuple._1, partition).map(_ -> tuple._2)(ExecutionContexts.parasitic)
+
+  def mapSourceOrdered[In, Out, Partition, Mat](source: Source[In, Mat], 
parallelism: Int)(
+      extractPartition: In => Partition)(
+      f: (In, Partition) => Future[Out]): Source[Out, Mat] =
+    source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput = 
true, parallelism, extractPartition, f))
+
+  def mapSourceUnordered[In, Out, Partition, Mat](source: Source[In, Mat], 
parallelism: Int)(
+      extractPartition: In => Partition)(
+      f: (In, Partition) => Future[Out]): Source[Out, Mat] =
+    source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput = 
false, parallelism, extractPartition, f))
+
+  def mapSourceWithContextOrdered[In, Ctx, T, Partition, Mat](flow: 
SourceWithContext[In, Ctx, Mat], parallelism: Int)(
+      extractPartition: In => Partition)(
+      f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] =
+    flow.via(
+      new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition](
+        orderedOutput = true,
+        parallelism,
+        extractPartitionWithCtx(extractPartition),
+        fWithCtx[In, T, Ctx, Partition](f)))
+
+  def mapSourceWithContextUnordered[In, Ctx, T, Partition, Mat](flow: 
SourceWithContext[In, Ctx, Mat],
+      parallelism: Int)(extractPartition: In => Partition)(
+      f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] =
+    flow.via(
+      new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition](
+        orderedOutput = false,
+        parallelism,
+        extractPartitionWithCtx(extractPartition),
+        fWithCtx[In, T, Ctx, Partition](f)))
+
+  def mapFlowOrdered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat], 
parallelism: Int)(
+      extractPartition: Out => Partition)(
+      f: (Out, Partition) => Future[T]): Flow[In, T, Mat] =
+    flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = true, 
parallelism, extractPartition,
+      f))
+
+  def mapFlowUnordered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat], 
parallelism: Int)(
+      extractPartition: Out => Partition)(
+      f: (Out, Partition) => Future[T]): Flow[In, T, Mat] =
+    flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = false, 
parallelism,
+      extractPartition, f))
+
+  def mapFlowWithContextOrdered[In, Out, CtxIn, CtxOut, T, Partition, Mat](
+      flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism: Int)(
+      extractPartition: Out => Partition)(
+      f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, 
Mat] =
+    flow.via(
+      new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition](
+        orderedOutput = true,
+        parallelism,
+        extractPartitionWithCtx(extractPartition),
+        fWithCtx[Out, T, CtxOut, Partition](f)))
+
+  def mapFlowWithContextUnordered[In, Out, CtxIn, CtxOut, T, Partition, Mat](
+      flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism: 
Int)(extractPartition: Out => Partition)(
+      f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, 
Mat] =
+    flow.via(
+      new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition](
+        orderedOutput = false,
+        parallelism,
+        extractPartitionWithCtx(extractPartition),
+        fWithCtx[Out, T, CtxOut, Partition](f)))
+
+  private[stream] val NotYetThere: Failure[Nothing] = Failure(new Exception 
with NoStackTrace)
+
+  private[stream] final class Holder[In, Out](
+      val in: In,
+      var out: Try[Out],
+      callback: AsyncCallback[Holder[In, Out]]) extends (Try[Out] => Unit) {
+
+    // To support both fail-fast when the supervision directive is Stop
+    // and not calling the decider multiple times (#23888) we need to cache 
the decider result and re-use that
+    private var cachedSupervisionDirective: Option[Supervision.Directive] = 
None
+
+    def supervisionDirectiveFor(decider: Supervision.Decider, ex: Throwable): 
Supervision.Directive = {
+      cachedSupervisionDirective match {
+        case Some(d) => d
+        case _ =>
+          val d = decider(ex)
+          cachedSupervisionDirective = Some(d)
+          d
+      }
+    }
+
+    def setOut(t: Try[Out]): Unit =

Review Comment:
   Use `elem` here, as `MapAsync` does?



##########
stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.util.control.{ NoStackTrace, NonFatal }
+import scala.util.{ Failure, Success, Try }
+
+import org.apache.pekko
+import pekko.dispatch.ExecutionContexts
+import pekko.stream.ActorAttributes.SupervisionStrategy
+import pekko.stream.Attributes.{ Name, SourceLocation }
+import pekko.stream.MapAsyncPartitioned._
+import pekko.stream.scaladsl.{ Flow, FlowWithContext, Source, 
SourceWithContext }
+import pekko.stream.stage._
+
+/**
+ * Factory helpers that append a `MapAsyncPartitioned` stage to sources and
+ * flows (with or without context), plus the `NotYetThere` placeholder and
+ * the `Holder` class used to carry an element and its result.
+ */
+private[stream] object MapAsyncPartitioned {
+
+  // Applies `extract` to the element half of an (element, context) tuple;
+  // the context half is not used.
+  private def extractPartitionWithCtx[In, Ctx, Partition](extract: In => 
Partition)(tuple: (In, Ctx)): Partition =
+    extract(tuple._1)
+
+  // Calls `f` with the element half of the tuple and the partition, then
+  // pairs the future's value with the tuple's context. The `map` is run on
+  // `ExecutionContexts.parasitic`.
+  private def fWithCtx[In, Out, Ctx, Partition](f: (In, Partition) => 
Future[Out])(tuple: (In, Ctx),
+      partition: Partition): Future[(Out, Ctx)] =
+    f(tuple._1, partition).map(_ -> tuple._2)(ExecutionContexts.parasitic)
+
+  // Appends a stage with `orderedOutput = true` to `source`.
+  def mapSourceOrdered[In, Out, Partition, Mat](source: Source[In, Mat], 
parallelism: Int)(
+      extractPartition: In => Partition)(
+      f: (In, Partition) => Future[Out]): Source[Out, Mat] =
+    source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput = 
true, parallelism, extractPartition, f))
+
+  // Appends a stage with `orderedOutput = false` to `source`.
+  def mapSourceUnordered[In, Out, Partition, Mat](source: Source[In, Mat], 
parallelism: Int)(
+      extractPartition: In => Partition)(
+      f: (In, Partition) => Future[Out]): Source[Out, Mat] =
+    source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput = 
false, parallelism, extractPartition, f))
+
+  // `SourceWithContext` variant with `orderedOutput = true`. The stage sees
+  // (element, context) tuples; `extractPartition` and `f` only see the
+  // element, via `extractPartitionWithCtx` and `fWithCtx`.
+  def mapSourceWithContextOrdered[In, Ctx, T, Partition, Mat](flow: 
SourceWithContext[In, Ctx, Mat], parallelism: Int)(
+      extractPartition: In => Partition)(
+      f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] =
+    flow.via(
+      new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition](
+        orderedOutput = true,
+        parallelism,
+        extractPartitionWithCtx(extractPartition),
+        fWithCtx[In, T, Ctx, Partition](f)))
+
+  // `SourceWithContext` variant with `orderedOutput = false`; otherwise the
+  // same wiring as `mapSourceWithContextOrdered`.
+  def mapSourceWithContextUnordered[In, Ctx, T, Partition, Mat](flow: 
SourceWithContext[In, Ctx, Mat],
+      parallelism: Int)(extractPartition: In => Partition)(
+      f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] =
+    flow.via(
+      new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition](
+        orderedOutput = false,
+        parallelism,
+        extractPartitionWithCtx(extractPartition),
+        fWithCtx[In, T, Ctx, Partition](f)))
+
+  // Appends a stage with `orderedOutput = true` to `flow`; the stage
+  // consumes the flow's `Out` elements and emits `T`.
+  def mapFlowOrdered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat], 
parallelism: Int)(
+      extractPartition: Out => Partition)(
+      f: (Out, Partition) => Future[T]): Flow[In, T, Mat] =
+    flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = true, 
parallelism, extractPartition,
+      f))
+
+  // Appends a stage with `orderedOutput = false` to `flow`.
+  def mapFlowUnordered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat], 
parallelism: Int)(
+      extractPartition: Out => Partition)(
+      f: (Out, Partition) => Future[T]): Flow[In, T, Mat] =
+    flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = false, 
parallelism,
+      extractPartition, f))
+
+  // `FlowWithContext` variant with `orderedOutput = true`. The stage sees
+  // (element, context) tuples; `extractPartition` and `f` only see the
+  // element, via `extractPartitionWithCtx` and `fWithCtx`.
+  def mapFlowWithContextOrdered[In, Out, CtxIn, CtxOut, T, Partition, Mat](
+      flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism: Int)(
+      extractPartition: Out => Partition)(
+      f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, 
Mat] =
+    flow.via(
+      new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition](
+        orderedOutput = true,
+        parallelism,
+        extractPartitionWithCtx(extractPartition),
+        fWithCtx[Out, T, CtxOut, Partition](f)))
+
+  // `FlowWithContext` variant with `orderedOutput = false`; otherwise the
+  // same wiring as `mapFlowWithContextOrdered`.
+  def mapFlowWithContextUnordered[In, Out, CtxIn, CtxOut, T, Partition, Mat](
+      flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism: 
Int)(extractPartition: Out => Partition)(
+      f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, 
Mat] =
+    flow.via(
+      new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition](
+        orderedOutput = false,
+        parallelism,
+        extractPartitionWithCtx(extractPartition),
+        fWithCtx[Out, T, CtxOut, Partition](f)))
+
+  // Placeholder `Failure` wrapping an exception without a stack trace.
+  private[stream] val NotYetThere: Failure[Nothing] = Failure(new Exception 
with NoStackTrace)
+
+  /**
+   * Carries one input element (`in`) together with its result (`out`).
+   *
+   * It is itself a `Try[Out] => Unit`: applying it stores the given result
+   * in `out` and then invokes `callback` with this holder.
+   */
+  private[stream] final class Holder[In, Out](
+      val in: In,
+      var out: Try[Out],
+      callback: AsyncCallback[Holder[In, Out]]) extends (Try[Out] => Unit) {
+
+    // To support both fail-fast when the supervision directive is Stop
+    // and not calling the decider multiple times (#23888) we need to cache 
the decider result and re-use that
+    private var cachedSupervisionDirective: Option[Supervision.Directive] = 
None
+
+    // Returns the cached directive when one exists; otherwise evaluates
+    // `decider(ex)`, caches the answer and returns it. Later calls return
+    // the cached value whatever `decider` and `ex` are passed.
+    def supervisionDirectiveFor(decider: Supervision.Decider, ex: Throwable): 
Supervision.Directive = {
+      cachedSupervisionDirective match {
+        case Some(d) => d
+        case _ =>
+          val d = decider(ex)
+          cachedSupervisionDirective = Some(d)
+          d
+      }
+    }
+
+    // Overwrites the stored result without invoking the callback.
+    def setOut(t: Try[Out]): Unit =
+      out = t
+
+    // Stores the result first, then invokes `callback` with this holder.
+    override def apply(t: Try[Out]): Unit = {
+      setOut(t)
+      callback.invoke(this)
+    }
+  }
+}
+
+private[stream] class MapAsyncPartitioned[In, Out, Partition](
+    orderedOutput: Boolean,
+    parallelism: Int,
+    extractPartition: In => Partition,
+    f: (In, Partition) => Future[Out]) extends GraphStage[FlowShape[In, Out]] {
+
+  if (parallelism < 1) throw new IllegalArgumentException("parallelism must be 
at least 1")
+
+  private val in = Inlet[In]("MapAsyncPartitionOrdered.in")
+  private val out = Outlet[Out]("MapAsyncPartitionOrdered.out")
+
+  override val shape: FlowShape[In, Out] = FlowShape(in, out)
+
+  override def initialAttributes: Attributes =
+    Attributes(Name("MapAsyncPartitionOrdered")) and 
SourceLocation.forLambda(f)
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogic(shape) with InHandler with OutHandler {
+      private val contextPropagation = pekko.stream.impl.ContextPropagation()
+
+      private final class Contextual[T](context: AnyRef, val element: T) {
+        private var suspended = false
+
+        def suspend(): Unit =
+          if (!suspended) {
+            suspended = true
+            contextPropagation.suspendContext()
+          }
+
+        def resume(): Unit =
+          if (suspended) {
+            suspended = false
+            contextPropagation.resumeContext(context)
+          }
+
+      }
+
+      private lazy val decider = 
inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
+
+      private var partitionsInProgress: mutable.Set[Partition] = _
+      private var buffer: mutable.Queue[(Partition, Contextual[Holder[In, 
Out]])] = _
+
+      private val futureCB = getAsyncCallback[Holder[In, Out]](holder =>
+        holder.out match {
+          case Success(_) => pushNextIfPossible()
+          case Failure(ex) =>
+            holder.supervisionDirectiveFor(decider, ex) match {
+              // fail fast as if supervision says so
+              case Supervision.Stop => failStage(ex)
+              case _                => pushNextIfPossible()
+            }
+        })
+
+      override def preStart(): Unit = {
+        partitionsInProgress = mutable.Set()
+        buffer = mutable.Queue()
+      }
+
+      override def onPull(): Unit =
+        pushNextIfPossible()
+
+      override def onPush(): Unit = {
+        try {
+          val element = grab(in)
+          val partition = extractPartition(element)
+
+          val wrappedInput = new Contextual(
+            contextPropagation.currentContext(),
+            new Holder[In, Out](element, NotYetThere, futureCB))
+
+          buffer.enqueue(partition -> wrappedInput)
+
+          if (canStartNextElement(partition)) {
+            processElement(partition, wrappedInput)
+          } else {
+            wrappedInput.suspend()
+          }
+        } catch {
+          case NonFatal(ex) => if (decider(ex) == Supervision.Stop) 
failStage(ex)
+        }
+
+        pullIfNeeded()
+      }
+
+      override def onUpstreamFinish(): Unit =
+        if (idle()) completeStage()
+
+      private def processElement(partition: Partition, wrappedInput: 
Contextual[Holder[In, Out]]): Unit = {
+        import wrappedInput.{ element => holder }
+        val future = f(holder.in, partition)
+
+        partitionsInProgress += partition
+
+        future.value match {
+          case None    => 
future.onComplete(holder)(ExecutionContexts.parasitic)
+          case Some(v) =>
+            // #20217 the future is already here, optimization: avoid 
scheduling it on the dispatcher and

Review Comment:
   There is no issue or PR `#20217` in this repo.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to