GitHub user gaeljw added a comment to the discussion: [Pekko Streams] Best 
practice for streaming from a resource that can take several seconds/minutes to 
start offering an element

Thanks for your message @raboof .

Here's a (not so) minified version of the code I was using. However, _as often_ 
, now that I'm sharing this with you, I was able to make the async variant 
work. I'll need to review the more complex real code to see if I missed 
something and async does work or if there's something else preventing async to 
work.

---

**Sync version**

```scala
object PekkoMain {

  private val logger = org.slf4j.LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    logger.info("Hello, Pekko!")

    implicit val system = ActorSystem("PekkoSystem")
    implicit val ec = system.dispatcher

    val ks = KillSwitches.shared("kill-switch")

    // KillSwitch is not able to stop the flow
    val source: Source[String, _] = CustomSource.source()
    // KillSwitch does stop the flow
//    val source: Source[String, _] = 
Source.repeat("Repeat").initialDelay(15.seconds).delay(1.second).take(20)

    val done: Future[Done] = source
      .via(ks.flow)
      .watchTermination() { (_, f) =>
        f.onComplete { _ =>
          logger.info(s"onComplete")
        }
        NotUsed
      }
      .runForeach(logger.info)

    Future {
      Thread.sleep(10 * 1000)
      logger.info("Trigger KillSwitch")
      ks.shutdown()
    }

    Await.result(done, 60.seconds)

    Await.result(system.terminate(), 60.seconds)
    ()
  }

}

object CustomSource {

  private val logger = org.slf4j.LoggerFactory.getLogger(this.getClass)

  def source(): Source[String, Future[Int]] = Source.fromGraph(new 
ResultSource())

  private class ResultSource() extends 
GraphStageWithMaterializedValue[SourceShape[String], Future[Int]] {

    val out: Outlet[String] = Outlet(s"$toString.out")
    val shape: SourceShape[String] = SourceShape(out)

    override def createLogicAndMaterializedValue(inheritedAttributes: 
Attributes): (GraphStageLogic, Future[Int]) = {
      val result = Promise[Int]()

      val logic = new GraphStageLogic(shape) with OutHandler {

        private var cursor: Option[String] = None
        private var counter: Int = 0
        private var cancel = false

        override def postStop() = release()

        private def release(): Unit = {
          logger.info("Releasing resources")
        }

        private def nextCursor(): Unit = {
          if (cancel) {
            logger.info("nextCursor -> cancelled")
            throw new RuntimeException("Cancelled")
          }

          cursor = if (counter == 0) {
            logger.info("nextCursor -> Sleeping for 15 seconds")
            (1 to 15).foreach { _ =>
              if (!cancel) {
                logger.info("nextCursor -> Sleeping")
                Thread.sleep(1000)
              } else {
                logger.info("nextCursor -> cancelled")
                throw new RuntimeException("Cancelled")
              }
            }
            Some(s"Item $counter")
          } else if (counter < 50) {
            logger.info("nextCursor -> Immediate result")
            Some(s"Item $counter")
          } else {
            logger.info("nextCursor -> No more results")
            None
          }
        }

        def onPull(): Unit = {
          nextCursor()
          cursor match {
            case Some(c) =>
              counter += 1
              push(out, c)
            case _ =>
              result.success(counter)
              complete(out)
          }
        }

        @nowarn
        override def onDownstreamFinish() = {
          logger.info("onDownstreamFinish")
          result.tryFailure(new InterruptedException("Downstream finished"))
          release()
          cancel = true
          super.onDownstreamFinish(): @nowarn
        }

        setHandler(out, this)
      }

      logic -> result.future
    }
  }

}
```

Gives this output:
```
2024-12-06 18:46:48,386 INFO  main PekkoMain$ - Hello, Pekko!
2024-12-06 18:46:49,333 INFO  PekkoSystem-pekko.actor.default-dispatcher-4 
o.a.p.e.s.Slf4jLogger - Slf4jLogger started
2024-12-06 18:46:49,807 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Sleeping for 15 seconds
2024-12-06 18:46:49,807 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:46:50,807 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:46:51,807 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:46:52,808 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:46:53,808 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:46:54,808 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:46:55,809 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:46:56,809 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:46:57,809 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:46:58,809 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:46:59,803 INFO  PekkoSystem-pekko.actor.default-dispatcher-4 
PekkoMain$ - Trigger KillSwitch
2024-12-06 18:46:59,810 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:47:00,810 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:47:01,810 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:47:02,826 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:47:03,827 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Sleeping
2024-12-06 18:47:04,828 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
PekkoMain$ - Item 0
2024-12-06 18:47:04,828 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Immediate result
2024-12-06 18:47:04,828 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
PekkoMain$ - Item 1
2024-12-06 18:47:04,828 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Immediate result
2024-12-06 18:47:04,829 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
PekkoMain$ - Item 2
2024-12-06 18:47:04,829 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Immediate result
2024-12-06 18:47:04,829 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
PekkoMain$ - Item 3
2024-12-06 18:47:04,829 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Immediate result
2024-12-06 18:47:04,829 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
PekkoMain$ - Item 4
2024-12-06 18:47:04,829 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Immediate result
2024-12-06 18:47:04,829 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
PekkoMain$ - Item 5
2024-12-06 18:47:04,829 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Immediate result
...
2024-12-06 18:47:04,838 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Immediate result
2024-12-06 18:47:04,838 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
PekkoMain$ - Item 48
2024-12-06 18:47:04,838 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> Immediate result
2024-12-06 18:47:04,838 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
PekkoMain$ - Item 49
2024-12-06 18:47:04,838 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - nextCursor -> No more results
2024-12-06 18:47:04,839 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - Releasing resources
2024-12-06 18:47:04,843 INFO  PekkoSystem-pekko.actor.default-dispatcher-4 
PekkoMain$ - onComplete
2024-12-06 18:47:04,874 INFO  PekkoSystem-pekko.actor.default-dispatcher-4 
o.a.p.a.CoordinatedShutdown - Running CoordinatedShutdown with reason 
[ActorSystemTerminateReason]
```

---

**Async version**

```scala
object PekkoMain {

  private val logger = org.slf4j.LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    // Same as for sync with:
    val blockingEc = 
ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))
    val source: Source[String, _] = CustomSource.asyncSource(blockingEc)
  }

}

object CustomSource {

  private val logger = org.slf4j.LoggerFactory.getLogger(this.getClass)

  def asyncSource(blockingEc: ExecutionContext): Source[String, Future[Int]] = 
Source.fromGraph(new AsyncResultSource(blockingEc))

  private class AsyncResultSource(blockingEc: ExecutionContext) extends 
GraphStageWithMaterializedValue[SourceShape[String], Future[Int]] {

    val out: Outlet[String] = Outlet(s"$toString.out")
    val shape: SourceShape[String] = SourceShape(out)

    override def createLogicAndMaterializedValue(inheritedAttributes: 
Attributes): (GraphStageLogic, Future[Int]) = {
      val result = Promise[Int]()

      val logic = new GraphStageLogic(shape) with OutHandler {

        private var counter: Int = 0
        private var cancel = false

        val cb: AsyncCallback[Try[Option[String]]] = 
getAsyncCallback[Try[Option[String]]] { itemOptTry =>
          itemOptTry match {
            case Success(itemOpt) =>
              itemOpt match {
                case Some(c) =>
                  counter += 1
                  push(out, c)
                case _ =>
                  result.success(counter)
                  complete(out)
              }
            case Failure(ex) => failStage(ex)
          }
        }

        override def postStop() = release()

        private def release(): Unit = {
          logger.info("Releasing resources")
        }

        private def nextCursor(): Unit = {
          if (cancel) {
            logger.info("nextCursor -> cancelled")
            throw new RuntimeException("Cancelled")
          }

          val f = Future {
            blocking {
              if (counter == 0) {
                logger.info("nextCursor -> Sleeping for 15 seconds")
                logger.info("nextCursor -> Sleeping")
                Thread.sleep(15000)
                Some(s"Item $counter")
              } else if (counter < 50) {
                logger.info("nextCursor -> Immediate result")
                Some(s"Item $counter")
              } else {
                logger.info("nextCursor -> No more results")
                None
              }
            }
          }(blockingEc)

          f.onComplete(cb.invoke)(ExecutionContext.parasitic)
        }

        def onPull(): Unit = {
          nextCursor()
        }

        @nowarn
        override def onDownstreamFinish() = {
          logger.info("onDownstreamFinish")
          result.tryFailure(new InterruptedException("Downstream finished"))
          release()
          cancel = true
          super.onDownstreamFinish(): @nowarn
        }

        setHandler(out, this)
      }

      logic -> result.future
    }
  }

}
```

Gives the following output:
```
2024-12-06 18:51:13,325 INFO  main PekkoMain$ - Hello, Pekko!
2024-12-06 18:51:14,372 INFO  PekkoSystem-pekko.actor.default-dispatcher-4 
o.a.p.e.s.Slf4jLogger - Slf4jLogger started
2024-12-06 18:51:14,828 INFO  pool-1-thread-1 CustomSource$ - nextCursor -> 
Sleeping for 15 seconds
2024-12-06 18:51:14,828 INFO  pool-1-thread-1 CustomSource$ - nextCursor -> 
Sleeping
2024-12-06 18:51:24,826 INFO  PekkoSystem-pekko.actor.default-dispatcher-4 
PekkoMain$ - Trigger KillSwitch
2024-12-06 18:51:24,845 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - onDownstreamFinish
2024-12-06 18:51:24,846 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - Releasing resources
2024-12-06 18:51:24,850 INFO  PekkoSystem-pekko.actor.default-dispatcher-5 
CustomSource$ - Releasing resources
2024-12-06 18:51:24,850 INFO  PekkoSystem-pekko.actor.default-dispatcher-4 
PekkoMain$ - onComplete
2024-12-06 18:51:24,884 INFO  PekkoSystem-pekko.actor.default-dispatcher-4 
o.a.p.a.CoordinatedShutdown - Running CoordinatedShutdown with reason 
[ActorSystemTerminateReason]
```


GitHub link: 
https://github.com/apache/pekko/discussions/1572#discussioncomment-11487721

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: 
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to