Re: [akka-user] [Akka-Streams] Want to always receive latest element in Sink

2018-02-23 Thread saloniv
Hello Arnout,

We are providing an API for other developers to use. In the real application,
a callback written by those developers will be executed in place of the
Thread.sleep in the example.

Hence, we cannot say whether they will write CPU-intensive code or perform
some kind of IO.
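
To make the callback scenario concrete, here is a minimal sketch in plain Scala of one way to run a developer-supplied callback on its own thread pool, so that neither CPU-heavy nor blocking callbacks occupy the calling thread. The names CallbackIsolationSketch, runCallback and the pool size of 4 are hypothetical and not part of our actual API.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.{ExecutionContext, Future}

object CallbackIsolationSketch {
  // Hypothetical pool size; tune it to the expected callback workload.
  private val callbackPool: ExecutorService = Executors.newFixedThreadPool(4)

  // Execution context backed by the dedicated pool above.
  val callbackEc: ExecutionContext = ExecutionContext.fromExecutorService(callbackPool)

  // Runs the developer-supplied callback on the dedicated pool and returns
  // a Future that completes when the callback has finished.
  def runCallback[A](element: A)(callback: A => Unit): Future[Unit] =
    Future(callback(element))(callbackEc)

  // Releases the pool threads so the JVM can exit.
  def shutdown(): Unit = callbackPool.shutdown()
}
```

The returned Future tells the caller when the callback is done, whichever kind of work it turned out to do.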

Hope this helps.

On Wednesday, February 7, 2018 at 6:26:11 PM UTC+5:30, Arnout Engelen wrote:
>
> Hello Saloni,
>
> I think it would be helpful to have a more realistic example than doing 
> "Thread.sleep(1000)" in the sink.
>
> Could we unpack what this sleep is intended to mimic in your 'real' 
> application? Is it for example doing CPU-intensive data parsing or perhaps 
> some kind of IO?
>
>
> Kind regards,
>
> Arnout
>
> On Thu, Jan 25, 2018 at 7:27 AM, wrote:
>
>> Hello,
>>
>> We have a requirement: if the consumer is slower than the producer, 
>> discard all the elements that cannot be consumed and, whenever the 
>> consumer becomes ready, feed it the latest element from the producer.
>>
>> We tried an approach as follows:
>>
>> // element type Int is assumed here for illustration
>> Source.actorRef[Int](0, OverflowStrategy.dropHead)   // actor receives data every 10 milliseconds
>>   .runWith(Sink.foreach[Int] { _ =>
>>     println("data received")
>>     Thread.sleep(1000)   // mimic a consumer that needs 1 second per element
>>   })
>>
>>
>> We shrank the buffer size to 1 (the minimum possible) with the following settings:
>>
>> private val actorMaterializerSettings =
>>   ActorMaterializerSettings(actorSystem).withInputBuffer(1, 1)
>>
>>
>> With this buffer size, the Sink pulls data 1 to consume and data 2 to put 
>> in the buffer at initialization.
>>
>> While data 1 is being processed, data from the producer is dropped.
>>
>> When data 1 has been processed after 1000 milliseconds (1 second), ideally 
>> I should receive data 10 (and drop 2 - 9, as the consumer is slow), but 
>> instead I receive data 2 from the buffer. In our domain, data 2 is useless 
>> because it is stale.
>>
>> Is there a way to disable the buffer at the Sink entirely and always pull 
>> the latest data from the Source?
>>
>>
>> -- 
>> >> Read the docs: http://akka.io/docs/
>> >> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+...@googlegroups.com .
>> To post to this group, send email to akka...@googlegroups.com 
>> .
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> -- 
> Arnout Engelen
> *Senior Software Engineer*
> E: arnout@lightbend.com 
> T: https://twitter.com/raboofje
>
>
>



Re: [akka-user] [Akka-Streams] Want to always receive latest element in Sink

2018-02-08 Thread Johan Andrén
You should be able to let a faster upstream continue while emitting the 
latest value whenever downstream is ready, using conflate like so:

import akka.stream.ThrottleMode
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

// Assumes an implicit materializer is in scope.
Source(0 to 1000)
  .throttle(10, 1.second, 1, ThrottleMode.shaping) // fast upstream
  .conflate((_, latest) => latest) // second argument is the newer element; keep it
  .throttle(2, 1.second, 1, ThrottleMode.shaping) // slow downstream
  .runForeach(println)
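
For the setup described in the original question, a rough sketch of the same idea could look like the following. It assumes Akka 2.5.x, an Int element type, and a hypothetical userCallback standing in for the developer-supplied callback; none of these come from the original post. The callback runs inside mapAsync(1) rather than directly in the sink, so the stream itself is never blocked and conflate can keep replacing the pending element until the previous callback has finished.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import akka.actor.{ActorSystem, Status}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object LatestElementSketch extends App {
  implicit val system: ActorSystem = ActorSystem("latest-element-sketch")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  // Assumption: the global execution context is enough for a demo; a real
  // application would probably use a dedicated pool for user callbacks.
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Elements that actually reached the callback, recorded for inspection.
  val received = new ConcurrentLinkedQueue[Int]()

  // Hypothetical stand-in for the callback written by other developers.
  def userCallback(element: Int): Unit = {
    received.add(element)
    println(s"data received: $element")
    Thread.sleep(1000) // mimic a consumer that needs 1 second per element
  }

  val (producer, done) =
    Source
      .actorRef[Int](16, OverflowStrategy.dropHead)
      .conflate((_, latest) => latest) // keep only the newest pending element
      .mapAsync(1)(element => Future(blocking(userCallback(element))))
      .toMat(Sink.ignore)(Keep.both)
      .run()

  // Producer: one element roughly every 10 milliseconds.
  (1 to 300).foreach { i =>
    producer ! i
    Thread.sleep(10)
  }

  producer ! Status.Success("done") // completes the actorRef source
  Await.ready(done, 10.seconds)
  system.terminate()
}
```

With parallelism 1, mapAsync only asks conflate for the next element once the running callback has completed, so whatever it receives is the most recent value the producer sent up to that point.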


--

Johan

Akka Team





Re: [akka-user] [Akka-Streams] Want to always receive latest element in Sink

2018-02-07 Thread Arnout Engelen
Hello Saloni,

I think it would be helpful to have a more realistic example than doing
"Thread.sleep(1000)" in the sink.

Could we unpack what this sleep is intended to mimic in your 'real'
application? Is it for example doing CPU-intensive data parsing or perhaps
some kind of IO?


Kind regards,

Arnout




-- 
Arnout Engelen
*Senior Software Engineer*
E: arnout.enge...@lightbend.com
T: https://twitter.com/raboofje
