[akka-user] Reactive-Kafka : How to retry on Demand after consumer exception

2017-02-01 Thread Yadukrishnan K
Hello,
We are using Reactive-Kafka in our application. We have a scenario where we
want to stop sending messages to the consumer if an exception occurs while
processing a message. The message should be retried after a stipulated time,
or on an explicit request from the consumer side. With our current approach,
if, say, the consumer's database is down for some time, the stream will still
read from Kafka and try to process the messages, but processing fails because
of the DB issue. This keeps the application busy unnecessarily. Instead, we
want to pause the consumer from receiving messages for a stipulated time
(say, wait 30 minutes before retrying).
We are not sure how to handle that case.

Is it possible to do this? Am I missing something?

Here is the sample code, adapted from the Reactive-Kafka documentation:


Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .mapAsync(1) { msg =>
    Future {
      // Unreliable processing, e.g. saving to DB, which may fail while the DB is down
    }.map(_ => msg.committableOffset).recover {
      case ex =>
        // HOW TO DO:
        // On exception, tell the stream to stop sending messages, pause the
        // consumer, and retry after a stipulated time or on demand, resuming
        // from the last committed offset.
        throw ex
    }
  }
  .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
    batch.updated(elem)
  }
  .mapAsync(3)(_.commitScaladsl())
  .runWith(Sink.ignore)
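
A minimal sketch of one possible approach (not from the thread, assuming the
consumerSettings above): since the rethrown exception fails the stream, you can
rematerialize the whole stream after a delay, and the new consumer resumes from
the last committed offset. runStream, runWithRetry and the 30-minute delay are
illustrative.

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.pattern.after
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
import scala.concurrent.duration._

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher

def runStream(): Future[Done] =
  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    // ... the mapAsync / batch / commit stages from the snippet above ...
    .runWith(Sink.ignore)

// Rematerialize the stream after a delay whenever it fails; the new consumer
// picks up from the last committed offset.
def runWithRetry(): Future[Done] =
  runStream().recoverWith {
    case _ => after(30.minutes, system.scheduler)(runWithRetry())
  }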





Re: [akka-user] Database query does not cancel when streaming rows using a DatabasePublisher

2017-02-01 Thread Mark Goldenstein
Hi Johan,

Thank you for the quick response! Indeed, in my application I do create the 
DatabasePublisher anew for each request. This is a mistake I introduced 
when I stripped down the code for this forum post. Sorry about this.

The problem I have is that even when the http connection is closed, the 
database query continues streaming. I would assume it should stop, since 
there is no downstream demand anymore once the connection closes... I 
am not sure whether this is a problem in Slick or whether my streaming 
setup is flawed.

Mark


On Thursday, February 2, 2017 at 12:18:38 AM UTC+1, Akka Team wrote:
>
> From the slick docs:
>
> Execution of the DBIOAction does not start until a Subscriber is attached 
> to the stream. Only a single Subscriber is supported, and any further 
> attempts to subscribe again will fail ...
>
>
> I'm guessing what you actually want is not to create your publisher once 
> upon application start and subscribe to it multiple times, like you do now, 
> but rather to create one for every request that comes in to webservice/csv 
> (so inside the block of the get directive).
>
> -- 
> Johan
> Akka Team
>
> On Wed, Feb 1, 2017 at 3:27 PM, Mark Goldenstein wrote:
>
>> Hi guys!
>>
>> (Cross-posting this since I did not get a response in the Slick group.)
>>
>> I use akka-http together with Slick to stream rows from a mysql database 
>> as csv via http.
>>
>> Streaming works as expected unless I disconnect the http connection 
>> prematurely. In this case I would expect that the sql query should stop 
>> executing since there is no downstream demand anymore. However, I can see 
>> in the mysql processes that the query is still running and it only stops 
>> when I shut down the webserver (or when the query finishes fetching all the 
>> rows).
>>
>> If I turn on debug logs I see repeatedly, while streaming:
>>
>> 22:05:17.738 [db-8] DEBUG slick.basic.BasicBackend.stream - Suspending 
>> streaming action with continuation (more data available)
>> 22:05:17.743 [default-akka.actor.default-dispatcher-8] DEBUG 
>> slick.basic.BasicBackend.stream - Scheduling stream continuation after 
>> transition from demand = 0
>> 22:05:17.743 [db-9] DEBUG slick.basic.BasicBackend.stream - Restarting 
>>  streaming action, realDemand = 8
>>
>> Then, once I disconnect the connection, and there is no downstream demand 
>> anymore, I see:
>>
>> 22:05:17.744 [db-9] DEBUG slick.basic.BasicBackend.stream - Suspending 
>> streaming action with continuation (more data available)
>> 22:05:17.749 [default-akka.actor.default-dispatcher-7] DEBUG 
>> akka.io.TcpIncomingConnection - Closing connection due to IO error 
>> java.io.IOException: Broken pipe
>> 22:05:17.774 [default-akka.actor.default-dispatcher-16] DEBUG 
>> slick.basic.BasicBackend.stream - Scheduling stream continuation after 
>> transition from demand = 0
>> 22:05:17.774 [db-10] DEBUG slick.basic.BasicBackend.stream - Restarting 
>>  streaming action, realDemand = oo
>>
>> What is going on here? Why is realDemand oo?
>>
>> My code looks something like this:
>>
>> object Main extends App {
>>   implicit val system = ActorSystem()
>>   implicit val executor = system.dispatcher
>>   implicit val materializer = ActorMaterializer()
>>
>>   implicit val csvMarshaller =
>>     Marshaller.withFixedContentType[CSVLine, ByteString](ContentTypes.`text/csv(UTF-8)`) {
>>       case CSVLine(line) => ByteString(line)
>>     }
>>
>>   implicit val csvStreamingSupport = EntityStreamingSupport.csv()
>>     .withParallelMarshalling(parallelism = 8, unordered = false)
>>
>>   val query = ??? // a Slick query
>>
>>   val publisher: DatabasePublisher[Tick] = DB.get.stream(
>>     query.result.withStatementParameters(statementInit = DB.enableStream))
>>
>>   val routes = {
>>     logRequestResult("webservice") {
>>       encodeResponse {
>>         pathPrefix("csv") {
>>           pathSingleSlash {
>>             get {
>>               complete {
>>                 Source.fromPublisher(publisher).map(t => CSVLine(t.toCSV()))
>>               }
>>             }
>>           }
>>         }
>>       }
>>     }
>>   }
>>
>>   Http().bindAndHandle(routes, "127.0.0.1", 9000)
>> }
>>
>> object DB {
>>   private val db = Database.forConfig("db")
>>
>>   def get = db
>>
>>   def enableStream(statement: java.sql.Statement): Unit = {
>>     statement match {
>>       case s if s.isWrapperFor(classOf[com.mysql.jdbc.StatementImpl]) =>
>>         s.unwrap(classOf[com.mysql.jdbc.StatementImpl]).enableStreamingResults()
>>       case _ => // do nothing
>>     }
>>   }
>> }
>>
>> Any thoughts on this?
>>
>> Mark
>>

Re: [akka-user] Balancing TCP requests

2017-02-01 Thread Akka Team
It looks correct to me. What makes you think it is not the server closing
the connection, as the exception says?


-- 
Johan
Akka Team

On Thu, Jan 26, 2017 at 3:20 PM, Maksym Besida wrote:

> Based on this paragraph I've implemented a balancer for outgoing TCP
> requests.
>
> def outGoing(host: String, port: Int) =
>   Tcp().outgoingConnection(InetSocketAddress.createUnresolved(host, port),
>     halfClose = true)
>
> def balancer(endpoints: Seq[(String, Int)]): Flow[ByteString, ByteString, NotUsed] = {
>   Flow.fromGraph(GraphDSL.create() { implicit b =>
>     import GraphDSL.Implicits._
>     val balancer = b.add(Balance[ByteString](endpoints.length,
>       waitForAllDownstreams = true))
>     val merge = b.add(Merge[ByteString](endpoints.length))
>
>     endpoints.foreach { case (host, port) =>
>       balancer ~> outGoing(host, port).async ~> merge
>     }
>
>     FlowShape(balancer.in, merge.out)
>   })
> }
>
>
> I use it as follows
>
> source.via(balancer(Seq(("localhost", 22002), ("localhost", 22002))))
>   .runWith(Sink.ignore)
>
>
> I'm not interested in the result, so I run it with Sink.ignore. For testing
> purposes I open two TCP connections to the same destination host (in reality
> they will go to different hosts).
>
> When I produce a lot of elements in my source I eventually (not after the
> first message) get the following error and the stream fails:
> akka.stream.StreamTcpException: The connection closed with error: An
> existing connection was forcibly closed by the remote host
>
> Can anybody suggest what is wrong with the example?
>



Re: [akka-user] Database query does not cancel when streaming rows using a DatabasePublisher

2017-02-01 Thread Akka Team
From the slick docs:

Execution of the DBIOAction does not start until a Subscriber is attached
to the stream. Only a single Subscriber is supported, and any further
attempts to subscribe again will fail ...


I'm guessing what you actually want is not to create your publisher once
upon application start and subscribe to it multiple times, like you do now,
but rather to create one for every request that comes in to webservice/csv
(so inside the block of the get directive).
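
A minimal sketch of that suggestion (illustrative, reusing the names from the
quoted post): build the DatabasePublisher inside the get block, so each
request subscribes to a fresh publisher.

get {
  complete {
    // One publisher (and thus one database stream) per incoming request
    val publisher: DatabasePublisher[Tick] = DB.get.stream(
      query.result.withStatementParameters(statementInit = DB.enableStream))
    Source.fromPublisher(publisher).map(t => CSVLine(t.toCSV()))
  }
}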

-- 
Johan
Akka Team

On Wed, Feb 1, 2017 at 3:27 PM, Mark Goldenstein wrote:

> Hi guys!
>
> (Cross-posting this since I did not get a response in the Slick group.)
>
> I use akka-http together with Slick to stream rows from a mysql database
> as csv via http.
>
> Streaming works as expected unless I disconnect the http connection
> prematurely. In this case I would expect that the sql query should stop
> executing since there is no downstream demand anymore. However, I can see
> in the mysql processes that the query is still running and it only stops
> when I shut down the webserver (or when the query finishes fetching all the
> rows).
>
> If I turn on debug logs I see repeatedly, while streaming:
>
> 22:05:17.738 [db-8] DEBUG slick.basic.BasicBackend.stream - Suspending
> streaming action with continuation (more data available)
> 22:05:17.743 [default-akka.actor.default-dispatcher-8] DEBUG
> slick.basic.BasicBackend.stream - Scheduling stream continuation after
> transition from demand = 0
> 22:05:17.743 [db-9] DEBUG slick.basic.BasicBackend.stream - Restarting
>  streaming action, realDemand = 8
>
> Then, once I disconnect the connection, and there is no downstream demand
> anymore, I see:
>
> 22:05:17.744 [db-9] DEBUG slick.basic.BasicBackend.stream - Suspending
> streaming action with continuation (more data available)
> 22:05:17.749 [default-akka.actor.default-dispatcher-7] DEBUG
> akka.io.TcpIncomingConnection - Closing connection due to IO error
> java.io.IOException: Broken pipe
> 22:05:17.774 [default-akka.actor.default-dispatcher-16] DEBUG
> slick.basic.BasicBackend.stream - Scheduling stream continuation after
> transition from demand = 0
> 22:05:17.774 [db-10] DEBUG slick.basic.BasicBackend.stream - Restarting
>  streaming action, realDemand = oo
>
> What is going on here? Why is realDemand oo?
>
> My code looks something like this:
>
> object Main extends App {
>   implicit val system = ActorSystem()
>   implicit val executor = system.dispatcher
>   implicit val materializer = ActorMaterializer()
>
>   implicit val csvMarshaller =
>     Marshaller.withFixedContentType[CSVLine, ByteString](ContentTypes.`text/csv(UTF-8)`) {
>       case CSVLine(line) => ByteString(line)
>     }
>
>   implicit val csvStreamingSupport = EntityStreamingSupport.csv()
>     .withParallelMarshalling(parallelism = 8, unordered = false)
>
>   val query = ??? // a Slick query
>
>   val publisher: DatabasePublisher[Tick] = DB.get.stream(
>     query.result.withStatementParameters(statementInit = DB.enableStream))
>
>   val routes = {
>     logRequestResult("webservice") {
>       encodeResponse {
>         pathPrefix("csv") {
>           pathSingleSlash {
>             get {
>               complete {
>                 Source.fromPublisher(publisher).map(t => CSVLine(t.toCSV()))
>               }
>             }
>           }
>         }
>       }
>     }
>   }
>
>   Http().bindAndHandle(routes, "127.0.0.1", 9000)
> }
>
> object DB {
>   private val db = Database.forConfig("db")
>
>   def get = db
>
>   def enableStream(statement: java.sql.Statement): Unit = {
>     statement match {
>       case s if s.isWrapperFor(classOf[com.mysql.jdbc.StatementImpl]) =>
>         s.unwrap(classOf[com.mysql.jdbc.StatementImpl]).enableStreamingResults()
>       case _ => // do nothing
>     }
>   }
> }
>
> Any thoughts on this?
>
> Mark
>


[akka-user] Re: Akka persistence: can you use it as the main storage system for you application's data?

2017-02-01 Thread lutzh


On Tuesday, 31 January 2017 16:17:53 UTC+1, José González Gómez wrote:
>
> Hi!
>
> I've been reading about Akka persistence, and it seems the way to go to 
> persist data in a reactive application, using event sourcing and immutable 
> data models. I have no experience doing this, so I'd love to hear about 
> your experience. Anyway, after reading the docs, I have the following 
> doubts (please correct any false assumptions I may have made):
>
> First of all, Akka persistence stores data using a journal. Data in that 
> journal (both events and snapshots) are stored after being serialized. This 
> seems to pose several problems:
>
>    - You can't have access to data as you may have in a SQL or NoSQL 
>    database, so it seems to be hard to diagnose corrupt data or relationships 
>    among that data. Am I missing anything here?
>
> No, I don't think you are. 

>
>    - Storing data in a journal seems great for storing immutable data and 
>    recovering events for event sourced data, but... how do you manage to do 
>    queries on that data? I mean, if you store orders using Akka persistence, 
>    how do you get orders for example between two dates? The only thing I can 
>    think of here is to use PersistenceQueries in order to keep a traditional 
>    database in sync with the journal, but that kind of defeats the purpose of 
>    Akka persistence, doesn't it?
>
>
No, I don't think it defeats the purpose; I think the way you describe is 
how it's supposed to be done. Have you had a look at the CQRS pattern 
(Command Query Responsibility Segregation, 
https://martinfowler.com/bliki/CQRS.html)? The basic idea is that you write 
to the event log for durable storage, to be able to recreate entities, but 
for querying you have a whole other data store, the read side. You can use 
the same technology as for the write side (e.g. you could use a NoSQL DB 
for both), but you could also have, as you say, a traditional database. If 
you google some combination of Akka, CQRS, and event sourcing, you should 
find a couple of presentations about it. You could also check the "Lagom 
Persistence" documentation - 
http://www.lagomframework.com/documentation/1.2.x/java/PersistentEntity.html 
- Lagom Persistence is an Event Sourcing/CQRS layer/DSL on top of Akka 
Persistence. Even if you end up not using it, it should give you some 
inspiration.
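
To make the read side concrete, here is a minimal sketch (my own illustration,
assuming the Akka 2.4 Persistence Query API with the LevelDB read journal; the
"order" tag and saveToReadSide are hypothetical):

import akka.actor.ActorSystem
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import scala.concurrent.Future

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val readJournal = PersistenceQuery(system)
  .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)

// Hypothetical projection into a query-friendly store, e.g. a SQL table
// keyed by order date.
def saveToReadSide(event: Any): Future[Unit] = ???

// Stream all events tagged "order" out of the journal and project them.
readJournal
  .eventsByTag("order", offset = 0L)
  .mapAsync(1)(envelope => saveToReadSide(envelope.event))
  .runWith(Sink.ignore)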

> Regarding how to keep actors in memory: you may have several requests that 
> access the same entity (persistent actor), so I guess they have a different 
> life cycle than request-specific actors, which can be destroyed after 
> serving the request. The point is that you seem to end up with the whole 
> database in memory if you don't provide a mechanism to shut down actors 
> that haven't been used recently. But then you may be shutting down actors 
> that will be used again shortly, with a negative impact on 
> performance due to the actor reloading its state. I guess this is somehow 
> alleviated with the use of snapshots. Again, am I missing anything here?
>

I've only seen Akka Persistence in combination with Akka Cluster Sharding, 
and then cluster sharding takes care of the lifecycle for you. It will 
also passivate actors that haven't been used for a while, and activate them 
if you send them a message again. Activating a persistent actor indeed means 
replaying its events, but possibly only since the last snapshot. So yes, 
fundamentally, the architectural idea is to have the whole database in 
memory (https://www.martinfowler.com/bliki/MemoryImage.html) and to 
recreate entities by replaying all events. But thanks to some optimizations 
(activation/passivation, snapshots) you only have a "working set" in 
memory, and only replay events since the last snapshot.
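
For illustration, a minimal sketch (not from the thread) of that passivation
handshake for a persistent entity running under Cluster Sharding; the entity
class and the timeout are hypothetical:

import akka.actor.{PoisonPill, ReceiveTimeout}
import akka.cluster.sharding.ShardRegion.Passivate
import akka.persistence.PersistentActor
import scala.concurrent.duration._

class OrderEntity extends PersistentActor {
  override def persistenceId: String = "order-" + self.path.name

  // After two minutes without messages, ask the shard region to stop us; the
  // entity is recreated (events replayed since the last snapshot) on the next
  // message sent to its id.
  context.setReceiveTimeout(2.minutes)

  override def receiveCommand: Receive = {
    case ReceiveTimeout => context.parent ! Passivate(stopMessage = PoisonPill)
    case command        => persist(command) { _ => /* update in-memory state */ }
  }

  override def receiveRecover: Receive = {
    case event => // rebuild in-memory state from replayed events
  }
}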

 

>
> Thanks!!
> José
>



[akka-user] Database query does not cancel when streaming rows using a DatabasePublisher

2017-02-01 Thread Mark Goldenstein
Hi guys!

(Cross-posting this since I did not get a response in the Slick group.)

I use akka-http together with Slick to stream rows from a mysql database as 
csv via http.

Streaming works as expected unless I disconnect the http connection 
prematurely. In this case I would expect that the sql query should stop 
executing since there is no downstream demand anymore. However, I can see 
in the mysql processes that the query is still running and it only stops 
when I shut down the webserver (or when the query finishes fetching all the 
rows).

If I turn on debug logs I see repeatedly, while streaming:

22:05:17.738 [db-8] DEBUG slick.basic.BasicBackend.stream - Suspending 
streaming action with continuation (more data available)
22:05:17.743 [default-akka.actor.default-dispatcher-8] DEBUG 
slick.basic.BasicBackend.stream - Scheduling stream continuation after 
transition from demand = 0
22:05:17.743 [db-9] DEBUG slick.basic.BasicBackend.stream - Restarting 
 streaming action, realDemand = 8

Then, once I disconnect the connection, and there is no downstream demand 
anymore, I see:

22:05:17.744 [db-9] DEBUG slick.basic.BasicBackend.stream - Suspending 
streaming action with continuation (more data available)
22:05:17.749 [default-akka.actor.default-dispatcher-7] DEBUG 
akka.io.TcpIncomingConnection - Closing connection due to IO error 
java.io.IOException: Broken pipe
22:05:17.774 [default-akka.actor.default-dispatcher-16] DEBUG 
slick.basic.BasicBackend.stream - Scheduling stream continuation after 
transition from demand = 0
22:05:17.774 [db-10] DEBUG slick.basic.BasicBackend.stream - Restarting 
 streaming action, realDemand = oo

What is going on here? Why is realDemand oo?

My code looks something like this:

object Main extends App {
  implicit val system = ActorSystem()
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  implicit val csvMarshaller =
    Marshaller.withFixedContentType[CSVLine, ByteString](ContentTypes.`text/csv(UTF-8)`) {
      case CSVLine(line) => ByteString(line)
    }

  implicit val csvStreamingSupport = EntityStreamingSupport.csv()
    .withParallelMarshalling(parallelism = 8, unordered = false)

  val query = ??? // a Slick query

  val publisher: DatabasePublisher[Tick] = DB.get.stream(
    query.result.withStatementParameters(statementInit = DB.enableStream))

  val routes = {
    logRequestResult("webservice") {
      encodeResponse {
        pathPrefix("csv") {
          pathSingleSlash {
            get {
              complete {
                Source.fromPublisher(publisher).map(t => CSVLine(t.toCSV()))
              }
            }
          }
        }
      }
    }
  }

  Http().bindAndHandle(routes, "127.0.0.1", 9000)
}

object DB {
  private val db = Database.forConfig("db")

  def get = db

  def enableStream(statement: java.sql.Statement): Unit = {
    statement match {
      case s if s.isWrapperFor(classOf[com.mysql.jdbc.StatementImpl]) =>
        s.unwrap(classOf[com.mysql.jdbc.StatementImpl]).enableStreamingResults()
      case _ => // do nothing
    }
  }
}

Any thoughts on this?

Mark



Re: [akka-user] Chunked Response handling with Akka Http?

2017-02-01 Thread Akka Team
This is definitely possible: just use Chunk for each element instead; that
way you can create them from a stream of strings using .map().

For the "an async request that responds one or more times" case there are a
few possible tools in Akka Streams that may solve this. Take a look at
Source.unfoldAsync, Source.unfoldResourceAsync and Source.queue and see if
any of those fit your use case.
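
For the dynamic case, a minimal sketch of the Source.queue option
(illustrative; device.onResponse and device.onDone stand in for whatever
callback API the serial port offers):

import akka.NotUsed
import akka.http.scaladsl.model.ContentTypes
import akka.http.scaladsl.model.HttpEntity.{Chunk, ChunkStreamPart, Chunked}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source

val chunks: Source[ChunkStreamPart, NotUsed] =
  Source.queue[ChunkStreamPart](bufferSize = 64, OverflowStrategy.backpressure)
    .mapMaterializedValue { queue =>
      // Hypothetical callbacks: offer each device response as a chunk and
      // complete the queue (ending the chunked response) when the device is done.
      device.onResponse(msg => queue.offer(Chunk(msg + "\n")))
      device.onDone(() => queue.complete())
      NotUsed
    }

complete(Chunked(ContentTypes.`text/plain(UTF-8)`, chunks))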

-- 
Johan
Akka Team

On Mon, Jan 16, 2017 at 8:03 AM,  wrote:

> I'd like to be able to use akka-http with chunked responses. I'd like to know 
> if I can do the same thing in akka-http as with the Play Framework. The 
> following is from the Play Framework documentation.
>
> def index = Action {
>   val source = Source.apply(List("kiki", "foo", "bar"))
>   Ok.chunked(source)
> }
>
> We can inspect the HTTP response sent by the server:
>
> HTTP/1.1 200 OK
> Content-Type: text/plain; charset=utf-8
> Transfer-Encoding: chunked
>
> 4
> kiki
> 3
> foo
> 3
> bar
> 0
>
> With akka-http I was able to get close with
>
>  val hi = Chunk("Hi" + "\n")
>  val there = Chunk("there" + "\n")
>  val last = LastChunk("boom" + "\n")
>
>  val source: Source[ChunkStreamPart, NotUsed] =
>    Source(scala.collection.immutable.Iterable(hi, there, last))
>
>  val rootPath: Route =
>    path("") {
>      get {
>        for (i <- 1 to 100) {
>          ref ! hi
>        }
>        complete(Chunked(ContentTypes.`text/plain(UTF-8)`, source))
>      }
>    }
>
>  def routes: Route = rootPath
> }
>
> curl returns
>
> < HTTP/1.1 200 OK
> < Server: akka
> < Date: Mon, 16 Jan 2017 14:03:10 GMT
> < Transfer-Encoding: chunked
> < Content-Type: text/plain; charset=UTF-8
> <
> Hi
> there
>
> Two perhaps minor issues: it does not report the last element, and it doesn't 
> include the chunk size as the Play Framework example did.
>
> What I'd like to know is: how can I do this in a dynamic way? I'm interacting 
> with an embedded device (via serial) where I make an async request and it 
> responds one or more times over up to about 20 seconds. I'd like each response 
> to be emitted as a chunk. Is this possible?
>
> Thanks,
> Andrew
>



[akka-user] Re: akka-http web-socket Frame support question...

2017-02-01 Thread Jan Heise
I have to add that I just found support for automatic pong messages in 

https://github.com/akka/akka-http/blob/master/akka-http-core/src/main/scala/akka/http/impl/engine/ws/FrameHandler.scala

Maybe somebody can give me a hint/help me with injecting control frames?



[akka-user] Re: akka-http web-socket Frame support question...

2017-02-01 Thread Jan Heise
Hi,

I'd like to add that I'd also be interested in solving this. I'm currently 
developing a server backend that should support ping/pong frames, to conform 
to a specification that specifically references those control frames. I do 
not control the development of the clients, so an upper-level protocol for 
keep-alives instead is not something I can do. Ignoring the whole thing 
might be fine in the beginning, but eventually I'll have to implement 
support for ping/pong control frames.

I'd like to reference the following ticket:
https://github.com/akka/akka-http/issues/102

Also, in the source, the necessary opcodes are already mentioned/exist. See:
https://github.com/akka/akka-http/blob/master/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Protocol.scala

Is there any more support regarding this already in the source that I have 
missed?
Any pointers on where I might start adding things myself? Where is the 
place that handles control frames? 
I guess sending a pong message automatically when a ping arrives could be 
handled easily if somebody points me in the right direction.
Injecting ping messages and eventually handling missing pong 
messages might need a bit more thought from someone better than me. 
But I would like to give it a shot, too.

Best regards,

Jan


On Thursday, 22 December 2016 at 19:15:03 UTC+1, Muthu Jayakumar wrote:
>
> Hello there,
>
> I am new to akka-http for writing web-socket server-side code (I have 
> written one using Spray-IO though). My question is how I would be able 
> to use the server-side APIs to send other kinds of control frames. Per the 
> spec @ https://tools.ietf.org/html/rfc6455#page-36 these are:
> Close 
> Ping 
> Pong
> Data -- Text -- Binary
> The APIs have the TextFrame and BinaryFrame parts of the Frame covered. I 
> guess PingFrame and PongFrame would be useful for the server-side 
> API to support the "keep-alive" attribute of the connection (as quoted from 
> the spec: "NOTE: A Ping frame may serve either as a keepalive or as a 
> means to verify that the remote endpoint is still responsive."). The same 
> question about CloseFrame, which may be useful too, I guess?
>
> Please advise,
> Muthu
>
> On a side note...
> On top of the akka documentation @ 
> http://doc.akka.io/docs/akka-http/current/scala/http/websocket-support.html, 
> I found the following article useful in getting Web-Socket to hook-up thru 
> actor(s) - 
> https://markatta.com/codemonkey/blog/2016/04/18/chat-with-akka-http-websockets/
> . 
> To me the gap is in understanding akka-streams better. It might be 
> useful for the documentation to give some hints on how 'server-push' can be 
> achieved, using a code snippet similar to this blog?
>



[akka-user] Re: SMACK Stack - Benefit of Akka Cluster?

2017-02-01 Thread Rafał Krzewski
Hi Chris,

Akka Cluster is a means to an end. It enables interesting capabilities like 
cluster singletons, entity sharding, and distributed data (CRDTs). These come at 
the cost of added operational complexity. In particular, split-brain situations 
need to be managed carefully because of the risk of data corruption. In many 
situations, running a (dynamically scaled) "swarm" of standalone Akka 
instances that communicate with other parts of the system through queues 
and HTTP interfaces is perfectly fine.

Cheers,
Rafał

On Wednesday, 1 February 2017 at 15:35:19 UTC+1, Chris Ridmann wrote:
>
> Hey everyone,
>
> I'm evaluating whether to use the SMACK stack as part of our data 
> pipeline, as demonstrated in this reference app: 
> https://github.com/killrweather/killrweather
>
> I don't want to introduce unnecessary complexity in our stack unless it 
> gives enough bang for the buck.  From that reference app, I'm trying to 
> convince myself of the value that an Akka Cluster brings to that design.  I 
> can see Akka-Streams being valuable as an ingestion layer to Kafka. 
>  However, what is the benefit of wrapping the Spark Streaming Kafka within 
> an actor?  There doesn't really seem to be much internal state within those 
> actors (except I guess a wrapper around the spark streaming context), and 
> thus couldn't Akka just be removed in favor of a simpler non-clustered 
> design?  Or is the streaming context not thread safe and thus an actor 
> provides safeguards around that?
>
> Some of our current use cases are streaming analytics, as well as batch 
> analytics on months of historical data (similar to what the aggregation 
> code in that reference app is doing).
>
> Any insight is appreciated!  
>



[akka-user] Re: About logging in Akka application

2017-02-01 Thread Rafał Krzewski
Hi Yinhua,

First of all, you could model all elements of your system as Actors; the 
Main class would then just start an ActorSystem, instantiate your top-level 
Actor, and wait for its termination.
If this is not feasible, the non-Actor parts of the application do have a 
reference to the ActorSystem you use to run your Actors, because you 
create and terminate it, right? Thus, you can create a LoggingAdapter from 
it, as in the snippet you quoted.
If you are using the akka-slf4j module, you can also just use the SLF4J API 
directly, and all messages should go to the same backend.
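
A minimal sketch of the LoggingAdapter option (illustrative names):

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}

object Main extends App {
  val system = ActorSystem("app")

  // Outside an actor: create a LoggingAdapter from the ActorSystem, using a
  // plain string as the log source. Messages go through the same event stream
  // as actor logging, so they end up in the same backend.
  val log: LoggingAdapter = Logging(system, "Main")
  log.info("Application started")
}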

Cheers,
Rafał

On Wednesday, 1 February 2017 at 15:55:21 UTC+1, Dai Yinhua wrote:
>
> Hi Team,
>
> I am a little confused about using SLF4J for logging in an akka 
> application.
> I understand Akka uses the event stream so that logging is asynchronous, 
> but it seems logging is only available inside an actor class, by using 
> transient LoggingAdapter logger = Logging.getLogger(getContext().system(), 
> this);
>
> But how can I do similar logging outside an actor class? Is there a way 
> that all parts of the application can use a unified akka logging system?
> Thank you.
>



[akka-user] SMACK Stack - Benefit of Akka Cluster?

2017-02-01 Thread Chris Ridmann
Hey everyone,

I'm evaluating whether to use the SMACK stack as part of our data pipeline, 
as demonstrated in this reference 
app: https://github.com/killrweather/killrweather

I don't want to introduce unnecessary complexity in our stack unless it 
gives enough bang for the buck.  From that reference app, I'm trying to 
convince myself of the value that an Akka Cluster brings to that design.  I 
can see Akka-Streams being valuable as an ingestion layer to Kafka. 
 However, what is the benefit of wrapping the Spark Streaming Kafka within 
an actor?  There doesn't really seem to be much internal state within those 
actors (except I guess a wrapper around the spark streaming context), and 
thus couldn't Akka just be removed in favor of a simpler non-clustered 
design?  Or is the streaming context not thread safe and thus an actor 
provides safeguards around that?

Some of our current use cases are streaming analytics, as well as batch 
analytics on months of historical data (similar to what the aggregation 
code in that reference app is doing).

Any insight is appreciated!  



[akka-user] Re: Delaying akka.remote.netty.tcp.{hostname, port} resolution?

2017-02-01 Thread Rafał Krzewski
Hi Dragos,

I'd probably go with #2, because I think it's more cohesive: there is only one 
HTTP server implementation, and the health-check supporting code is written 
only once.
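
A minimal sketch of the config-injection step described in B (illustrative):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Once the external component has resolved the allocated IP and port, put
// them ahead of the regular configuration and start the remoting-enabled
// ActorSystem.
def startClusteredSystem(host: String, port: Int): ActorSystem = {
  val dynamic = ConfigFactory.parseString(
    s"""akka.remote.netty.tcp.hostname = "$host"
       |akka.remote.netty.tcp.port = $port""".stripMargin)
  ActorSystem("cluster", dynamic.withFallback(ConfigFactory.load()))
}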

Cheers,
Rafał

On Tuesday, 31 January 2017 at 16:17:56 UTC+1, Dragos Manolescu wrote:
>
> Greetings --
>
> I'm moving here a thread started on twitter. Here's my scenario: I am 
> spinning up a cluster on a compute fabric that's managed by an external 
> component. One consequence of this configuration is that the nodes' IP 
> addresses and their ports aren't known a priori: they are dynamically 
> allocated by the external component. I have code that performs the 
> resolution and thus could configure the cluster nodes appropriately. 
> Therein lies the rub:
>
> A. The compute fabric performs periodic health checks via HTTP (which I 
> implemented on top of akka-http). The external component will resolve the 
> allocated IP and port only *after* the first health check succeeds.
> B. The resolution code queries the external component to extract the IP 
> and port, and then uses those values to inject 
> the akka.remote.netty.tcp.{hostname, port} into the actor system's config.
>
> Both steps require an actor system, and because the values of 
> akka.remote.netty.tcp.{hostname, port} are not available prior to the http 
> listener being up and ready I can't think of a clean way to accomplish this 
> workflow. I see a few of workarounds; for example:
>
> 1. Factor the health check out of Akka into a different framework that 
> doesn't need an actor system (
> https://bitbucket.org/vetler/minihttpserver/overview ???) and spin up the 
> actor system after the first health check succeeds
> 2. Use 2 actor systems: start one w/o remoting configured and answer the 
> health checks until the IP and port are allocated, then shut it down and 
> start a 2nd one configured for remoting with the acquired IP + port info as 
> well as ready to continue answering the health checks.
>
> I don't know how common this scenario is. I'd be interested in your 
> thoughts about tackling it.
>
> Thanks,
>
> -Dragos
>



Re: [akka-user] Class cast exceptions in actors that return Future[Unit]

2017-02-01 Thread Alan Burlison

On 01/02/2017 10:19, Viktor Klang wrote:


Try(block).fold(Status.Failure(_), Status.Success(_))


Neat, I wasn't aware of that - it appeared in 2.12:

https://issues.scala-lang.org/browse/SI-8336

Ta :-)

--
Alan Burlison
--



Re: [akka-user] Class cast exceptions in actors that return Future[Unit]

2017-02-01 Thread Viktor Klang
On Wed, Feb 1, 2017 at 10:51 AM, Alan Burlison wrote:

> On 01/02/17 09:06, Viktor Klang wrote:
>
>> You're welcome. A reminder that the actual error messages are very
>> important to include. :)
>
> Yes, I consider my wrist to be slapped ;-) Thanks again!


lol, no wrist-slapping intended, but the more info is available, the better
the chance that people can understand.


>
>
>> If there is no method in the java/scaladoc then you'll need to convert
>> manually.
>
>
> If there is I haven't been able to spot it.


Try(block).fold(Status.Failure(_), Status.Success(_)) ?


>
>
>> And if you need a shorthand, it should be a one-liner def. ;)
>
>
> Well, nearly ;-) I just copied the "Try {???}" implementation but made it
> return Akka Status values rather than those used by Try - I don't need any
> operations other than to return the Akka success types.
>
>   def actorTry[T](f: => T): akka.actor.Status.Status = {
>     try {
>       akka.actor.Status.Success(f)
>     } catch {
>       case NonFatal(e) => akka.actor.Status.Failure(e)
>     }
>   }
>
> then in the receive method:
>
> case Read(from, to, reader) => sender ! actorTry { read(from, to, reader) }
>
>> IIRC akka.actor.Status.Success was created aeons ago,
>> scala.util.Success was added with scala.util.Try, so they have
>> separate origins. (just like java.util.List and
>> scala.collection.immutable.List)
>>
>
> Ah, the curse of history :-) Sounds completely plausible - thanks.


:)


>
>
> --
> Alan Burlison
> --
>



-- 
Cheers,
√



Re: [akka-user] Class cast exceptions in actors that return Future[Unit]

2017-02-01 Thread Alan Burlison

On 01/02/17 09:06, Viktor Klang wrote:


> You're welcome. A reminder that the actual error messages are very
> important to include. :)


Yes, I consider my wrist to be slapped ;-) Thanks again!


> If there is no method in the java/scaladoc then you'll need to convert
> manually.


If there is I haven't been able to spot it.


> And if you need a shorthand, it should be a one-liner def. ;)


Well, nearly ;-) I just copied the "Try {???}" implementation but made 
it return Akka Status values rather than those used by Try - I don't need any 
operations other than to return the Akka success types.


  def actorTry[T](f: => T): akka.actor.Status.Status = {
    try {
      akka.actor.Status.Success(f)
    } catch {
      case NonFatal(e) => akka.actor.Status.Failure(e)
    }
  }

then in the receive method:

case Read(from, to, reader) => sender ! actorTry { read(from, to, reader) }


> IIRC akka.actor.Status.Success was created aeons ago,
> scala.util.Success was added with scala.util.Try, so they have
> separate origins. (just like java.util.List and
> scala.collection.immutable.List)


Ah, the curse of history :-) Sounds completely plausible - thanks.

--
Alan Burlison
--



Re: [akka-user] Is there a standard way of referencing external objects inside of custom serializers?

2017-02-01 Thread Alan Burlison
There's another wrinkle with the current Serialization API: it assumes 
that serialization is always from/to binary. If you are using pure JSON 
and storing it in (say) a flat file, you most likely want to serialize 
the payload, then "wrap" that inside an outer JSON object containing the 
serialization metadata needed by the persistence framework, e.g.:


{
  "payload": {
    "field1": "My serialized",
    "field2": "object",
    "field3": "goes here"
  },
  "serializerId": 100,
  "tags": [],
  "writerUuid": "215d6565-77b6-4355-ab84-9a89bebc1b52",
  "sequenceNr": 148,
  "serializerManifest": "MyPersistentObject",
  "eventManifest": "",
  "persistenceId": "MyPersistentActor-one"
}

What you get back from an Akka SerializerWithStringManifest is an Array[Byte], 
which you somehow need to embed within the wrapper JSON. In the case of 
Spray JSON, as far as I know the only way to do this would be to parse 
the Array[Byte] back into a JSON parse tree again (!) and then wrap that.


I've worked around this by stubbing out the toBinary/fromBinary 
functions in the SerializerWithStringManifest implementation and adding 
toJSON/fromJSON methods instead. Then in the journal implementation, 
when I do the serializer lookup I check it's of the expected serializer 
type and call the toJSON/fromJSON methods.
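
A minimal sketch of that workaround (my reading of it; the class name and the
spray-json types are illustrative):

import akka.serialization.SerializerWithStringManifest
import spray.json.JsValue

class JsonEventSerializer extends SerializerWithStringManifest {
  override val identifier: Int = 100
  override def manifest(o: AnyRef): String = o.getClass.getSimpleName

  // Binary entry points are stubbed out: this serializer is only usable
  // through a journal that knows to call toJson/fromJson instead.
  override def toBinary(o: AnyRef): Array[Byte] =
    throw new UnsupportedOperationException("use toJson")
  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
    throw new UnsupportedOperationException("use fromJson")

  def toJson(o: AnyRef): JsValue = ???                        // payload as a JSON tree
  def fromJson(json: JsValue, manifest: String): AnyRef = ??? // and back
}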


That means the JSON serializer and journal implementation must be a 
matching pair, which kind of sucks but at least it looks fairly "normal" 
as far as the application.conf contents go. The other alternative would 
be to just do away with a separate serializer class altogether and do 
the JSON serialization directly inside the journal.


That might be one way of doing what you want - don't use the Akka 
mechanism for configuring & looking up the serializer, just do it 
explicitly inside the journal implementation?


--
Alan Burlison
--



Re: [akka-user] Class cast exceptions in actors that return Future[Unit]

2017-02-01 Thread Viktor Klang
On Wed, Feb 1, 2017 at 2:55 AM, Alan Burlison wrote:

>> Ahah! I think you may have nailed it... Thanks!
>
>
> Indeed you did nail it - "thanks" doesn't come close as I'd looked at this
> for so long that I'd gone completely snow-blind ;-)
>

You're welcome. A reminder that the actual error messages are very
important to include. :)


>
> Is there a convenient shorthand way of mapping between a
> scala.util.Success and an akka.actor.Status.Success, or should I just do it
> with a match expression? What I'm doing is wrapping the IO functions that
> could fail in a "Try { ??? }" and returning the result to the calling actor.
>

If there is no method in the java/scaladoc then you'll need to convert
manually. And if you need a shorthand, it should be a one-liner def. ;)


>
> Is there a reason why there's both scala.util.Success and an
> akka.actor.Status.Success?


IIRC akka.actor.Status.Success was created aeons ago, scala.util.Success
was added with scala.util.Try,
so they have separate origins. (just like java.util.List and
scala.collection.immutable.List)


>
>
> --
> Alan Burlison
> --
>



-- 
Cheers,
√
