Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread David Pinn
The JavaDoc for the Source.from(...) method says this: Elements are 
emitted periodically with the specified interval. The tick element will be 
delivered to downstream consumers that has requested any elements. If a 
consumer has not requested any elements at the point in time when the tick 
element is produced it will not receive that tick element later. It will 
receive new tick elements as soon as it has requested more elements.

There's nothing there about stream completion. I suppose it should complete 
normally. The name Cancellable threw me a bit. Most of the JavaDoc for the 
Graph variants associate cancellation with failure; for example: 'Cancels 
when' downstream cancels.

-- 
  Read the docs: http://akka.io/docs/
  Check the FAQ: 
 http://doc.akka.io/docs/akka/current/additional/faq.html
  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups Akka 
User List group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread Viktor Klang
There is no stream cancellation, there is only erroneous or normal
completion.
What does the java/scaladoc say?
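
To make the distinction concrete, here is a minimal sketch using the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams signals that Akka Streams builds on. It is a toy, synchronous model and not Akka code; SignalsDemo, Recorder and ManualSource are hypothetical names. The point it illustrates: downstream only ever sees onNext followed by either onComplete or onError, while cancel() lives on the Subscription and travels upstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Toy model of Reactive Streams signals; not Akka code. */
final class SignalsDemo {

    /** Hypothetical subscriber that records every data and terminal signal it receives. */
    static final class Recorder implements Flow.Subscriber<Integer> {
        final List<String> events = new ArrayList<>();

        @Override public void onSubscribe(Flow.Subscription s) {
            s.request(Long.MAX_VALUE); // unbounded demand keeps the demo simple
        }
        @Override public void onNext(Integer item) { events.add("onNext(" + item + ")"); }
        @Override public void onError(Throwable t) { events.add("onError(" + t.getMessage() + ")"); }
        @Override public void onComplete() { events.add("onComplete"); }
    }

    /** Hypothetical single-subscriber source that is driven by hand. */
    static final class ManualSource implements Flow.Publisher<Integer> {
        private Flow.Subscriber<? super Integer> subscriber;
        private boolean done;

        @Override public void subscribe(Flow.Subscriber<? super Integer> s) {
            subscriber = s;
            s.onSubscribe(new Flow.Subscription() {
                // The only subscriber asks for unbounded demand, so there is nothing to count here.
                @Override public void request(long n) { }
                // cancel() is a signal from downstream to upstream; it is never delivered downstream.
                @Override public void cancel() { done = true; }
            });
        }
        void emit(int x) { if (!done) subscriber.onNext(x); }
        void complete() { if (!done) { done = true; subscriber.onComplete(); } }
        void fail(Throwable t) { if (!done) { done = true; subscriber.onError(t); } }
    }

    /** Emits 1 and 2, then terminates either normally or with an error. */
    static List<String> run(boolean failAtEnd) {
        ManualSource source = new ManualSource();
        Recorder recorder = new Recorder();
        source.subscribe(recorder);
        source.emit(1);
        source.emit(2);
        if (failAtEnd) source.fail(new RuntimeException("boom")); else source.complete();
        source.emit(3); // ignored: nothing may follow a terminal signal
        return recorder.events;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

Running main prints two event lists: one that ends in onComplete and one that ends in onError(boom). Neither contains a "cancelled" event, because no such downstream signal exists.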

-- 
Cheers,
√
On 26 Jul 2015 09:17, David Pinn dp...@byandlarge.net wrote:

 No, but I might try to put one together. When the tick source is
 cancelled, should that cause cancellation of the stream, or completion of
 the stream?




Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread David Pinn
Yes, that's what I'm doing. More precisely, I'm defining a composite Source 
like so:

Initial ~> Merge ~> Akka HTTP ~> Broadcast ~> Consumer
             ^                       |
             |                       |
             +<~~~~ Throttler <~~~~~~+

The Throttler flow contains a tick source, the Cancellable materialized 
value of which is externalized as the materialized value of the composite 
Source. The Akka HTTP stage sends a GET request to a remote system.

I have a small 'Runner' program that exercises the composite flow. It calls 
cancel after 40 seconds of execution. As you can see from the logging 
output, the loop closes in a less-than-happy fashion.

[info] 07/26 21:35:28 INFO  appspokes.onedrive.experimental.Runner - 
Cancelling stream

[info] akka.stream.StreamTcpException: The connection closed with error 
Connection reset by peer

[info] 07/26 21:35:58 ERROR experimental.Runner - FAILURE!

[info] akka.stream.StreamTcpException: The connection closed with error 
Connection reset by peer

[info] 07/26 21:35:58 DEBUG akka.stream.impl.fusing.ActorInterpreter - fail 
due to: The connection closed with error Connection reset by peer

[info] 07/26 21:35:58 INFO  experimental.Runner - Shutting down

[info] 07/26 21:35:58 DEBUG akka.stream.impl.fusing.ActorInterpreter - fail 
due to: The connection closed with error Connection reset by peer

[info] 07/26 21:35:58 DEBUG akka.stream.impl.Broadcast - fail due to: The 
connection closed with error Connection reset by peer

[info] 07/26 21:35:58 INFO  experimental.Runner - Actor system has been 
shut down

[info] 07/26 21:35:58 ERROR akka.stream.Materializer - [Sync Response] 
Upstream failed.

[info] akka.stream.StreamTcpException: The connection closed with error 
Connection reset by peer

[success] Total time: 74 s, completed 26/07/2015 9:35:59 PM


Now, I will confess that, much earlier in the program's execution, I'm 
seeing this:

[info] 07/26 21:35:03 DEBUG akka.stream.impl.PrefixAndTailImpl - Cancelling 
akka.stream.impl.MultiStreamOutputProcessor$SubstreamOutput@3216206b 
(after: 5000 ms)

...which I'm assuming is unrelated because it happens much earlier, and 
none of the error output occurs until the cancellation call is made.

I'd be happy to post the code here, but it's rather verbose. The essential 
part is probably this (yeah, Java. sorry):

builder.from(url).via(merge).via(polling(accessToken)).to(bcast.in());
builder.from(bcast.out(1)).via(throttler).via(nextLinks).to(merge); // loop-back
return bcast.out(0); // outlet to the world


I'll work on a minimized failing test case that does not involve HTTP 
requests, to see if the stream still completes with failure.  
 



Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread Rafał Krzewski
David, does your flow contain an actual internal feedback loop? What I 
understand from your description is that you have something like this:

{ tick source ~> mapAsync calling external service ~> } ~> consumer

And you return the Cancellable provided by the tick source as your flow's 
materialized value. I think in this case, calling cancel() on that returned 
value should complete the whole flow without problems. If this is not the 
case, it would be best if you showed some code :)

By the way, is there an issue for producing custom materialized values from 
user-defined graphs? I browsed through the 'streams-*' milestones and I was 
unable to find it. Should I file one?

cheers,
Rafał

 
On Sunday, 26 July 2015 at 10:24:06 UTC+2, David Pinn wrote:

 The JavaDoc for the Source.from(...) method says this: Elements are 
 emitted periodically with the specified interval. The tick element will be 
 delivered to downstream consumers that has requested any elements. If a 
 consumer has not requested any elements at the point in time when the tick 
 element is produced it will not receive that tick element later. It will 
 receive new tick elements as soon as it has requested more elements.

 There's nothing there about stream completion. I suppose it should 
 complete normally. The name Cancellable threw me a bit. Most of the JavaDoc 
 for the Graph variants associate cancellation with failure; for example: 
 'Cancels when' downstream cancels.





Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread David Pinn
No, but I might try to put one together. When the tick source is cancelled, 
should that cause cancellation of the stream, or completion of the stream?



Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread David Pinn
I've created a test that exercises a composite source that has an internal 
cycle. It processes integers, starting at 1 and doubling it until it is 
cancelled.
The code can be viewed as a gist 
https://gist.github.com/dpinn/cdac6709e00b7de64163.

Two things to note:
a) the stream completes successfully. To that extent, I've failed to 
reproduce the strangeness that I mentioned earlier; and
b) the stream processes one more element than I expect it to.

Doubling 1 Cancelling tick source

= 2

Doubling 2 = 4

[error] Test cancellation.CancellationTest.testCancellation failed: 
assertion failed: expected OnComplete, found OnNext(4), took 4.11 sec

[error] Failed: Total 1, Failed 1, Errors 0, Passed 0

[error] Failed tests:

[error] cancellation.CancellationTest

[error] (onedrive/test:test) sbt.TestsFailedException: Tests unsuccessful

[error] Total time: 6 s, completed 26/07/2015 11:30:15 PM

The source under test spits out the integer 4, even though the ticks were 
cancelled while the integer 1 was being doubled.



Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread Rafał Krzewski
Since we have √'s attention, allow me to repeat my original question:
 

 The question now remains, how do I fabricate a Graph[FlowShape[T, T], 
 Cancellable] that will generate an instance of Cancellable on each 
 materialization connected to a PushPullStage, in such a way that cancel() 
 would blow the fuse and terminate the stream?


 This is currently not possible, as we have not yet exposed the necessary 
 API to users -- but the functionality is there internally, we just want 
 to gather a bit of experience before opening it up. 

 -Endre
  


What about this use case? Is there a ticket tracking this already?

Cheers,
Rafał
 



[akka-user] Re: Best practice for nested flows and graphs

2015-07-26 Thread Raam Rosh-Hai
I would really appreciate any tips pointing me in the right direction.

On Thursday, July 23, 2015 at 3:02:54 PM UTC+3, ra...@qmerce.com wrote:

 I am implementing a simple http server which listens to requests and 
 persists them to Cassandra.

  I have to split the stream in two using Broadcast so I am using Graphs. 

 I have something like this:

 val bindingFuture: Future[Http.ServerBinding] =
   source.serverSource.to(Sink.foreach { connection => // foreach materializes the source
     system.log.debug("Accepted new connection from " + connection.remoteAddress)
     // ... and then actually handle the connection
     connection.handleWith(source.handleRequests)(server_materializer)
   }).run()(server_materializer)



 And also 

 val handleRequests =
   Flow[HttpRequest].mapAsyncUnordered(core.num_of_cores)(v => Future(requestHandler(v)))

 val requestHandler: HttpRequest => HttpResponse = {
   case HttpRequest(POST, Uri.Path("/stats"), headers, entity, protocol) =>

     val s = entity.withContentType(ContentTypes.`application/json`).getDataBytes()
     val g = FlowGraph.closed() { implicit builder =>

       import FlowGraph.Implicits._
       //Bla bla bla...

       s ~> gflow ~> bcast ~> breakEvent ~> persistToC ~> out
       bcast ~> out

     }
     g.run()

     HttpResponse(200, headers = corsHeaders, entity = "Thanks!")

   case _: HttpRequest =>
     HttpResponse(404, entity = "Unknown resource!")
 }


 I am calling `.run` twice, and from what I understand a new stream will be 
 initiated for each request. 

 My plan is to set up an actor that will publish the entities downstream; 
 however, I am not sure this is the correct approach.

 Thanks in advance, 

 Raam 
  




[akka-user] How do I mix typed and untyped actors?

2015-07-26 Thread Jack Daniels


How do I mix typed and untyped actors? As I understand it, I have to specify 
the main actor when I create an instance of ActorSystem, like this:


val system: akka.typed.ActorSystem[Start] =
  akka.typed.ActorSystem("main", Props(mainBehaviour))


On the other hand I use akka-http which is initialized like this


implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()
// etc...


I see that I can create typed system from untyped system by calling


object ActorSystem {
  def apply(untyped: akka.actor.ActorSystem): ActorSystem[Nothing] =
    new Wrapper(untyped.asInstanceOf[ExtendedActorSystem])
}


So assuming I did


val typeSystem = akka.typed.ActorSystem(untypedSystem)


how do I create my first typed actor from typeSystem? There is no typed 
ActorContext whose actorOf I can call.


What general recommendations would you give for using Akka Typed with 
Akka HTTP?


Thanks!


http://stackoverflow.com/questions/31621607/how-do-i-mix-typed-and-untyped-actors



[akka-user] [akka-streams] Consuming on TCP server side - is this really the best way?

2015-07-26 Thread Derek Wyatt
Hi,

I'm still trying to figure out the best way to work with TCP flows and, 
while I've got something working, this seems really quite wrong, so there's 
gotta be a better way.

What I want to do is send an Iterable[Int] from the client to the server 
and have the server materialize that resulting flow in a 
Future[Iterable[Int]]. 


val bytesStage = // elided... BidiFlow of serialization and framing

val serverValuePromise = Promise[Seq[AnyRef]]()

// Technically, the materialized value isn't important, since it's actually
// going to be pulled out via the Promise
val serverConsumerFlow: Flow[AnyRef, AnyRef, Future[Seq[AnyRef]]] = Flow.wrap(
  // Consume the client's stream and complete the serverValuePromise with
  // its folded result
  Sink.fold(Vector.empty[AnyRef])((acc, v: AnyRef) => acc :+ v)
    .mapMaterializedValue(v => { serverValuePromise.completeWith(v); v }),
  // We're not sending anything from this side
  Source.empty)(Keep.left)

// The server
val serverSide: Future[ServerBinding] = StreamTcp().bindAndHandle(
  serverConsumerFlow.join(bytesStage), "0.0.0.0", 0, halfClose = true)

// We really want to stop listening once the client has successfully
// connected, but this is good enough
serverValuePromise.future.onComplete {
  case _ =>
    serverSide.onSuccess {
      case binding => binding.unbind()
    }
}

// I need the endpoint where the client needs to connect
val destination = Await.result(serverSide, 1.second).localAddress

// Get the source running
Source((1 to 10).map(new Integer(_)))
  .via(bytesStage.joinMat(StreamTcp().outgoingConnection(destination))(Keep.right))
  .to(Sink.ignore)
  .run()

// Print out what the client has sent to the server
Await.result(serverValuePromise.future, 1.second).foreach(t => println(s"tt: $t"))

I tried doing this the other way around, where the server side supplies the 
source, but this caused me issues with actually shutting down the socket. 
Having the client do it makes shutting down the socket on completion of the 
source occur naturally.  The problem with the server side providing the 
source was that the client source needed to finish properly.  If I created 
it as `empty` then it would kill things too quickly.  If I then created it 
as an Actor source that just didn't do anything, I couldn't find a decent 
way to close it.

There's gotta be a better way to do this, but I'm too much of a noob to see 
it.  Can anyone improve this code for me?
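
For comparison, the half-close behaviour described above can be sketched with plain java.net sockets. This is not StreamTcp and not a replacement for the Akka code; HalfCloseDemo and its methods are hypothetical names, and the wire format (raw 4-byte ints) is an assumption made for the example. The client sends its values and half-closes with shutdownOutput(); the server completes a future with everything it read once it sees end-of-stream, then closes its own side.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Plain-socket illustration of TCP half-close; not Akka Streams code. */
final class HalfCloseDemo {

    /** Accepts one connection and completes the future with every int read before end-of-stream. */
    static CompletableFuture<List<Integer>> serveOnce(ServerSocket server) {
        CompletableFuture<List<Integer>> received = new CompletableFuture<>();
        Thread acceptor = new Thread(() -> {
            try (Socket socket = server.accept();
                 DataInputStream in = new DataInputStream(socket.getInputStream())) {
                List<Integer> values = new ArrayList<>();
                try {
                    while (true) values.add(in.readInt());
                } catch (EOFException endOfClientStream) {
                    // The client half-closed: its side of the conversation has completed.
                }
                received.complete(values);
            } catch (IOException e) {
                received.completeExceptionally(e);
            }
        });
        acceptor.setDaemon(true);
        acceptor.start();
        return received;
    }

    /** Sends 1..count, half-closes, then waits until the server closes its side. */
    static void sendInts(InetAddress host, int port, int count) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            DataOutputStream out = new DataOutputStream(socket.getOutputStream());
            for (int i = 1; i <= count; i++) out.writeInt(i);
            out.flush();
            socket.shutdownOutput(); // half-close: nothing more to send, still able to read
            while (socket.getInputStream().read() != -1) {
                // The server sends nothing in this sketch; read() returns -1 once it closes.
            }
        }
    }

    /** Starts a server on an ephemeral loopback port, runs one client, returns what the server saw. */
    static List<Integer> roundTrip(int count) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            CompletableFuture<List<Integer>> received = serveOnce(server);
            sendInts(server.getInetAddress(), server.getLocalPort(), count);
            return received.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(10));
    }
}
```

The design point is the same one the message makes: when the sending side is the one that finishes, end-of-stream on the wire doubles as the completion signal, so no extra mechanism is needed to close the connection.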

Thanks,
Derek





[akka-user] [Remoting] Akka Actor Refs Can be Called from another Node?

2015-07-26 Thread kraythe
Greetings, 

I am working on some tech to integrate Hazelcast and Akka. The thing is, 
with Hazelcast a particular object in the cache may be on any one of the 
nodes in the cluster, and serializing the object from its host node to the 
node in the cluster where it is being processed can be expensive. So I was 
debating an alternative paradigm. Basically I was considering storing the 
actor refs in a Hazelcast map keyed by the member id. The idea is that I 
ask the map "Give me the actor ref for node 3" and it returns an ActorRef 
object, and then the router uses that to forward the message to the correct 
router on the correct node via a normal call. For example: 

def forward(msg: Object, key: Object) = {
  val uuid =
    hazelcast.getPartitionService().getPartition(key).getMember().getUuid()
  hazelcast.getMap("RoutingActors").get(uuid) ! msg
}

Can anyone see any issue with such a paradigm? Do I have to tell hazelcast 
that the actor ref is on another node? Do I need to open any ports or do 
any other configuration to get such a thing to work? 
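
The lookup-then-forward idea can be sketched without either library. The toy below uses neither Hazelcast nor Akka: PartitionRouterDemo, ownerOf and the hash-modulo partitioning rule are hypothetical stand-ins, and a Consumer<Object> plays the role of the actor ref stored per member. It only shows the shape of the routing step, including the case where no entry exists for the owning member.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy stand-in for the routing idea; uses neither Hazelcast nor Akka. */
final class PartitionRouterDemo {
    private final List<String> memberIds; // stand-in for the cluster members
    private final Map<String, Consumer<Object>> routingActors = new HashMap<>(); // stand-in for the map of actor refs

    PartitionRouterDemo(List<String> memberIds) {
        this.memberIds = new ArrayList<>(memberIds);
    }

    void register(String memberId, Consumer<Object> target) {
        routingActors.put(memberId, target);
    }

    /** Hypothetical partitioning rule: hash of the key modulo the member count. */
    String ownerOf(Object key) {
        return memberIds.get(Math.floorMod(key.hashCode(), memberIds.size()));
    }

    /** Looks up the member that owns the key and hands the message to that member's target. */
    void forward(Object msg, Object key) {
        String owner = ownerOf(key);
        Consumer<Object> target = routingActors.get(owner);
        if (target == null) {
            throw new IllegalStateException("no routing target registered for " + owner);
        }
        target.accept(msg);
    }

    /** Routes two messages across two members and returns what each member received. */
    static Map<String, List<Object>> demo() {
        List<String> members = List.of("member-a", "member-b");
        PartitionRouterDemo router = new PartitionRouterDemo(members);
        Map<String, List<Object>> delivered = new HashMap<>();
        for (String id : members) {
            List<Object> inbox = new ArrayList<>();
            delivered.put(id, inbox);
            router.register(id, inbox::add);
        }
        router.forward("hello", 3); // Integer 3 hashes to 3; 3 mod 2 = 1, so member-b owns it
        router.forward("world", 4); // 4 mod 2 = 0, so member-a owns it
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

One thing the sketch makes visible is the missing-entry case: a member that has joined the cluster but not yet registered its target needs explicit handling, which the one-line forward in the message does not have.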

Thanks.



[akka-user] Re: Play + Akka Cluster Aware Routing?

2015-07-26 Thread kraythe
So is the strategy I laid out flawed? I don't want to be accessing objects on 
the wrong node; that's a critical element of the dynamic. Getting Akka and Play 
to cooperate in a cluster is not that hard. I want to make intelligent routing 
decisions.



Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread David Pinn
I think this could happen if a stage immediately requests the next element 
once it starts working on a received element. Is that the case?

On Sunday, 26 July 2015 23:39:00 UTC+10, David Pinn wrote:

 b) the stream processes one more element than I expect it to.

 The source under test spits out the integer 4, even though the ticks were 
 cancelled while the integer 1 was being doubled.




Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread Viktor Klang
Requesting one element at a time would lead to low-to-no concurrency and
high overhead per element.
Check input buffer size in Attributes.

-- 
Cheers,
√
On 26 Jul 2015 17:50, David Pinn dp...@byandlarge.net wrote:

 I think this could happen if a stage immediately requests the next element
 once it starts working on a received element. Is that the case?

 On Sunday, 26 July 2015 23:39:00 UTC+10, David Pinn wrote:

 b) the stream processes one more element than I expect it to.

 The source under test spits out the integer 4, even though the ticks were
 cancelled while the integer 1 was being doubled.


