[akka-user] Example for change in 2.0-M1 stream

2015-11-12 Thread tigerfoot
Hello,

I have a "switch" code shown below in Akka stream 1.0.  I'll be totally 
honest, I don't understand 100% of what's going on here--I reworked it from 
a sample found someplace.  Is there an example somewhere of what an 
equivalent structure would look like in 2.0-M1?

Thanks for any hints!

import FanOutShape._
class CommandShape(_init: Init[QM[Command]] = 
Name[QM[Command]]("CommandRouter")) extends FanOutShape[QM[Command]](_init) 
{
val outMsg  = newOutlet[QM[MessageCommand]]("message")
val outFallthru = newOutlet[QM[Command]]("fallthru")  // unknown message
protected override def construct(i: Init[QM[Command]]) = new CommandShape(i)
}

// This next bit is the Flow (FlexiRoute) that does the "smart fanout" 
based on the switch logic
// using the FanOutShape designed above.
case class CommandRouter() extends FlexiRoute[QM[Command], 
CommandShape](new CommandShape, Attributes.name("CommandRouter")) {
import FlexiRoute._

override def createRouteLogic(p: PortT) = new RouteLogic[QM[Command]] {
override def initialState = State[Any](DemandFromAll(p.outMsg, 
p.outFallthru)) { (ctx, _, element) =>
element.body match {
case c: MessageCommand => ctx.emit(p.outMsg){
val mc = element.body.asInstanceOf[MessageCommand]
element.copy(
metaTags = List(  // Add meaningful metatags to message for error output
mc.recipient.firstName,
mc.recipient.lastName
) ++ mc.recipient.contacts.map{case(k,v) => k+":"+v}.toList,
body = mc
)
}
// case other commands here...
case unknown=> ctx.emit(p.outFallthru)(element)
}
SameState
}

override def initialCompletionHandling = eagerClose
}
}

To use it in a flow I would do something like:

val commandSwitch  = builder.add(CommandRouter())

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: [akka-http] java completeWith mapping future

2015-11-12 Thread Benoit Guillon
Hi André,

Thanks for your reply, using Mapper works fine (a bit more verbose than the 
scala implementation)

For those who have the same issue: here is the solution:

private Future handleGetTask(RequestContext ctx){
return Patterns.ask(actorSystem.actorFor("myActor"), buildMessage(), 
1000)
.map(new Mapper(){
public RouteResult apply(Object o) {
return ctx.completeAs(Jackson.json(), o);
}
  }, ctx.executionContext());
}

Thanks

Benoit

Le mercredi 11 novembre 2015 16:08:15 UTC+1, André a écrit :
>
> Hi Benoit,
>
> you should take a look at 
> http://doc.akka.io/docs/akka/snapshot/java/futures.html#Future_is_a_Monad.
>
> Mapper should solve your Problem.
>
> Cheers
> André
>
> On Wednesday, November 11, 2015 at 12:01:05 PM UTC+1, Benoit Guillon wrote:
>>
>> Hi,
>>
>> I'm using akka http with Java DSL and I'm trying to handle a get request 
>> with an actor and map my actor's response to the HTTP response as follows:
>>
>> private Route getTasksRoute(){
>>return 
>> path("tasks").route(pathEndOrSingleSlash().route(get(this.handleWith(
>>   (ctx) -> ctx.completeWith(handleGetTask(ctx))
>>;
>> }
>>
>> private Future handleGetTask(RequestContext ctx){
>>return Patterns.ask(actorSystem.actorFor("myActor"), buildMessage(), 
>> 1000)
>>.map(o -> ctx.completeAs(Jackson.json(), o), 
>> ctx.executionContext());
>> }
>>
>> This code does not compile in Java 8 because the API enforces to use 
>> scala futures
>>
>> Error:(74, 17) java: method map in interface scala.concurrent.Future 
>> cannot be applied to given types;
>>   required: 
>> scala.Function1,scala.concurrent.ExecutionContext
>>   found: (o)->ctx.c[...]), o),scala.concurrent.ExecutionContext
>>   reason: cannot infer type-variable(s) S
>> (argument mismatch; scala.Function1 is not a functional interface
>>   multiple non-overriding abstract methods found in interface 
>> scala.Function1)
>>
>> I could not find any relevant code example illustrating this use case in 
>> java.
>>
>> Can you suggest a code pattern to illustrate how the completeWith method 
>> is intended to be used in Java 8 in conjonction with actors' ask pattern ?
>>
>> Thanks for your help.
>>
>> Benoit
>>
>>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Akka cluster failover- race condition

2015-11-12 Thread tomerneeraj
Hi, 

Node here means one VM.

We are using Akka cluster where each node in the cluster assigned to do 
specific task. If any node in the cluster goes down then MEMBER DOWN event 
comes up in the cluster. After catching this event other node start processing 
task assigned to the failure node 

This is where problem pops up. Other node shows down in the cluster because it 
does not provide response for cluster events and timeout occurs I.e Akka 
cluster consider it down whereas due to high load or GC events it does not 
provide response but actually it keeps on processing records at slow rate 

Now both node in cluster 1. Show down in Akka cluster event 2. New node which 
starts processing due to member down event, starts processing same set of 
records and hence race conditions starts occurring 

One way to think around it is that never let the node overloaded and in that 
case this event always comes up when the node is actually down and not shows as 
down due to response down for checking the availability of node in the cluster. 

But there are other scenarios also which can not be predicted in advance. We 
need to have some mechanism where it guarantees that if some node is down that 
is down in reality

Need expert group members advise on it how to resolve it or it needs to be 
looked in a different way

Regards
Neeraj

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] I get strange error with 2.0-M1

2015-11-12 Thread Richard Grossman
I send a request all is working but get this after 2 s
I've configured in application.conf
akka.http.server.idle-timeout = 2 s

Internal server error, sending 500 response
java.util.concurrent.TimeoutException: No elements passed in the last 2 
seconds.
at akka.stream.impl.Timers$IdleBidi$$anon$4.onTimer(Timers.scala:142)
at 
akka.stream.stage.TimerGraphStageLogic.akka$stream$stage$TimerGraphStageLogic$$onInternalTimer(GraphStage.scala:755)
at 
akka.stream.stage.TimerGraphStageLogic$$anonfun$akka$stream$stage$TimerGraphStageLogic$$getTimerAsyncCallback$1.apply(GraphStage.scala:745)
at 
akka.stream.stage.TimerGraphStageLogic$$anonfun$akka$stream$stage$TimerGraphStageLogic$$getTimerAsyncCallback$1.apply(GraphStage.scala:745)
at 
akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:355)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at 
akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:291)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke_aroundBody0(ActorCell.scala:487)
at akka.actor.ActorCell$AjcClosure1.run(ActorCell.scala:1)
at org.aspectj.runtime.reflect.JoinPointImpl.proceed(JoinPointImpl.java:149)
at 
akka.kamon.instrumentation.ActorCellInstrumentation$$anonfun$aroundBehaviourInvoke$1.apply(ActorCellInstrumentation.scala:62)
at kamon.trace.Tracer$.withContext(TracerModule.scala:57)
at 
akka.kamon.instrumentation.ActorCellInstrumentation.aroundBehaviourInvoke(ActorCellInstrumentation.scala:61)
at akka.actor.ActorCell.invoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Cluster Sharding - watching for termination of sharded actors.

2015-11-12 Thread Marcin Sośnicki
Hi,

I have done some deeper digging into that and it seems that the issue is 
not the Terminated message not being received. The sharded actor keeps 
running on both nodes of the cluster! 
The shard region correctly resolves where to send message so it's hard to 
spot this as long as you're using only ShardRegion to communicate with it 
(in that case it will just sit idle on the node that it was supposed to be 
removed from). 
In other case, when you rely on things like DeathWatch for example, it 
won't work as  expected as the actor is still alive.

It looks that the issue lies with the Remember Entities feature that was 
added in 2.4. 
Depending on that setting, either Shard class (persistent-entities=off) or 
PersistentShard (persistent-entities=on)  is used. 

In a ShardRegion class there are 
var shards = Map.empty[ShardId, ActorRef]
var shardsByRef = Map.empty[ActorRef, ShardId]
properties. 
I assume that there are two of them, just to enable faster lookup, 
depending whether you're using ShardId or ActorRef as a key. But I also 
assume that they should be consistent (and they're not). 
What happens in the scenario when the bug manifests (using remember 
entities=on and therefore PersistentShard class) is that shards map 
property is updated only after Persistent Actor recovery. 
case ShardInitialized(shardId)   ⇒ initializeShard(shardId, 
sender())
This introduces a gap where our state is not consistent, shardsRef holds 
some entries that shards does not, and some decisions are made based on 
that inconsistent state. 
For example the sharded actors might not get stopped, when the recovery is 
not finished yet (but the actor is already started so it'll be up 
eventually).
case msg @ HandOff(shard) ⇒
  log.debug("HandOff shard [{}]", shard)
  if (shardBuffers.contains(shard)) {
shardBuffers -= shard
loggedFullBufferWarning = false
  }
  if (shards.contains(shard)) {
handingOff += shards(shard)
shards(shard) forward msg
  } else
sender() ! ShardStopped(shard)



Is there a reason why shards and shardsByRef are not updated atomically? 
For example, the getShard method only updates the shardsByRef property, 
whereas it seems natural to update the shards property in that place as 
well (as opposed to updating shards property only after the recovery is 
completed).

I hope that my post is not too confusing, it's easier to show the problem 
directly in the code rather than posting just some fragments in a post. 

Thanks a lot,
Marcin

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Cluster Sharding Proxy vs Cluster Client

2015-11-12 Thread Artyom Bolshakov
When should I use Cluster Sharding Proxy and when Cluster Client?

I have a backend with sharded actors and HTTP frontend which provide 
interface for them. 
The operations mostly fire-and-forget semantics, so HTTP frontend does not 
return any data to user, but just proxy notifications to sharded actors.
What is the best options to select for frontend to backend communication?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] akka sharding monitoring

2015-11-12 Thread Rodrigo Boavida
Hi all,

I'm currently starting to implement an Akka sharding based infrastructure 
and given the dynamic load balancing nature and recoverability, part of the 
monitoring process would be around monitoring the current shards and 
entities owned by each shard. 

Would be nice to know if there is anything similar to the cluster events 
that would help me monitor for sharding changing events like re-balancing 
of shards, and quantifiers of shards per node and entities per shard.


Many thanks in advance.

Tnks,
Rod

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] [akka-stream] demand doesn't follow arguments for for grouped or take

2015-11-12 Thread Endre Varga
On Thu, Nov 12, 2015 at 7:09 PM, Simon Schäfer  wrote:

>
>
> On 11/12/2015 06:57 PM, Endre Varga wrote:
>
> Hi Simon,
>
> This documentation section explains this in more detail:
> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M1/scala/stream-rate.html
>
> Ok, I didn't come to that point of the documentation so far. I'm going to
> have a look, thanks!
>
>
> Btw, Sink.head actually does not request more than one, however this is
> not transitive and cannot be. Grouped cannot possibly know how many
> elements a downstream ever needs, so it tries to prefetch and exploit
> concurrency.
>
> Why not? Doesn't it have to wait for a demand as all other components have
> to do too? I mean Sink.head has a demand of 1, shouldn't grouped get this
> demand and then forward a demand of 1*N, where N is the argument to grouped?
>

No, demand is not forwarded, that does not work in general, especially in
the face of graph enabled stages (how would merge work?). Demand is
strictly local to a connection. If grouped gets a demand of 1, then it
knows that it can only emit one, but that does not dictate how much it
tries to get from upstream. What if the sink is not a Sink.head but a
network transport that signals demand one-by-one once the bytes are written
to the network? In this case grouped would better off aggregating elements
in the background. Just because a stage demanded 1, it does not mean it
will not demand later more.

-Endre


>
> -Endre
>
> On Thu, Nov 12, 2015 at 6:39 PM, Simon Schäfer  wrote:
>
>> I just tried the following (on 2.0-M1):
>>
>> scala> Source(1 to 100).map{i ⇒ println(i);
>> i}.grouped(10).runWith(Sink.head)
>> 1
>> 2
>> 3
>> 4
>> 5
>> 6
>> 7
>> 8
>> 9
>> 10
>> 11
>> res10: scala.concurrent.Future[scala.collection.immutable.Seq[Int]] =
>> scala.concurrent.impl.Promise$DefaultPromise@2df79942
>> 12
>> 13
>> 14
>> 15
>> 16
>>
>> scala> 17
>> 18
>> 19
>> 20
>> 21
>> 22
>> 23
>> 24
>> res10.onComplete(println)
>> Success(Vector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
>>
>> A similar behavior can be seen for the take function:
>>
>> scala> Source(1 to 100).map{i ⇒ println(i);
>> i}.take(10).runWith(Sink.fold(Vector[Int]())(_ :+ _))
>> ...
>>
>>
>> As one can see, the stream processes more elements than it has to. It is
>> not a problem for me, but I would like to know if this behavior is spec'ed
>> anywhere? Shouldn't the stream be able to say, just by looking at the
>> arguments for grouped, take and the sink, how many elements should be
>> produced?
>> --
>> >> Read the docs: http://akka.io/docs/
>> >> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to a topic in the
> Google Groups "Akka User List" group.
> To unsubscribe from this topic, visit
> https://groups.google.com/d/topic/akka-user/m4V-AsScuak/unsubscribe.
> To unsubscribe from this group and all its topics, send an email to
> akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving 

Re: [akka-user] [akka-stream] demand doesn't follow arguments for for grouped or take

2015-11-12 Thread Endre Varga
Hi Simon,

This documentation section explains this in more detail:
http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M1/scala/stream-rate.html

Btw, Sink.head actually does not request more than one, however this is not
transitive and cannot be. Grouped cannot possibly know how many elements a
downstream ever needs, so it tries to prefetch and exploit concurrency.

-Endre

On Thu, Nov 12, 2015 at 6:39 PM, Simon Schäfer  wrote:

> I just tried the following (on 2.0-M1):
>
> scala> Source(1 to 100).map{i ⇒ println(i);
> i}.grouped(10).runWith(Sink.head)
> 1
> 2
> 3
> 4
> 5
> 6
> 7
> 8
> 9
> 10
> 11
> res10: scala.concurrent.Future[scala.collection.immutable.Seq[Int]] =
> scala.concurrent.impl.Promise$DefaultPromise@2df79942
> 12
> 13
> 14
> 15
> 16
>
> scala> 17
> 18
> 19
> 20
> 21
> 22
> 23
> 24
> res10.onComplete(println)
> Success(Vector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
>
> A similar behavior can be seen for the take function:
>
> scala> Source(1 to 100).map{i ⇒ println(i);
> i}.take(10).runWith(Sink.fold(Vector[Int]())(_ :+ _))
> ...
>
>
> As one can see, the stream processes more elements than it has to. It is
> not a problem for me, but I would like to know if this behavior is spec'ed
> anywhere? Shouldn't the stream be able to say, just by looking at the
> arguments for grouped, take and the sink, how many elements should be
> produced?
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] [akka-http] Java server with https

2015-11-12 Thread Ricardo Silva
Hello,

is it possible to have a server handling https connections while using the 
routing api?
Reading the docs, there is support for it in the low-level api but I can't 
find it exposed while using the "routes dsl"...

Thanks in advance!

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] [akka-stream] demand doesn't follow arguments for for grouped or take

2015-11-12 Thread Simon Schäfer



On 11/12/2015 06:57 PM, Endre Varga wrote:

Hi Simon,

This documentation section explains this in more detail: 
http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M1/scala/stream-rate.html
Ok, I didn't come to that point of the documentation so far. I'm going 
to have a look, thanks!


Btw, Sink.head actually does not request more than one, however this 
is not transitive and cannot be. Grouped cannot possibly know how many 
elements a downstream ever needs, so it tries to prefetch and exploit 
concurrency.
Why not? Doesn't it have to wait for a demand as all other components 
have to do too? I mean Sink.head has a demand of 1, shouldn't grouped 
get this demand and then forward a demand of 1*N, where N is the 
argument to grouped?


-Endre

On Thu, Nov 12, 2015 at 6:39 PM, Simon Schäfer > wrote:


I just tried the following (on 2.0-M1):

scala> Source(1 to 100).map{i ⇒ println(i);
i}.grouped(10).runWith(Sink.head)
1
2
3
4
5
6
7
8
9
10
11
res10:
scala.concurrent.Future[scala.collection.immutable.Seq[Int]] =
scala.concurrent.impl.Promise$DefaultPromise@2df79942
12
13
14
15
16

scala> 17
18
19
20
21
22
23
24
res10.onComplete(println)
Success(Vector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))

A similar behavior can be seen for the take function:

scala> Source(1 to 100).map{i ⇒ println(i);
i}.take(10).runWith(Sink.fold(Vector[Int]())(_ :+ _))
...


As one can see, the stream processes more elements than it has to.
It is not a problem for me, but I would like to know if this
behavior is spec'ed anywhere? Shouldn't the stream be able to say,
just by looking at the arguments for grouped, take and the sink,
how many elements should be produced?
-- 
>> Read the docs: http://akka.io/docs/

>> Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
>> Search the archives:
https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google
Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it,
send an email to akka-user+unsubscr...@googlegroups.com
.
To post to this group, send email to akka-user@googlegroups.com
.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


--
>> Read the docs: http://akka.io/docs/
>> Check the FAQ: 
http://doc.akka.io/docs/akka/current/additional/faq.html

>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the 
Google Groups "Akka User List" group.
To unsubscribe from this topic, visit 
https://groups.google.com/d/topic/akka-user/m4V-AsScuak/unsubscribe.
To unsubscribe from this group and all its topics, send an email to 
akka-user+unsubscr...@googlegroups.com 
.
To post to this group, send email to akka-user@googlegroups.com 
.

Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


--

 Read the docs: http://akka.io/docs/
 Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
 Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka User List" group.

To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] akka sharding monitoring

2015-11-12 Thread Rodrigo Boavida
Brice,

Thanks for the answer. I will definitely have a look into what the events 
could give me and how the strategy could be tuned to provide such events.

tnks,
Rod

On Thursday, November 12, 2015 at 4:24:27 PM UTC, Brice Figureau wrote:
>
> On Thu, 2015-11-12 at 07:41 -0800, Rodrigo Boavida wrote: 
>
> > 
> > I'm currently starting to implement an Akka sharding based 
> > infrastructure and given the dynamic load balancing nature and 
> > recoverability, part of the monitoring process would be around 
> > monitoring the current shards and entities owned by each shard. 
> > 
> > 
> > Would be nice to know if there is anything similar to the cluster 
> > events that would help me monitor for sharding changing events like 
> > re-balancing of shards, and quantifiers of shards per node and 
> > entities per shard. 
>
> To my knowledge there's nothing as useful as cluster events. Last time I 
> asked a question like this one, someone suggested implementing a custom 
> ShardAllocationStrategy. This is called every time the system needs to 
> know where to allocate a shard, so that would be a good place to send 
> the type of events you're looking for (you also receive the global 
> mapping of shards, which can be useful). Unfortunately I hadn't the time 
> to implement such things, it's still on my TODO list :( 
>
> HTH, 
>
> -- 
> Brice Figureau  
>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] [akka-stream] demand doesn't follow arguments for for grouped or take

2015-11-12 Thread Simon Schäfer
I just tried the following (on 2.0-M1):

scala> Source(1 to 100).map{i ⇒ println(i); 
i}.grouped(10).runWith(Sink.head)
1
2
3
4
5
6
7
8
9
10
11
res10: scala.concurrent.Future[scala.collection.immutable.Seq[Int]] = 
scala.concurrent.impl.Promise$DefaultPromise@2df79942
12
13
14
15
16

scala> 17
18
19
20
21
22
23
24
res10.onComplete(println)
Success(Vector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))

A similar behavior can be seen for the take function:

scala> Source(1 to 100).map{i ⇒ println(i); 
i}.take(10).runWith(Sink.fold(Vector[Int]())(_ :+ _))
...


As one can see, the stream processes more elements than it has to. It is 
not a problem for me, but I would like to know if this behavior is spec'ed 
anywhere? Shouldn't the stream be able to say, just by looking at the 
arguments for grouped, take and the sink, how many elements should be 
produced?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] akka sharding monitoring

2015-11-12 Thread Brice Figureau
On Thu, 2015-11-12 at 07:41 -0800, Rodrigo Boavida wrote:

> 
> I'm currently starting to implement an Akka sharding based
> infrastructure and given the dynamic load balancing nature and
> recoverability, part of the monitoring process would be around
> monitoring the current shards and entities owned by each shard. 
> 
> 
> Would be nice to know if there is anything similar to the cluster
> events that would help me monitor for sharding changing events like
> re-balancing of shards, and quantifiers of shards per node and
> entities per shard.

To my knowledge there's nothing as useful as cluster events. Last time I
asked a question like this one, someone suggested implementing a custom
ShardAllocationStrategy. This is called every time the system needs to
know where to allocate a shard, so that would be a good place to send
the type of events you're looking for (you also receive the global
mapping of shards, which can be useful). Unfortunately I hadn't the time
to implement such things, it's still on my TODO list :(

HTH,

-- 
Brice Figureau 

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] [akka-stream] demand doesn't follow arguments for for grouped or take

2015-11-12 Thread Simon Schäfer



On 11/12/2015 07:15 PM, Endre Varga wrote:



On Thu, Nov 12, 2015 at 7:09 PM, Simon Schäfer > wrote:




On 11/12/2015 06:57 PM, Endre Varga wrote:

Hi Simon,

This documentation section explains this in more detail:

http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M1/scala/stream-rate.html

Ok, I didn't come to that point of the documentation so far. I'm
going to have a look, thanks!


Btw, Sink.head actually does not request more than one, however
this is not transitive and cannot be. Grouped cannot possibly
know how many elements a downstream ever needs, so it tries to
prefetch and exploit concurrency.

Why not? Doesn't it have to wait for a demand as all other
components have to do too? I mean Sink.head has a demand of 1,
shouldn't grouped get this demand and then forward a demand of
1*N, where N is the argument to grouped?


No, demand is not forwarded, that does not work in general, especially 
in the face of graph enabled stages (how would merge work?). Demand is 
strictly local to a connection. If grouped gets a demand of 1, then it 
knows that it can only emit one, but that does not dictate how much it 
tries to get from upstream. What if the sink is not a Sink.head but a 
network transport that signals demand one-by-one once the bytes are 
written to the network? In this case grouped would better off 
aggregating elements in the background. Just because a stage demanded 
1, it does not mean it will not demand later more.

Ok, I understand now. Thanks for the explanation.


-Endre



-Endre

On Thu, Nov 12, 2015 at 6:39 PM, Simon Schäfer > wrote:

I just tried the following (on 2.0-M1):

scala> Source(1 to 100).map{i ⇒ println(i);
i}.grouped(10).runWith(Sink.head)
1
2
3
4
5
6
7
8
9
10
11
res10:
scala.concurrent.Future[scala.collection.immutable.Seq[Int]]
= scala.concurrent.impl.Promise$DefaultPromise@2df79942
12
13
14
15
16

scala> 17
18
19
20
21
22
23
24
res10.onComplete(println)
Success(Vector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))

A similar behavior can be seen for the take function:

scala> Source(1 to 100).map{i ⇒ println(i);
i}.take(10).runWith(Sink.fold(Vector[Int]())(_ :+ _))
...


As one can see, the stream processes more elements than it
has to. It is not a problem for me, but I would like to know
if this behavior is spec'ed anywhere? Shouldn't the stream be
able to say, just by looking at the arguments for grouped,
take and the sink, how many elements should be produced?
-- 
>> Read the docs: http://akka.io/docs/

>> Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
>> Search the archives:
https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the
Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from
it, send an email to akka-user+unsubscr...@googlegroups.com
.
To post to this group, send email to
akka-user@googlegroups.com .
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


-- 
>> Read the docs: http://akka.io/docs/

>> Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
>> Search the archives:
https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic
in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit
https://groups.google.com/d/topic/akka-user/m4V-AsScuak/unsubscribe.
To unsubscribe from this group and all its topics, send an email
to akka-user+unsubscr...@googlegroups.com
.
To post to this group, send email to akka-user@googlegroups.com
.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


-- 
>> Read the docs: http://akka.io/docs/

>> Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
>> Search the archives:
https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google
Groups "Akka User List" group.
To