He-Pin commented on code in PR #982:
URL: https://github.com/apache/incubator-pekko/pull/982#discussion_r1459919341


##########
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala:
##########
@@ -267,9 +266,8 @@ private[stream] object Collect {
         } catch {
           case NonFatal(ex) =>
             decider(ex) match {
-              case Supervision.Stop    => failStage(ex)
-              case Supervision.Resume  => if (!hasBeenPulled(in)) pull(in)

Review Comment:
   Why? the Map and Filter don't have this either.
   
   I would like to add that for in `MapConcat` operator which explain why a 
`!hasBeenPulled(in)` test is needed.
   
   I think the origin reason about why this `!hasBeenPulled(in)` test is 
because that method is a abstract class and so that exception can happen when 
onPush/onPull, so it have to be guard for extending, and reduce the sub class' 
burden. that's not the case here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to