Hi all, 

We are trying to create a stage which caters to 2 requirements - 

1) If the producer is faster than the consumer, then consumer should get the 
latest element always (dropping the intermediate elements). Similar to conflate 
<https://doc.akka.io/docs/akka/2.5/stream/stream-cookbook.html#working-with-rate>
 api. This was discussed here 
<https://groups.google.com/forum/#!searchin/akka-user/akka$20streams|sort:date/akka-user/5AUkfBkK2V4/qpp0dsnWAAAJ>.

2) If the producer is slower than the consumer, then consumer (who is pulling 
on a faster frequency) should get the latest element on the stream on each 
pull. (which means elements will be duplicated). Similar to expand 
<https://doc.akka.io/docs/akka/2.5.5/scala/stream/stream-rate.html#understanding-expand>
 api.

Since the conflate and expand stages both buffer elements which increases the 
latency, we are creating our own stage.
 
Looking for feedback on the code below.

import akka.stream.stage._
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}

import scala.concurrent.duration.FiniteDuration

class CustomThrottleStage[A](delay: FiniteDuration) extends 
GraphStage[FlowShape[A, A]] {
  final val in    = Inlet.create[A]("Throttle.in")
  final val out   = Outlet.create[A]("Throttle.out")
  final val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
new TimerGraphStageLogic(shape) {
    private var isPulled             = false
    private var maybeElem: Option[A] = None

    override def preStart(): Unit = {
      schedulePeriodically(None, delay)
      pull(in)
    }

    setHandler(
      in,
      new InHandler {
        override def onPush(): Unit = {
          //Whenever upstream pushes elements, store it and push it only on the 
timer.
          maybeElem = Some(grab(in))
          pull(in) //drop elements - required when the producer is faster
        }
      }
    )

    setHandler(
      out,
      new OutHandler {
        override def onPull(): Unit = {
          isPulled = true
        }
      }
    )

    override def onTimer(key: Any): Unit = {
      *//**on timer, push only if there is a demand from downstream*
      if (isPulled) {
        maybeElem.foreach { x =>
          isPulled = false
          push(out, x)
        }
      }
    }
  }
}



Regards,

Dolly




-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to