[akka-user] Stream API Fundamentals: Activator Template on Akka Http with Websockets

2016-07-25 Thread Dagny T

Newbie with Akka and Streams here!

I'm set on understanding the fundamentals of the latest Akka Streams API 
through standing up a prototype.  ;0)

I've been working through the Typesafe Activator template on Akka Http with 
Websockets to do this, and posted questions in its Disqus comments section, 
but apparently no one is monitoring that.

SO, I'm just going to re-post those here to see if anyone with relevant 
experience is able and willing to answer!  Would be grateful to be referred 
to a coherent Blog/GitHub repo which would answer most of these questions 
in any case!  

THANKS so much in advance!

1) TweetActor publishes to EventStream; and TweetPublisher receives this 
and forwards to WebSocket. What's the logic for separating the TweetActor 
and TweetPublisher? i.e. TweetActor is essentially acting as a Publisher, 
and Source of 'flow' for Tweets; HOWEVER, I'm confused about the 
distinction between, and when to use:
- context.system.eventStream.publish(tweet) within TweetActor 
VS
- Flow instantiated with TweetPublisher Source
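To make the two options concrete, here is a rough sketch of both (names and wiring are illustrative, not the template's actual code):

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

case class Tweet(text: String)

object EventStreamVsSource extends App {
  implicit val system = ActorSystem("demo")
  implicit val mat = ActorMaterializer()

  // Option B from the question: a Source backed by an actor. Messages sent
  // to `ref` flow into the materialized stream; the EventStream is not
  // involved at all.
  val (ref, done) = Source.actorRef[Tweet](bufferSize = 100, OverflowStrategy.dropHead)
    .via(Flow[Tweet].map(_.text))
    .toMat(Sink.foreach(println))(Keep.both)
    .run()

  // Option A: anything can publish to the EventStream; only actors that
  // subscribed to classOf[Tweet] see it. Bridging into a stream is done by
  // an actor (the template's TweetPublisher) that subscribes and forwards.
  system.eventStream.subscribe(ref, classOf[Tweet])
  system.eventStream.publish(Tweet("hello"))
}
```

So the separation in the template is decoupling: the TweetActor only knows about the EventStream, while the TweetPublisher is the bridge that turns those events into a back-pressured Source for the WebSocket Flow.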


2) What if I wanted to integrate with an (external) REAL Tweets Stream API 
from Twitter, but I don't know whether their API implements the Reactive 
Streams protocol?

How would I use AkkaHttp Streams as a Client to GET tweets from that 
external Source; i.e. What are the latest recommended APIs I should be 
using to handle that?
- I'm seeing the Akka Template for AkkaSampleTwitterStream using this:
response.entity.dataBytes.runForeach
VS
- following THIS Akka Template, wouldn't one construct a Flow that MIRRORs 
this OUTWARD flow, but as an INWARD flow? i.e. Source.ignore, and 
class TweetSubscriber extends ActorSubscriber[Tweet]?
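As a side note on question 2: an HTTP client does not need the server to implement Reactive Streams; TCP itself provides the back-pressure, surfaced through the response entity's `dataBytes` source. A minimal client sketch (hypothetical URL; a newline-delimited body is assumed):

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Framing
import akka.util.ByteString

object StreamingClientSketch extends App {
  implicit val system = ActorSystem("client")
  implicit val mat = ActorMaterializer()
  import system.dispatcher

  Http().singleRequest(HttpRequest(uri = "https://example.com/tweets/stream"))
    .foreach { response =>
      // dataBytes is a Source[ByteString, Any]; demand propagates down to
      // the TCP connection, whether or not the remote service itself
      // speaks Reactive Streams.
      response.entity.dataBytes
        .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 65536))
        .map(_.utf8String)
        .runForeach(line => println(line))
    }
}
```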


3) What is the default serialization format for Akka Streams, and is it set 
to a reasonably efficient one? If not, where can I override it, and what is 
the best practice for choosing one?


4) How do local Stream buffers work, roughly?
- Where and how are the default and custom configuration properties for 
buffer size (in number of typed objects) set up, so as not to blow up 
local memory?
- Is each Stream dedicated to only one type of message, albeit permitting 
bi-directional flows?
- Where and how are the buffer-eviction policies defaulted/configured in a 
property file? i.e. drop newest, or drop oldest?
- What is the distinction between using a configured Flow that is 
materialized later, VS the context.system.eventStream (which comes with the 
ActorSystem by default once the Akka libraries are imported)?
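As an illustration for question 4: as far as I can tell, buffer sizes and overflow policies in Akka Streams are set per stage in code rather than in a property file (the materializer's internal buffers can be tuned via `akka.stream.materializer.max-input-buffer-size` in application.conf). A minimal sketch of an explicit, bounded buffer:

```scala
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source

// An explicit bounded buffer with a chosen eviction policy; alternatives
// include dropTail, dropBuffer, dropNew, backpressure and fail.
val bounded = Source(1 to 1000000)
  .buffer(100, OverflowStrategy.dropHead)
```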


5) Handling versioning of incoming messages from external sources:

- Is there a good example of error handling for serialization errors caused 
by version differences between an incoming external message and the message 
structure assumed internally?


6) Configuration of routes in externalized configuration files:

- Is there a coherent example which shows how to set up configuration 
properties files mapping URLs for REST and WebSocket endpoints?
i.e. the nested code blocks within main.scala in this example could get 
hard to maintain in a larger application!


-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] [Cluster] Association failed even though cluster.leave(cluster.selfAddress) was called?

2016-07-25 Thread kraythe
I get the idea. But does that mean I should create a top-level actor, like 
the /user guardian but under /user, for all my actors, and wait for the 
member to leave? This is occurring inside a Play Framework shutdown, so I 
would have to figure out how to orchestrate that.

On Monday, July 25, 2016 at 2:01:52 PM UTC-5, Konrad Malawski wrote:
>
>
> Well the node leaving does nothing but shut down. The other nodes go on 
> with life of course.
>
> That's exactly the problem though: You exit the JVM before anyone has the 
> chance to even receive the Leave message.
>
> You should await the gossip message, basically:
>
> Cluster(system).subscribe(self, classOf[MemberRemoved])
>
> ... case MemberRemoved(member, _) if member.address == cluster.selfAddress => System.exit(0)
>
>
> Replied in a bit of a rush, hope you get the idea :-)
>
> You can also check out my Zen of Akka talk for a bit more wordy 
> explanation of this.
>
> -- 
> Konrad `ktoso` Malawski
> Akka @ Lightbend
>



Re: [akka-user] [Cluster] Association failed even though cluster.leave(cluster.selfAddress) was called?

2016-07-25 Thread Konrad Malawski
Well the node leaving does nothing but shut down. The other nodes go on
with life of course.

That's exactly the problem though: You exit the JVM before anyone has the
chance to even receive the Leave message.

You should await the gossip message, basically:

Cluster(system).subscribe(self, classOf[MemberRemoved])

... case MemberRemoved(member, _) if member.address == cluster.selfAddress => System.exit(0)


Replied in a bit of a rush, hope you get the idea :-)

You can also check out my Zen of Akka talk for a bit more wordy explanation
of this.
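Fleshed out a little, the rushed snippet above might look like this (a sketch against the Akka 2.4 cluster API, not the exact code from the thread):

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.MemberRemoved

// Waits for the cluster to gossip our own removal before shutting down,
// so other nodes learn about the Leave instead of seeing a dead peer.
class ShutdownWatcher extends Actor {
  val cluster = Cluster(context.system)

  override def preStart(): Unit =
    cluster.subscribe(self, classOf[MemberRemoved])

  def receive = {
    case MemberRemoved(member, _) if member.address == cluster.selfAddress =>
      // Only now is it safe to exit the JVM.
      context.system.terminate()
  }
}

object LeaveSketch {
  def leaveGracefully(system: ActorSystem): Unit = {
    system.actorOf(Props[ShutdownWatcher])
    val cluster = Cluster(system)
    cluster.leave(cluster.selfAddress)
  }
}
```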

-- 
Konrad `ktoso` Malawski
Akka @ Lightbend



[akka-user] Question about how to use PeekMailBox

2016-07-25 Thread Yan Pei
Hello All,

  I am trying to use PeekMailbox to retry after message processing fails.
  In the Actor, I am using a Future to deliver the message for processing 
and piping the result back to the actor itself, then doing some matching 
(Failure or something else) after that.
  Where am I supposed to put PeekMailboxExtension.lookup().ack(context)? I 
couldn't work out whether it belongs inside the future, before or after the 
future, or in the matching section.
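For what it's worth: if I'm reading the akka-contrib implementation right, PeekMailbox keeps redelivering the head message until ack() is called, so the ack cannot be deferred past a pipeTo; the piped-back result would be queued behind the still-unacked message. The pattern from the contrib docs is synchronous, roughly:

```scala
import akka.actor.Actor
import akka.contrib.mailbox.PeekMailboxExtension

// Sketch of the synchronous PeekMailbox pattern: the message stays at the
// head of the mailbox until ack() runs, so processing must complete (or
// throw, triggering redelivery) inside receive itself.
class RetryingWorker extends Actor {
  def receive = {
    case msg =>
      process(msg)               // may throw; the message is then redelivered
      PeekMailboxExtension.ack() // acknowledge only after success
  }

  def process(msg: Any): Unit = println(msg)
}
```

For genuinely asynchronous work, a different retry mechanism (e.g. an explicit resend on Status.Failure) may fit better than this mailbox.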

Thanks for helping!
Yan



Re: [akka-user] [Cluster] Association failed even though cluster.leave(cluster.selfAddress) was called?

2016-07-25 Thread kraythe
Well the node leaving does nothing but shut down. The other nodes go on 
with life of course. 

On Monday, July 25, 2016 at 1:34:37 PM UTC-5, Konrad Malawski wrote:
>
> What do you do after you call leave?
>
> -- 
> Konrad `ktoso` Malawski
> Akka @ Lightbend
>
> On 25 July 2016 at 20:27:29, kraythe (kra...@gmail.com ) 
> wrote:
>
> In our code prior to shutting down a node we issue the following call. 
>
> val cluster: Cluster = Cluster.get(system)
> cluster.leave(cluster.selfAddress)
>
>
> However, despite that the other node still complains about unreachable 
> node. Any way I can stop this? 
>
> 2016-07-25 13:23:33 -0500 - [WARN] - [ReliableDeliverySupervisor] akka.tcp
> ://
> mysystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fruckus%40127.0.0.1%3A2552-1
>  
> -  Association with remote system [akka.tcp://ruckus@127.0.0.1:2552] has 
> failed, address is now gated for [5000] ms. Reason: [Association failed 
> with [akka.tcp://mysystem@127.0.0.1:2552]] Caused by: [Connection 
> refused: /127.0.0.1:2552]
>
> Thanks in advance!
>
> -- Robert


Re: [akka-user] [Cluster] Association failed even though cluster.leave(cluster.selfAddress) was called?

2016-07-25 Thread Konrad Malawski
What do you do after you call leave?

-- 
Konrad `ktoso` Malawski
Akka @ Lightbend

On 25 July 2016 at 20:27:29, kraythe (kray...@gmail.com) wrote:

In our code prior to shutting down a node we issue the following call.

val cluster: Cluster = Cluster.get(system)
cluster.leave(cluster.selfAddress)


However, despite that the other node still complains about unreachable
node. Any way I can stop this?

2016-07-25 13:23:33 -0500 - [WARN] - [ReliableDeliverySupervisor] akka.tcp:
//
mysystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fruckus%40127.0.0.1%3A2552-1
-  Association with remote system [akka.tcp://ruckus@127.0.0.1:2552] has
failed, address is now gated for [5000] ms. Reason: [Association failed
with [akka.tcp://mysystem@127.0.0.1:2552]] Caused by: [Connection refused: /
127.0.0.1:2552]

Thanks in advance!

-- Robert


[akka-user] Issue with Akka Camel Producer: component is receiving BoxedUnit

2016-07-25 Thread Héctor Veiga
Hello,

I am trying to use a simple Akka Camel Producer that sends some gzipped 
data to the Camel File component; however, I am getting an exception 
because internally it is also trying to produce a BoxedUnit to the 
component, and the component does not know how to handle an object of type 
BoxedUnit. I have some code that replicates the issue. Maybe the issue is 
with my code, or maybe there is a bug. Any help would be appreciated. Here 
is the code:

import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream

import akka.actor.{ActorSystem, Props}
import akka.camel.{CamelMessage, Producer}
import org.scalatest.{Matchers, WordSpecLike}

/**
  * Created by hecortiz on 7/25/16.
  */
class ProducerTest extends WordSpecLike with Matchers {

  "Producer" should {
"produce" in {
  val system = ActorSystem("Test")
  val producer = system.actorOf(CamelMessageProducer.props("file:./output"), "producer")

  val data = "This is some test data"

  val obj = new ByteArrayOutputStream()
  val gzip = new GZIPOutputStream(obj)
  gzip.write(data.getBytes("UTF-8"))
  gzip.close()

  val gzipBytes = obj.toByteArray
  obj.close()

  val camelMessage = CamelMessage(gzipBytes, Map())

  producer ! camelMessage

  Thread.sleep(1000)
}
  }

}

object CamelMessageProducer {
  def props(uri: String) = Props(new CamelMessageProducer(uri))
}

class CamelMessageProducer(uri: String) extends Producer {
  def endpointUri = uri

  override def transformOutgoingMessage(msg: Any) = {
println(s"${msg.getClass}")
  }
}

And here is the exception I am getting:

Testing started at 1:24 PM ...
class akka.camel.CamelMessage
13:24:34.933 [Test-akka.actor.default-dispatcher-3] ERROR 
akka.actor.OneForOneStrategy - Cannot store file: 
./output/ID-6-1469471074666-0-1
akka.camel.AkkaCamelException: Cannot store file: 
./output/ID-6-1469471074666-0-1
at 
akka.camel.ProducerSupport$$anonfun$produce$1.applyOrElse(Producer.scala:73) 
~[akka-camel_2.11-2.4.7.jar:?]
at akka.actor.Actor$class.aroundReceive(Actor.scala:484) 
~[akka-actor_2.11-2.4.7.jar:?]
at some.package.CamelMessageProducer.aroundReceive(ProducerTest.scala:44) 
~[test-classes/:?]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) 
[akka-actor_2.11-2.4.7.jar:?]
at akka.actor.ActorCell.invoke(ActorCell.scala:495) 
[akka-actor_2.11-2.4.7.jar:?]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) 
[akka-actor_2.11-2.4.7.jar:?]
at akka.dispatch.Mailbox.run(Mailbox.scala:224) 
[akka-actor_2.11-2.4.7.jar:?]
at akka.dispatch.Mailbox.exec(Mailbox.scala:234) 
[akka-actor_2.11-2.4.7.jar:?]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[scala-library-2.11.7.jar:?]
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 
[scala-library-2.11.7.jar:?]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[scala-library-2.11.7.jar:?]
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 
[scala-library-2.11.7.jar:?]
Caused by: 
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot 
store file: ./output/ID-6-1469471074666-0-1
at 
org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:346)
 
~[camel-core-2.17.1.jar:2.17.1]
at 
org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:277)
 
~[camel-core-2.17.1.jar:2.17.1]
at 
org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:165)
 
~[camel-core-2.17.1.jar:2.17.1]
at 
org.apache.camel.component.file.GenericFileProducer.process(GenericFileProducer.java:79)
 
~[camel-core-2.17.1.jar:2.17.1]
at 
org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
 
~[camel-core-2.17.1.jar:2.17.1]
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145) 
~[camel-core-2.17.1.jar:2.17.1]
at akka.camel.ProducerSupport$ProducerChild.produce(Producer.scala:137) 
~[akka-camel_2.11-2.4.7.jar:?]
at 
akka.camel.ProducerSupport$ProducerChild$$anonfun$receive$1.applyOrElse(Producer.scala:111)
 
~[akka-camel_2.11-2.4.7.jar:?]
at akka.actor.Actor$class.aroundReceive(Actor.scala:484) 
~[akka-actor_2.11-2.4.7.jar:?]
at 
akka.camel.ProducerSupport$ProducerChild.aroundReceive(Producer.scala:108) 
~[akka-camel_2.11-2.4.7.jar:?]
... 9 more
Caused by: org.apache.camel.InvalidPayloadException: No body available of 
type: java.io.InputStream but has value: () of type: 
scala.runtime.BoxedUnit on: Message[ID-6-1469471074666-0-1]. Caused by: 
No type converter available to convert from type: scala.runtime.BoxedUnit 
to the required type: java.io.InputStream with value (). Exchange[]. Caused 
by: [org.apache.camel.NoTypeConversionAvailableException - No type 
converter available to convert from type: scala.runtime.BoxedUnit to the 
required type: java.io.InputStream with value ()]
at 
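
A likely culprit: `transformOutgoingMessage` above returns the result of `println`, which is `Unit` (`scala.runtime.BoxedUnit` on the JVM), and that return value becomes the message body handed to the File endpoint. Returning the message itself should fix it; a sketch:

```scala
import akka.camel.Producer

// Same producer as above, but transformOutgoingMessage now returns the
// message, so the original CamelMessage body reaches the File endpoint
// instead of the Unit value produced by a bare println.
class CamelMessageProducer(uri: String) extends Producer {
  def endpointUri = uri

  override def transformOutgoingMessage(msg: Any): Any = {
    println(s"${msg.getClass}")
    msg // without this, the last expression (println) makes the body Unit
  }
}
```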

[akka-user] [Cluster] Association failed even though cluster.leave(cluster.selfAddress) was called?

2016-07-25 Thread kraythe
In our code prior to shutting down a node we issue the following call.

val cluster: Cluster = Cluster.get(system)
cluster.leave(cluster.selfAddress)


However, despite that the other node still complains about unreachable 
node. Any way I can stop this? 

2016-07-25 13:23:33 -0500 - [WARN] - [ReliableDeliverySupervisor] 
akka.tcp://mysystem@127.0.0.1:2551/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fruckus%40127.0.0.1%3A2552-1
 
-  Association with remote system [akka.tcp://ruckus@127.0.0.1:2552] has 
failed, address is now gated for [5000] ms. Reason: [Association failed 
with [akka.tcp://mysystem@127.0.0.1:2552]] Caused by: [Connection refused: 
/127.0.0.1:2552]

Thanks in advance!

-- Robert



Re: [akka-user] Re: Akka Cluster + Leveldb : Persistence failure when replaying events for ...

2016-07-25 Thread Manoj Santhakumaran
Hi Justin,

Thank you very much for the link to Cassandra Cluster Manager (CCM). I will 
try that out this week.
I also find the Redis plugin interesting, as it is easy to connect to an 
existing Redis installation and proceed with my task.

Manoj


On Thursday, 21 July 2016 12:03:23 UTC-7, Justin du coeur wrote:
>
> Just an observation: this is a common enough point of confusion that I 
> actually think y'all might want to consider removing the LevelDB default. 
> As I recently discovered, it's almost ridiculously easy to boot up a local 
> Cassandra development "cluster" using ccm. (With zero Cassandra knowledge, 
> I got it up and running with Akka Persistence in something like 15 
> minutes.) You might want to use that as the recommended default to start 
> with instead...
>
>
>



Re: [akka-user] Re: Akka Cluster + Leveldb : Persistence failure when replaying events for ...

2016-07-25 Thread Manoj Santhakumaran
Hi Patrik,

Thank you very much for your response. It helped me a lot. Much appreciated.

Manoj

On Friday, 22 July 2016 00:19:12 UTC-7, Patrik Nordwall wrote:
>
> That is a setup that is not supported. It's clearly documented that it's 
> not intended for production usage.
>
> /Patrik
>
>



[akka-user] Unit test akka persistence against the inmem journal

2016-07-25 Thread jsoeters
How can I get a handle to the inmem journal from my tests? I'm trying to 
write a simple test wrapper that looks something like this:

given("an order") {
  Seq(OrderCreated(orderId=1234), ItemAdded(productId=123, quantity=5))
}

when("removing an item") {
  RemoveItemCommand(orderId=1234, productId=123, quantity=3)
}

then("the item should be removed") {
  Seq(ItemRemoved(orderId=1234, productId=123, quantity=3))
}

So the given function would have to insert some events into the in-memory 
journal and the then function would have to assert on the in-memory journal 
that the sequence of expected events is added to the journal.
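As far as I know, the inmem journal (`akka.persistence.journal.inmem`) exposes no public read API, so one workaround is to assert indirectly: send commands, restart the persistent actor, and check the recovered state, which is rebuilt purely from journaled events. A sketch with hypothetical OrderActor / GetState messages (they stand in for your own persistent actor and its query protocol):

```scala
import akka.actor.{ActorSystem, PoisonPill, Props}
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory

object InmemJournalTestSketch {
  val config = ConfigFactory.parseString(
    """akka.persistence.journal.plugin = "akka.persistence.journal.inmem"""")
  val system = ActorSystem("test", config)
  val probe = TestProbe()(system)

  // OrderActor, RemoveItemCommand and GetState are hypothetical.
  val order = system.actorOf(Props(new OrderActor("order-1234")))
  order ! RemoveItemCommand(orderId = 1234, productId = 123, quantity = 3)

  // Stop and recreate with the same persistenceId: recovery replays the
  // journal, so the recovered state reflects exactly the persisted events.
  probe.watch(order)
  order ! PoisonPill
  probe.expectTerminated(order)

  val recovered = system.actorOf(Props(new OrderActor("order-1234")))
  recovered.tell(GetState, probe.ref)
  // assert on probe.expectMsg(...) against the expected post-event state
}
```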



Re: [akka-user] Akka Streams as an ETL tool?

2016-07-25 Thread James Matlik
Hello Endre,

I would be happy to contribute some documentation when I have some time.
Unfortunately my current code base is not OSS friendly, so it will take
some effort to provide meaningful examples of more complex patterns I've
been using.  Do you have a preference on how cookbook examples are
submitted?
- James

On Jul 22, 2016 5:46 AM, "Akka Team"  wrote:

> Hi James
>
> On Thu, Jul 14, 2016 at 1:22 PM, James Matlik 
> wrote:
>
>> Using Akka streams for ETL is our primary use case. The back pressure
>> support has been extremely useful in helping us maximize throughput while
>> at the same time avoid overwhelming the multiple external rest services we
>> query against. By maintaining dedicated, fixed sized dispatcher pools, we
>> can easily use our legacy blocking client SDKs over a fixed max number of
>> concurrent connections/requests. Then the ETL can process as fast as it
>> possibly can within those constraints.
>>
>> We found the learning curve to be on the steep side, but once it clicks,
>> the power and ease of use Streams provides is... impressive...
>> refreshing... exhilarating... addicting.. take your pick.
>>
> Thank you for the kind words. Are you maybe interested in contributing
> some material around ETL? Maybe a blog post, or even as simple as adding
> some patterns into our cookbook:
> http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html ?
>
> -Endre
>
>
>
>>
>> On Jul 14, 2016 12:28 AM, "Beno"  wrote:
>>
>>> I've been using Akka streaming for a use case which I don't see much about
>>> - that of a small/moderate scale ETL or simple processing pipeline. I'm
>>> relatively new to it all, so I just wanted to see if I might be missing
>>> something that would change my opinion, which is that Akka Streams is among
>>> the best tools for data cleaning: the graph dsl is so easy to code with
>>> and reason about.
>>>
>>> The details: batch processing to clean and curate data, with external
>>> RESTful requests as part of the flow.
>>>
>>> Source[A] (read from file or DB) ~> Flow[A,B](some transformation
>>> function) ~>  Flow[B,C] (by way of a RESTful request/response) ~>
>>> Flow[C,D](graph query) ~> Sink[D](to DB)
>>>
>>> Where Source might be 50,000 lines in a file or rows in a table.
>>>
>>> Thanks for any feedback
>>>
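The quoted pipeline shape can also be sketched without the graph DSL, using `mapAsync` to bound in-flight REST calls while still propagating back-pressure; every stage body below is a placeholder, not code from this thread:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

object EtlSketch extends App {
  implicit val system = ActorSystem("etl")
  implicit val mat = ActorMaterializer()
  import system.dispatcher

  // Placeholder stages: read, transform, enrich (REST call), save (DB write).
  def readLines(): Iterator[String] = Iterator.tabulate(50000)(i => s"row-$i")
  def transform(a: String): String = a.trim
  def enrich(b: String): Future[String] = Future(b.toUpperCase) // stand-in for REST
  def save(d: String): Future[Unit] = Future(())                // stand-in for DB

  Source.fromIterator(() => readLines())
    .map(transform)
    .mapAsync(parallelism = 4)(enrich) // at most 4 in-flight; back-pressures upstream
    .mapAsync(parallelism = 2)(save)
    .runWith(Sink.ignore)
}
```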
>
>
>
> --
> Akka Team
> Lightbend - Reactive apps on the JVM
> Twitter: @akkateam
>


Re: [akka-user] Can't get backpressure to work...

2016-07-25 Thread Roy Russo
Thanks Endre,

I have indeed found it failing in interesting ways, and impossible to 
debug. 

I'll look at the unfold 
APIs... 
http://doc.akka.io/docs/akka/snapshot/scala/stream/stages-overview.html#unfold
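For reference, the unfoldResourceAsync shape suggested above looks roughly like this; the open/read/close bodies are placeholders for the actual scroll calls:

```scala
import akka.Done
import akka.stream.scaladsl.Source
import scala.concurrent.Future

// Hypothetical handle for a scrolling query.
case class Scroll(id: String)

object UnfoldResourceSketch {
  def openScroll(): Future[Scroll] = ???                     // start the scroll query
  def nextPage(s: Scroll): Future[Option[Seq[String]]] = ??? // None => stream completes
  def closeScroll(s: Scroll): Future[Done] = ???             // release the scroll

  // Pages are fetched only as fast as downstream demands them, so
  // back-pressure falls out for free; no ActorPublisher needed.
  val pages: Source[Seq[String], akka.NotUsed] =
    Source.unfoldResourceAsync[Seq[String], Scroll](openScroll, nextPage, closeScroll)
}
```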


On Monday, July 25, 2016 at 4:20:25 AM UTC-4, drewhk wrote:
>
> Hi,
>
> Your actor code is faulty, you are closing over the actor's state in 
> Future callbacks, namely future.foreach and future.onFailure. This is not 
> thread safe and will fail in various interesting ways. 
>
> There is not much reason to use an ActorPublisher for this to be honest. 
> There are built-in combinators to achieve similar things with less chance 
> for mistakes:
>
>
> http://doc.akka.io/api/akka/2.4/index.html#akka.stream.scaladsl.Source$@unfoldResource[T,S](create:()=
> >S,read:S=>Option[T],close:S=>Unit):akka.stream.scaladsl.Source[T,akka.NotUsed]
>
>
> http://doc.akka.io/api/akka/2.4/index.html#akka.stream.scaladsl.Source$@unfoldResourceAsync[T,S](create:()=
> >scala.concurrent.Future[S],read:S=>scala.concurrent.Future[Option[T]],close:S=>scala.concurrent.Future[akka.Done]):akka.stream.scaladsl.Source[T,akka.NotUsed]
>
> -Endre
>
> On Mon, Jul 25, 2016 at 1:11 AM, Roy Russo wrote:
>
>> Scenario: Read from one database using an ActorPublisher, write to 
>> another database using a subscriber.
>>
>> I expect the reads to be much faster than the writes, so we need to slow 
>> down the reads at some threshold. Growing an unbounded queue of data, will 
>> simply OOM. The below works for small datasets. With large datasets, the 
>> gap between read-write becomes enormous and so OOM. 
>>
>> My ActorPublisher:
>>
>> class ScrollPublisher(clientFrom: ElasticClient, config: Config) extends 
>> ActorPublisher[SearchHits] {
>>
>>   val logger = Logger(LoggerFactory.getLogger(this.getClass))
>>   var readCount = 0
>>   var processing = false
>>
>>   import akka.stream.actor.ActorPublisherMessage._
>>
>>   @volatile var executeQuery = () => clientFrom.execute {
>> search in config.indexFrom / config.mapping scroll "30m" limit 
>> config.scrollSize
>>   }
>>
>>   def nextHits(): Unit = {
>> if (!processing) {
>>   processing = true
>>   val future = executeQuery()
>>   future.foreach {
>> response =>
>>   processing = false
>>   if (response.getHits.hits.nonEmpty) {
>> logger.info("Fetched: \t" + response.getHits.getHits.length + " 
>> documents in\t" + response.getTookInMillis + "ms.")
>> readCount += response.getHits.getHits.length
>> logger.info("Total Fetched:\t" + readCount)
>> if (isActive && totalDemand > 0) {
>>   executeQuery = () => clientFrom.execute {
>> searchScroll(response.getScrollId).keepAlive("30m")
>>   }
>>   nextHits()
>>   onNext(response.getHits) // sends elements to the stream
>> }
>>   } else {
>> onComplete()
>>   }
>>   }
>>   future.onFailure {
>> case t =>
>>   processing = false
>>   throw t
>>   }
>> }
>>   }
>>
>>   def receive = {
>> case Request(cnt) =>
>>   logger.info("ActorPublisher Received: \t" + cnt)
>>   if (isActive && totalDemand > 0) {
>> nextHits()
>>   }
>> case Cancel =>
>>   context.stop(self)
>> case _ =>
>>   }
>> }
>>
>> Enter code here...
>>
>>
>> Source declaration:
>>
>> // SearchHits Akka Stream Source
>> val documentSource = Source.actorPublisher[SearchHits](Props(new 
>> ScrollPublisher(clientFrom, config))).map {
>>   case searchHits =>
>> searchHits.getHits
>> }
>>
>>
>> My Sink, which performs an asynch write to the new database: 
>>
>> documentSource.buffer(16, 
>> OverflowStrategy.backpressure).runWith(Sink.foreach {
>>   searchHits =>
>> Thread.sleep(1000)
>> totalRec += searchHits.size
>> logger.info("\t\t\tRECEIVED: " + searchHits.size + " \t\t\t TOTAL 
>> RECEIVED: "+ totalRec)
>> val bulkIndexes = searchHits.map(hit => (hit.`type`, hit.id, 
>> hit.sourceAsString())).collect {
>>   case (typ, _id, source) =>
>> index into config.indexTo / config.mapping id _id -> typ doc 
>> JsonDocumentSource(source)
>> }
>> val future = clientTo.execute {
>>   bulk(
>> bulkIndexes
>>   )
>> }
>>
>>
>>
>> The sleep is put in there to simulate lag for local development. I've 
>> tried changing values for the buffer, and the max/initial values for the 
>> materializer, and still it seems to ignore back pressure. 
>>
>> Is there a logic flaw in this code?
>>
>> -- 
>> >> Read the docs: http://akka.io/docs/
>> >> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.

Re: [akka-user] Question about fork-join with actors

2016-07-25 Thread Nader Aeinehchi
Thank you very much for the response.  I am re-organizing my code based on
the per-request-model now.
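The fail-fast collation discussed in this thread can be sketched without Akka, using plain Scala Futures and a Promise. `isAuthorized`, `Yes`, and `No` below are illustrative stand-ins, not code from the thread, and error handling is omitted:

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

sealed trait AccessDecision
case object Yes extends AccessDecision
case object No extends AccessDecision

// Fork: fire every provider check at once. Join: the promise is completed
// with No by the first provider that denies, or with Yes once all approve.
def isAuthorized(checks: Seq[() => Future[AccessDecision]]): Future[AccessDecision] = {
  val result = Promise[AccessDecision]()
  val futures = checks.map(_.apply())
  futures.foreach(_.foreach {
    case No  => result.trySuccess(No) // first "no" answers immediately
    case Yes => ()
  })
  // Only if every provider said "yes" does the combined answer become Yes.
  Future.sequence(futures).foreach { decisions =>
    if (decisions.forall(_ == Yes)) result.trySuccess(Yes)
  }
  result.future
}
```

Each request gets its own Promise, which plays the role of the correlation id; the actor-per-request version carries the same state in a short-lived actor and additionally gets supervision and fault tolerance for free.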

On Thu, Jul 21, 2016 at 10:08 PM, Justin du coeur 
wrote:

> There are a lot of ways you *could* slice this, but the most
> straightforward is the simple Actor-per-request model.  Actors are fairly
> lightweight, so it's sometimes worthwhile to just stand one up for a
> complex request like this.  So you instantiate an Actor in charge of
> managing the authorization process; that sends out the requests to the
> various SecurityProvider workers, and collates the responses. If any come
> back negative, it immediately responds negative; if they all return
> positive, it responds positive.
>
> In practice, I would typically expect to have a Session Actor that is
> managing the interactions for a particular User; that could often handle
> this responsibility.
>
> On Thu, Jul 21, 2016 at 3:33 AM, Nader Aeinehchi 
> wrote:
>
>> I have a classic question: how to fork some work and then join the
>> results.
>>
>> Let me explain the problem:
>>
>> There is a security manager that asks several security providers (e.g.
>> Active Directory, Tivoli, Mainframe) if a given call is authorized.  If all
>> the security providers say "yes", the call is authorized. But if at least
>> one of the security providers says "no", the call is denied.
>>
>> In the context of parallelism, a call to each of security providers is
>> delegated to a worker (securityProvider).  Workers need to work in parallel.
>>
>> If one of the workers says "no", there is no point waiting for the answers
>> from the other workers; the security manager should immediately answer "no".
>>
>> Note that the application is multi-user.  Therefore, each fork-join
>> should be related to some correlation id, e.g. a requestId.
>>
>> I would like to know how the above problem can be solved using actors.
>> Let me add that my motivation for using actors is the fault tolerance
>> that the actor model offers.
>>
>> In advance thank you very much.
>>
>> Nader
>>
>> 
>>
>> type AccessRequest = Tuple2[RequestId, Payload]
>>
>>   def isAuthorized(accessRequest: AccessRequest): AccessDecision = {
>>
>> var decision: AccessDecision = No
>>
>> val loop = new Breaks
>> loop.breakable {
>>   for (securityProvider <- securityProviders.values) {
>>
>> val newDecision = securityProvider.check(accessRequest)
>> decision = newDecision
>>
>> if (newDecision == No) {
>>   loop.break
>> }
>>
>>   }
>> }
>> decision
>>   }
>> 
>>
>>
>>
>
>



Re: [akka-user] Can't get backpressure to work...

2016-07-25 Thread Endre Varga
Hi,

Your actor code is faulty: you are closing over the actor's mutable state in
Future callbacks, namely future.foreach and future.onFailure. That is not
thread safe and will fail in various interesting ways.

There is not much reason to use an ActorPublisher for this, to be honest.
There are built-in combinators that achieve the same thing with less chance
of mistakes:

http://doc.akka.io/api/akka/2.4/index.html#akka.stream.scaladsl.Source$@unfoldResource[T,S](create:()=
>S,read:S=>Option[T],close:S=>Unit):akka.stream.scaladsl.Source[T,akka.NotUsed]

http://doc.akka.io/api/akka/2.4/index.html#akka.stream.scaladsl.Source$@unfoldResourceAsync[T,S](create:()=
>scala.concurrent.Future[S],read:S=>scala.concurrent.Future[Option[T]],close:S=>scala.concurrent.Future[akka.Done]):akka.stream.scaladsl.Source[T,akka.NotUsed]

-Endre
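The contract behind `unfoldResourceAsync` — open a resource, keep exactly one `read` in flight at a time, `close` when `read` returns `None` — is what gives the scroll its backpressure. A dependency-free sketch of that contract, where `FakeScroll` and `drain` are made-up stand-ins rather than the Akka or elastic4s APIs:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Stand-in for a scroll session; NOT the Elasticsearch client.
final class FakeScroll(private var pages: List[Vector[String]]) {
  var closed = false
  def next(): Future[Option[Vector[String]]] = Future.successful {
    pages match {
      case head :: tail => pages = tail; Some(head)
      case Nil          => None
    }
  }
  def close(): Future[Unit] = Future.successful { closed = true }
}

// The create/read/close contract: open the resource, issue one read at a
// time, and close it once read returns None. There is never more than one
// outstanding read, so mutable session state is touched sequentially.
def drain[S, T](create: () => Future[S])
               (read: S => Future[Option[T]], close: S => Future[Unit]): Future[List[T]] =
  create().flatMap { s =>
    def loop(acc: List[T]): Future[List[T]] = read(s).flatMap {
      case Some(t) => loop(t :: acc)
      case None    => close(s).map(_ => acc.reverse)
    }
    loop(Nil)
  }
```

In the real stream, each `loop` step would be driven by downstream demand, so a slow sink directly slows the scroll instead of letting reads run ahead.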

On Mon, Jul 25, 2016 at 1:11 AM, Roy Russo  wrote:

> Scenario: Read from one database using an ActorPublisher, write to another
> database using a subscriber.
>
> I expect the reads to be much faster than the writes, so we need to slow
> down the reads at some threshold. Growing an unbounded queue of data, will
> simply OOM. The below works for small datasets. With large datasets, the
> gap between read-write becomes enormous and so OOM.
>
> My ActorPublisher:
>
> class ScrollPublisher(clientFrom: ElasticClient, config: Config) extends 
> ActorPublisher[SearchHits] {
>
>   val logger = Logger(LoggerFactory.getLogger(this.getClass))
>   var readCount = 0
>   var processing = false
>
>   import akka.stream.actor.ActorPublisherMessage._
>
>   @volatile var executeQuery = () => clientFrom.execute {
> search in config.indexFrom / config.mapping scroll "30m" limit 
> config.scrollSize
>   }
>
>   def nextHits(): Unit = {
> if (!processing) {
>   processing = true
>   val future = executeQuery()
>   future.foreach {
> response =>
>   processing = false
>   if (response.getHits.hits.nonEmpty) {
> logger.info("Fetched: \t" + response.getHits.getHits.length + " 
> documents in\t" + response.getTookInMillis + "ms.")
> readCount += response.getHits.getHits.length
> logger.info("Total Fetched:\t" + readCount)
> if (isActive && totalDemand > 0) {
>   executeQuery = () => clientFrom.execute {
> searchScroll(response.getScrollId).keepAlive("30m")
>   }
>   nextHits()
>   onNext(response.getHits) // sends elements to the stream
> }
>   } else {
> onComplete()
>   }
>   }
>   future.onFailure {
> case t =>
>   processing = false
>   throw t
>   }
> }
>   }
>
>   def receive = {
> case Request(cnt) =>
>   logger.info("ActorPublisher Received: \t" + cnt)
>   if (isActive && totalDemand > 0) {
> nextHits()
>   }
> case Cancel =>
>   context.stop(self)
> case _ =>
>   }
> }
>
>
>
> Source declaration:
>
> // SearchHits Akka Stream Source
> val documentSource = Source.actorPublisher[SearchHits](Props(new 
> ScrollPublisher(clientFrom, config))).map {
>   case searchHits =>
> searchHits.getHits
> }
>
>
> My Sink, which performs an async write to the new database:
>
> documentSource.buffer(16, OverflowStrategy.backpressure).runWith(Sink.foreach 
> {
>   searchHits =>
> Thread.sleep(1000)
> totalRec += searchHits.size
> logger.info("\t\t\tRECEIVED: " + searchHits.size + " \t\t\t TOTAL 
> RECEIVED: "+ totalRec)
> val bulkIndexes = searchHits.map(hit => (hit.`type`, hit.id, 
> hit.sourceAsString())).collect {
>   case (typ, _id, source) =>
> index into config.indexTo / config.mapping id _id -> typ doc 
> JsonDocumentSource(source)
> }
> val future = clientTo.execute {
>   bulk(
> bulkIndexes
>   )
> }
>
>
>
> The sleep is put in there to simulate lag for local development. I've
> tried changing values for the buffer, and the max/initial values for the
> materializer, and still it seems to ignore back pressure.
>
> Is there a logic flaw in this code?
>
>


[akka-user] Can't get backpressure to work...

2016-07-25 Thread Roy Russo
Scenario: Read from one database using an ActorPublisher, write to another 
database using a subscriber.

I expect the reads to be much faster than the writes, so we need to slow 
down the reads at some threshold. Growing an unbounded queue of data will 
simply OOM. The code below works for small datasets. With large datasets, the 
gap between reads and writes becomes enormous, and so it OOMs. 

My ActorPublisher:

class ScrollPublisher(clientFrom: ElasticClient, config: Config) extends 
ActorPublisher[SearchHits] {

  val logger = Logger(LoggerFactory.getLogger(this.getClass))
  var readCount = 0
  var processing = false

  import akka.stream.actor.ActorPublisherMessage._

  @volatile var executeQuery = () => clientFrom.execute {
search in config.indexFrom / config.mapping scroll "30m" limit 
config.scrollSize
  }

  def nextHits(): Unit = {
if (!processing) {
  processing = true
  val future = executeQuery()
  future.foreach {
response =>
  processing = false
  if (response.getHits.hits.nonEmpty) {
logger.info("Fetched: \t" + response.getHits.getHits.length + " 
documents in\t" + response.getTookInMillis + "ms.")
readCount += response.getHits.getHits.length
logger.info("Total Fetched:\t" + readCount)
if (isActive && totalDemand > 0) {
  executeQuery = () => clientFrom.execute {
searchScroll(response.getScrollId).keepAlive("30m")
  }
  nextHits()
  onNext(response.getHits) // sends elements to the stream
}
  } else {
onComplete()
  }
  }
  future.onFailure {
case t =>
  processing = false
  throw t
  }
}
  }

  def receive = {
case Request(cnt) =>
  logger.info("ActorPublisher Received: \t" + cnt)
  if (isActive && totalDemand > 0) {
nextHits()
  }
case Cancel =>
  context.stop(self)
case _ =>
  }
}



Source declaration:

// SearchHits Akka Stream Source
val documentSource = Source.actorPublisher[SearchHits](Props(new 
ScrollPublisher(clientFrom, config))).map {
  case searchHits =>
searchHits.getHits
}


My Sink, which performs an async write to the new database: 

documentSource.buffer(16, OverflowStrategy.backpressure).runWith(Sink.foreach {
  searchHits =>
Thread.sleep(1000)
totalRec += searchHits.size
logger.info("\t\t\tRECEIVED: " + searchHits.size + " \t\t\t TOTAL RECEIVED: 
"+ totalRec)
val bulkIndexes = searchHits.map(hit => (hit.`type`, hit.id, 
hit.sourceAsString())).collect {
  case (typ, _id, source) =>
index into config.indexTo / config.mapping id _id -> typ doc 
JsonDocumentSource(source)
}
val future = clientTo.execute {
  bulk(
bulkIndexes
  )
}



The sleep is put in there to simulate lag for local development. I've tried 
changing values for the buffer, and the max/initial values for the 
materializer, and still it seems to ignore back pressure. 

Is there a logic flaw in this code?
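One likely flaw, separate from the publisher: the `Sink.foreach` body starts `clientTo.execute` and returns without waiting for the resulting Future, so the stream's backpressure never accounts for in-flight bulk writes (and `Thread.sleep` blocks a stream thread). The usual Akka Streams shape is `documentSource.mapAsync(1)(writeBatch).runWith(Sink.ignore)`, which withholds demand until each write's Future completes. That sequencing can be sketched with plain Futures; `writeSequentially` is a hypothetical helper, not part of any library:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Submit the next batch only after the previous write's Future completes --
// the sequencing that a fire-and-forget body inside Sink.foreach lacks.
def writeSequentially[A, B](batches: Seq[A])(write: A => Future[B]): Future[Unit] =
  batches.foldLeft(Future.successful(())) { (prev, batch) =>
    prev.flatMap(_ => write(batch)).map(_ => ())
  }

// Demo: record the interleaving; the "end" of one batch always precedes
// the "start" of the next, so at most one write is ever in flight.
val events = scala.collection.mutable.ArrayBuffer[String]()
val done = writeSequentially(Seq(1, 2, 3)) { n =>
  events += s"start$n"
  Future { events += s"end$n" }
}
Await.result(done, 5.seconds)
```

With `mapAsync(parallelism)` the same idea generalizes to a bounded number of concurrent writes instead of exactly one.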
