He-Pin commented on code in PR #931:
URL: https://github.com/apache/incubator-pekko/pull/931#discussion_r1451714263


##########
stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala:
##########
@@ -1082,7 +1082,47 @@ trait FlowOps[+Out, +Mat] {
    * @param onComplete a function that transforms the ongoing state into an 
optional output element
    */
   def statefulMap[S, T](create: () => S)(f: (S, Out) => (S, T), onComplete: S 
=> Option[T]): Repr[T] =
-    via(new StatefulMap[S, Out, T](create, f, onComplete))
+    via(new StatefulMap[S, Out, T](create, f, 
onComplete).withAttributes(DefaultAttributes.statefulMap))
+
+  /**
+   * Transform each stream element with the help of a resource.
+   *
+   * The resource creation function is invoked once when the stream is 
materialized and the returned resource is passed to
+   * the mapping function for mapping the first element. The mapping function 
returns a mapped element to emit
+   * downstream. The returned `T` MUST NOT be `null` as it is illegal as 
stream element - according to the Reactive Streams specification.
+   *
+   * The `close` function is called only once when the upstream or downstream 
finishes or fails. You can do some clean-up here,
+   * and if the returned value is not empty, it will be emitted to the 
downstream if available, otherwise the value will be dropped.
+   *
+   * Early completion can be done with combination of the [[takeWhile]] 
operator.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * You can configure the default dispatcher for this Source by changing the 
`akka.stream.materializer.blocking-io-dispatcher` or
+   * set it for a given Source by using [[ActorAttributes]].
+   *
+   * '''Emits when''' the mapping function returns an element and downstream 
is ready to consume it
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @tparam R the type of the resource
+   * @tparam T the type of the output elements
+   * @param create function that creates the resource
+   * @param f function that transforms the upstream element and the resource 
to output element
+   * @param close function that closes the resource, optionally outputting a 
last element
+   * @since 1.1.0
+   */
+  def mapWithResource[R, T](create: () => R)(f: (R, Out) => T, close: R => 
Option[T]): Repr[T] =

Review Comment:
   I see , that would be nice addition.
   1. directly support for Java's AutoCloseable.
   2. directly support for Scala's Using.
   
   But I think those two doesn't related to this PR, because the nice feature 
of this operator is "optional emit with the resource/final signal".
   
   But and what your suggestion can come up with another PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org
For additional commands, e-mail: notifications-h...@pekko.apache.org

Reply via email to