Re: [akka-user] [akka-streams] Consuming on TCP server side - is this really the best way?

2015-08-01 Thread Viktor Klang
this is excellent feedback,
I think there's room for improvement for grokkability of Akka Streams,
thanks Derek and Endre.

-- 
Cheers,
√
On 1 Aug 2015 13:12, Derek Wyatt de...@derekwyatt.org wrote:


 Endre Varga endre.va...@typesafe.com
 July 31, 2015 at 4:47 AM


 On Thu, Jul 30, 2015 at 9:07 PM, Derek Wyatt de...@derekwyatt.org wrote:



 Endre Varga endre.va...@typesafe.com
 July 30, 2015 at 2:23 PM
 Hi Derek,

 On Thu, Jul 30, 2015 at 6:48 PM, Derek Wyatt de...@derekwyatt.org
 wrote:

 These examples, and the new documentation page, are extremely helpful.
 Thank you *very* much. I’m continuing to read / digest / modify them to
 get a better understanding, but they’ve already cleared up a lot for me.

 One thing that popped out was the use of Flow.transform(). The
 difference between map and transform is now clear to me, but I then
 started to question the difference between a Stage and a Flow, and they
 now confuse me a bit… (if this is covered somewhere, please forgive me).
 Why is a Stage not a derived type of Flow? Or, perhaps another way to
 say that is, Why do Stages exist at all? It feels like there are Flows,
 BidiFlows and Graphs.

 If you have read the documentation page that I linked before (
 http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-composition.html),
 then you already know that a Flow is nothing else then a box with exactly
 one input and output port, but it might contain a whole network inside.
 Source is similar but with exactly one output port, while Sink has exactly
 one input port. The modules can be composite, i.e. internally contain
 arbitrarily complex graphs. There are modules though that are not
 composite, but atomic, as they directly represent some logic not built from
 other modules.

 I did read it :)  And all that makes perfect sense (now).

 A Stage is just an atomic Flow, if you prefer to look it that way
 (strictly speaking it is the transform() op which is the atomic Flow and
 it is a factory for a Stage). Whenever you are calling .filter(), .map(),
 etc. on a Source or Sink, you are attaching a Stage to the output port at
 the end. See
 https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
 for most of the implementation of the built-in operators.

 OK.  That's good to know.  Digging into the code certainly makes it
 clearer, and it's nice getting a direct pointer of something specific to
 look at.

 Stage feels like it’s just another manifestation of a Flow, perhaps an
 earlier part of the design that may not need to be there?

 No! Hell, no! It is actually one of the most solid parts of the
 internals. You can build a Flow by using the

 So, you're saying, no, then? :)  Fair enough... It just looks like a
 Stage has one input port and one output port, so I started to wonder why
 there was a distinction.  Perhaps it's because it's a solid part of the
 internals, and as an internal component it makes perfect sense. When it
 comes out of the API, maybe it's a bit confusing... but... I'm still
 learning.


 Well, somehow you need to build new stream processing logic when the
 built-in ones are not sufficient. Hence the Stage API.


 built-in operators, but how do you build the built-in operators
 themselves? Well, they are Stages. What should you do when there is an
 operator missing? Build your Stage. You should familiarize yourself with
 stages and look at the doc page for them:
 http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-customize.html

 I've certainly read that too, and in-and-of-itself it makes perfect
 sense.  Really what made me question it was the existence of
 `Flow.transform()`.  Then from there it was a question as to why you would
 need to transform a Stage with one in and one out, into a Flow of one in
 and one out.


 A Stage is an atomic processing entity, a Flow is a chain (network) of
 such entities. Btw, the name transform means transform this stream of
 elements into a different stream of elements by using this logic it is not
 transform this Stage into a Flow.

 Yup, that reads better.


 When you call myFlow.map(f) what is actually called in the background is
 myFlow.transform(() = Map(f)), i.e. map is just a sugar over the most
 generic transform API. Stages are a way to express backpressured stream
 processing logic in an efficient manner.


   And then when I find myself binding Stages together in the same-ish
 manner as what I would bind Flows together, it just seemed redundant.


 You don't bind Stages together, where does that come from? There must be
 some source of confusion here,

 You're right, you don't.  I apologize... the code that I'm commenting on,
 I had to delete, so I'm working from memory.  But in the end, it looked
 like what's on the stream-customization page. I believe I had Flows being
 transformed using Stages.

 the only way to bind Stages together is via the Flow API. I.e. there is no
 redundancy. To 

Re: [akka-user] [akka-streams] Consuming on TCP server side - is this really the best way?

2015-07-30 Thread Endre Varga
Hi Derek,

On Thu, Jul 30, 2015 at 6:48 PM, Derek Wyatt de...@derekwyatt.org wrote:

 These examples, and the new documentation page, are extremely helpful.
 Thank you *very* much. I’m continuing to read / digest / modify them to
 get a better understanding, but they’ve already cleared up a lot for me.

 One thing that popped out was the use of Flow.transform(). The difference
 between map and transform is now clear to me, but I then started to
 question the difference between a Stage and a Flow, and they now confuse
 me a bit… (if this is covered somewhere, please forgive me). Why is a
 Stage not a derived type of Flow? Or, perhaps another way to say that is,
 Why do Stages exist at all? It feels like there are Flows, BidiFlows and
 Graphs.

If you have read the documentation page that I linked before (
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-composition.html),
then you already know that a Flow is nothing else then a box with exactly
one input and output port, but it might contain a whole network inside.
Source is similar but with exactly one output port, while Sink has exactly
one input port. The modules can be composite, i.e. internally contain
arbitrarily complex graphs. There are modules though that are not
composite, but atomic, as they directly represent some logic not built from
other modules.

A Stage is just an atomic Flow, if you prefer to look it that way
(strictly speaking it is the transform() op which is the atomic Flow and
it is a factory for a Stage). Whenever you are calling .filter(), .map(),
etc. on a Source or Sink, you are attaching a Stage to the output port at
the end. See
https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
for most of the implementation of the built-in operators.


 Stage feels like it’s just another manifestation of a Flow, perhaps an
 earlier part of the design that may not need to be there?

No! Hell, no! It is actually one of the most solid parts of the internals.
You can build a Flow by using the built-in operators, but how do you build
the built-in operators themselves? Well, they are Stages. What should you
do when there is an operator missing? Build your Stage. You should
familiarize yourself with stages and look at the doc page for them:
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-customize.html


 P.S. It looks like the stream-composition doc is missing something in the
 attributes section. The diagram shows that nestedSource is connected to
 nestedSink but the code doesn’t do such a thing. Did you mean to add a
 nestedSource.to(nestedSink)?

yep, that line is missing at the end. Thanks!

-Endre




  Akka Team akka.offic...@gmail.com
  July 29, 2015 at 9:28 AM
 And now I added another version where the server just streams random
 numbers until the client disconnects, then it closes the connection. It
 needed a custom stage though to make emitting from an Iterable
 interruptible (mapConcat does not interrupt on completion, only on errors).




 --
 Akka Team
 Typesafe - Reactive apps on the JVM
 Blog: letitcrash.com
 Twitter: @akkateam
  --
  Read the docs: http://akka.io/docs/
  Check the FAQ:
 http://doc.akka.io/docs/akka/current/additional/faq.html
  Search the archives: https://groups.google.com/group/akka-user
 ---
 You received this message because you are subscribed to a topic in the
 Google Groups Akka User List group.
 To unsubscribe from this topic, visit
 https://groups.google.com/d/topic/akka-user/tng5CiUtfig/unsubscribe.
 To unsubscribe from this group and all its topics, send an email to
 akka-user+unsubscr...@googlegroups.com.
 To post to this group, send email to akka-user@googlegroups.com.
 Visit this group at http://groups.google.com/group/akka-user.
 For more options, visit https://groups.google.com/d/optout.
  Endre Varga endre.va...@typesafe.com
  July 29, 2015 at 7:59 AM
 I now updated the gist with the reverse direction: Now a client sends a
 String command and expects an Iterable[Int] back as a response. I currently
 limited the funcionality to one request per connection, since otherwise I
 would need a bit more elaborate codec which would complicate the example (I
 would need to add a delimiter between the iterables on the wire. Not too
 hard to add it though). It still shows how these things are supposed to
 work.

 -Endre


 --
  Read the docs: http://akka.io/docs/
  Check the FAQ:
 http://doc.akka.io/docs/akka/current/additional/faq.html
  Search the archives: https://groups.google.com/group/akka-user
 ---
 You received this message because you are subscribed to a topic in the
 Google Groups Akka User List group.
 To unsubscribe from this topic, visit
 https://groups.google.com/d/topic/akka-user/tng5CiUtfig/unsubscribe.
 To unsubscribe from this group and all its topics, send an email to
 akka-user+unsubscr...@googlegroups.com.
 To post to this group, send email to akka-user@googlegroups.com.
 Visit this 

Re: [akka-user] [akka-streams] Consuming on TCP server side - is this really the best way?

2015-07-29 Thread Derek Wyatt

Hey Endre,

Thanks so much for doing this. I've looked through it (and skimmed the 
new doc page) but haven't digested it well enough to say anything other 
than thanks. :) I'll get back to you in the next day or so, perhaps 
with a couple of questions.


Akka Team wrote:


And now I added another version where the server just streams random
numbers until the client disconnects, then it closes the connection.
It needed a custom stage though to make emitting from an Iterable
interruptible (mapConcat does not interrupt on completion, only on
errors).

On Wed, Jul 29, 2015 at 1:59 PM, Endre Varga endre.va...@typesafe.com
mailto:endre.va...@typesafe.com wrote:

I now updated the gist with the reverse direction: Now a client
sends a String command and expects an Iterable[Int] back as a
response. I currently limited the funcionality to one request per
connection, since otherwise I would need a bit more elaborate
codec which would complicate the example (I would need to add a
delimiter between the iterables on the wire. Not too hard to add
it though). It still shows how these things are supposed to work.

-Endre

On Wed, Jul 29, 2015 at 1:14 PM, Akka Team
akka.offic...@gmail.com mailto:akka.offic...@gmail.com wrote:

Hi Derek,

It is not that hard, but you need to develop a certain kind of
intuition to attack these problems. I very much recommend the
new documentation page
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-composition.html
as it helps to visualize the ideas.

I created a sample app that does what you want, you can find
the gist here: https://gist.github.com/drewhk/25bf7472db04b5699b80

The features in that app:
- exposes the client API as a Source[Int, Unit]. Anytime you
materialize that source and send it data, it will open a TCP
connection and dump the integers to the server, then closes
the connection
- exposes the server API as a Source[(InetSocketAddress,
Iterable[Int]), Future[ServerBinding]]. It will provide you
with a continuous stream of client address, client data
iterable pairs.
- includes a simple codec pair for encoding the Ints. It is
kind of stupid for this use case, but it works.

Some notes:
- draining the client data to an Iterable might be suboptimal
if the Iterables are large, in this case a Source[Int] would
be a better abstraction
- the implementation caps the size of the Iterable but
currently just silently ignores overflows (I was lazy to build
a stage or use fold for this sample, so I used grouped())

-Endre


On Sun, Jul 26, 2015 at 9:12 PM, Derek Wyatt
de...@derekwyatt.org mailto:de...@derekwyatt.org wrote:

Hi,

I'm still trying to figure out the best way to work with
TCP flows and, while I've got something working, this
seems really quite wrong, so there's gotta be a better way.

What I want to do is send an Iterable[Int] from the client
to the server and have the server materialize that
resulting flow in a Future[Iterable[Int]].


||
val bytesStage =// elided... BidiFlow of serialization and
framing

val serverValuePromise =Promise[Seq[AnyRef]]()

// Technically, the materialized value isn't important,
since it's actually going to be pulled out
// via the Promise
val
serverConsumerFlow:Flow[AnyRef,AnyRef,Future[Seq[AnyRef]]]=Flow.wrap(
// Consume the client's stream and complete the
serverValuePromise with its folded result
Sink.fold(Vector.empty[AnyRef])((acc,v:AnyRef)=acc
:+v).mapMaterializedValue(v
={serverValuePromise.completeWith(v);v }),
// We're not sending anything from this side
Source.empty)(Keep.left)

// The server
val
serverSide:Future[ServerBinding]=StreamTcp().bindAndHandle(serverConsumerFlow.join(bytesStage),0.0.0.0,0,halfClose
=true)

// We really want to stop listening once the client has
successfully connected, but this is good
// enough
serverValuePromise.future.onComplete {
case_ =
serverSide.onSuccess {
casebinding =binding.unbind()
}
}

// I need the endpoint where the client needs to connect
val destination
=Await.result(serverSide,1.second).localAddress

// Get the source running
Source((1to
10).map(newInteger(_))).via(bytesStage.joinMat(StreamTcp().outgoingConnection(destination))(Keep.right)).to(Sink.ignore).run()

// Print out what the client has sent to the server
Await.result(serverValuePromise.future,1.second).foreach(t
=println(stt: $t))

I tried doing this the other way around - where the server
side supplies source - but this caused me issues with
actually shutting down the socket. Having the client do it
seems to make shutting down the socket on completion of
the source, just naturally occur. The problem with the
server side providing the source was that the client
source needed to finish properly. If I created it as
`empty` then it would kill things too quickly. If I then
created it as a n Actor source that just didn't do
anything, I couldn't find a decent way to close it.

There's gotta be a better way to do this, but I'm too much
of a noob to see it. Can anyone improve this code for me?

Thanks,
Derek




Re: [akka-user] [akka-streams] Consuming on TCP server side - is this really the best way?

2015-07-29 Thread Endre Varga
I now updated the gist with the reverse direction: Now a client sends a
String command and expects an Iterable[Int] back as a response. I currently
limited the funcionality to one request per connection, since otherwise I
would need a bit more elaborate codec which would complicate the example (I
would need to add a delimiter between the iterables on the wire. Not too
hard to add it though). It still shows how these things are supposed to
work.

-Endre

On Wed, Jul 29, 2015 at 1:14 PM, Akka Team akka.offic...@gmail.com wrote:

 Hi Derek,

 It is not that hard, but you need to develop a certain kind of intuition
 to attack these problems. I very much recommend the new documentation page
 http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-composition.html
 as it helps to visualize the ideas.

 I created a sample app that does what you want, you can find the gist
 here: https://gist.github.com/drewhk/25bf7472db04b5699b80

 The features in that app:
  - exposes the client API as a Source[Int, Unit]. Anytime you materialize
 that source and send it data, it will open a TCP connection and dump the
 integers to the server, then closes the connection
  - exposes the server API as a Source[(InetSocketAddress, Iterable[Int]),
 Future[ServerBinding]]. It will provide you with a continuous stream of
 client address, client data iterable pairs.
  - includes a simple codec pair for encoding the Ints. It is kind of
 stupid for this use case, but it works.

 Some notes:
  - draining the client data to an Iterable might be suboptimal if the
 Iterables are large, in this case a Source[Int] would be a better
 abstraction
  - the implementation caps the size of the Iterable but currently just
 silently ignores overflows (I was lazy to build a stage or use fold for
 this sample, so I used grouped())

 -Endre


 On Sun, Jul 26, 2015 at 9:12 PM, Derek Wyatt de...@derekwyatt.org wrote:

 Hi,

 I'm still trying to figure out the best way to work with TCP flows and,
 while I've got something working, this seems really quite wrong, so there's
 gotta be a better way.

 What I want to do is send an Iterable[Int] from the client to the server
 and have the server materialize that resulting flow in a
 Future[Iterable[Int]].


 val bytesStage = // elided... BidiFlow of serialization and framing

 val serverValuePromise = Promise[Seq[AnyRef]]()

 // Technically, the materialized value isn't important, since it's
 actually going to be pulled out
 // via the Promise
 val serverConsumerFlow: Flow[AnyRef, AnyRef, Future[Seq[AnyRef]]] = Flow.
 wrap(
   // Consume the client's stream and complete the serverValuePromise
 with its folded result
   Sink.fold(Vector.empty[AnyRef])((acc, v: AnyRef) = acc :+ v).
 mapMaterializedValue(v = { serverValuePromise.completeWith(v); v }),
   // We're not sending anything from this side
   Source.empty)(Keep.left)

 // The server
 val serverSide: Future[ServerBinding] = StreamTcp().bindAndHandle(
 serverConsumerFlow.join(bytesStage), 0.0.0.0, 0, halfClose = true)

 // We really want to stop listening once the client has successfully
 connected, but this is good
 // enough
 serverValuePromise.future.onComplete {
   case _ =
 serverSide.onSuccess {
   case binding = binding.unbind()
 }
 }

 // I need the endpoint where the client needs to connect
 val destination = Await.result(serverSide, 1.second).localAddress

 // Get the source running
 Source((1 to 10).map(new Integer(_))).via(bytesStage.joinMat(StreamTcp().
 outgoingConnection(destination))(Keep.right)).to(Sink.ignore).run()

 // Print out what the client has sent to the server
 Await.result(serverValuePromise.future, 1.second).foreach(t = println(stt:
 $t))

 I tried doing this the other way around - where the server side supplies
 source - but this caused me issues with actually shutting down the socket.
 Having the client do it seems to make shutting down the socket on
 completion of the source, just naturally occur.  The problem with the
 server side providing the source was that the client source needed to
 finish properly.  If I created it as `empty` then it would kill things
 too quickly.  If I then created it as a n Actor source that just didn't do
 anything, I couldn't find a decent way to close it.

 There's gotta be a better way to do this, but I'm too much of a noob to
 see it.  Can anyone improve this code for me?

 Thanks,
 Derek



  --
  Read the docs: http://akka.io/docs/
  Check the FAQ:
 http://doc.akka.io/docs/akka/current/additional/faq.html
  Search the archives: https://groups.google.com/group/akka-user
 ---
 You received this message because you are subscribed to the Google Groups
 Akka User List group.
 To unsubscribe from this group and stop receiving emails from it, send an
 email to akka-user+unsubscr...@googlegroups.com.
 To post to this group, send email to akka-user@googlegroups.com.
 Visit this group at http://groups.google.com/group/akka-user.
 For more options, visit 

Re: [akka-user] [akka-streams] Consuming on TCP server side - is this really the best way?

2015-07-29 Thread Akka Team
Hi Derek,

It is not that hard, but you need to develop a certain kind of intuition to
attack these problems. I very much recommend the new documentation page
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-composition.html
as it helps to visualize the ideas.

I created a sample app that does what you want, you can find the gist here:
https://gist.github.com/drewhk/25bf7472db04b5699b80

The features in that app:
 - exposes the client API as a Source[Int, Unit]. Anytime you materialize
that source and send it data, it will open a TCP connection and dump the
integers to the server, then closes the connection
 - exposes the server API as a Source[(InetSocketAddress, Iterable[Int]),
Future[ServerBinding]]. It will provide you with a continuous stream of
client address, client data iterable pairs.
 - includes a simple codec pair for encoding the Ints. It is kind of stupid
for this use case, but it works.

Some notes:
 - draining the client data to an Iterable might be suboptimal if the
Iterables are large, in this case a Source[Int] would be a better
abstraction
 - the implementation caps the size of the Iterable but currently just
silently ignores overflows (I was lazy to build a stage or use fold for
this sample, so I used grouped())

-Endre


On Sun, Jul 26, 2015 at 9:12 PM, Derek Wyatt de...@derekwyatt.org wrote:

 Hi,

 I'm still trying to figure out the best way to work with TCP flows and,
 while I've got something working, this seems really quite wrong, so there's
 gotta be a better way.

 What I want to do is send an Iterable[Int] from the client to the server
 and have the server materialize that resulting flow in a
 Future[Iterable[Int]].


 val bytesStage = // elided... BidiFlow of serialization and framing

 val serverValuePromise = Promise[Seq[AnyRef]]()

 // Technically, the materialized value isn't important, since it's
 actually going to be pulled out
 // via the Promise
 val serverConsumerFlow: Flow[AnyRef, AnyRef, Future[Seq[AnyRef]]] = Flow.
 wrap(
   // Consume the client's stream and complete the serverValuePromise with
 its folded result
   Sink.fold(Vector.empty[AnyRef])((acc, v: AnyRef) = acc :+ v).
 mapMaterializedValue(v = { serverValuePromise.completeWith(v); v }),
   // We're not sending anything from this side
   Source.empty)(Keep.left)

 // The server
 val serverSide: Future[ServerBinding] = StreamTcp().bindAndHandle(
 serverConsumerFlow.join(bytesStage), 0.0.0.0, 0, halfClose = true)

 // We really want to stop listening once the client has successfully
 connected, but this is good
 // enough
 serverValuePromise.future.onComplete {
   case _ =
 serverSide.onSuccess {
   case binding = binding.unbind()
 }
 }

 // I need the endpoint where the client needs to connect
 val destination = Await.result(serverSide, 1.second).localAddress

 // Get the source running
 Source((1 to 10).map(new Integer(_))).via(bytesStage.joinMat(StreamTcp().
 outgoingConnection(destination))(Keep.right)).to(Sink.ignore).run()

 // Print out what the client has sent to the server
 Await.result(serverValuePromise.future, 1.second).foreach(t = println(stt:
 $t))

 I tried doing this the other way around - where the server side supplies
 source - but this caused me issues with actually shutting down the socket.
 Having the client do it seems to make shutting down the socket on
 completion of the source, just naturally occur.  The problem with the
 server side providing the source was that the client source needed to
 finish properly.  If I created it as `empty` then it would kill things
 too quickly.  If I then created it as a n Actor source that just didn't do
 anything, I couldn't find a decent way to close it.

 There's gotta be a better way to do this, but I'm too much of a noob to
 see it.  Can anyone improve this code for me?

 Thanks,
 Derek



  --
  Read the docs: http://akka.io/docs/
  Check the FAQ:
 http://doc.akka.io/docs/akka/current/additional/faq.html
  Search the archives: https://groups.google.com/group/akka-user
 ---
 You received this message because you are subscribed to the Google Groups
 Akka User List group.
 To unsubscribe from this group and stop receiving emails from it, send an
 email to akka-user+unsubscr...@googlegroups.com.
 To post to this group, send email to akka-user@googlegroups.com.
 Visit this group at http://groups.google.com/group/akka-user.
 For more options, visit https://groups.google.com/d/optout.




-- 
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam

-- 
  Read the docs: http://akka.io/docs/
  Check the FAQ: 
 http://doc.akka.io/docs/akka/current/additional/faq.html
  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups Akka 
User List group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, 

Re: [akka-user] [akka-streams] Consuming on TCP server side - is this really the best way?

2015-07-29 Thread Akka Team
And now I added another version where the server just streams random
numbers until the client disconnects, then it closes the connection. It
needed a custom stage though to make emitting from an Iterable
interruptible (mapConcat does not interrupt on completion, only on errors).

On Wed, Jul 29, 2015 at 1:59 PM, Endre Varga endre.va...@typesafe.com
wrote:

 I now updated the gist with the reverse direction: Now a client sends a
 String command and expects an Iterable[Int] back as a response. I currently
 limited the funcionality to one request per connection, since otherwise I
 would need a bit more elaborate codec which would complicate the example (I
 would need to add a delimiter between the iterables on the wire. Not too
 hard to add it though). It still shows how these things are supposed to
 work.

 -Endre

 On Wed, Jul 29, 2015 at 1:14 PM, Akka Team akka.offic...@gmail.com
 wrote:

 Hi Derek,

 It is not that hard, but you need to develop a certain kind of intuition
 to attack these problems. I very much recommend the new documentation page
 http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-composition.html
 as it helps to visualize the ideas.

 I created a sample app that does what you want, you can find the gist
 here: https://gist.github.com/drewhk/25bf7472db04b5699b80

 The features in that app:
  - exposes the client API as a Source[Int, Unit]. Anytime you
 materialize that source and send it data, it will open a TCP connection and
 dump the integers to the server, then closes the connection
  - exposes the server API as a Source[(InetSocketAddress,
 Iterable[Int]), Future[ServerBinding]]. It will provide you with a
 continuous stream of client address, client data iterable pairs.
  - includes a simple codec pair for encoding the Ints. It is kind of
 stupid for this use case, but it works.

 Some notes:
  - draining the client data to an Iterable might be suboptimal if the
 Iterables are large, in this case a Source[Int] would be a better
 abstraction
  - the implementation caps the size of the Iterable but currently just
 silently ignores overflows (I was lazy to build a stage or use fold for
 this sample, so I used grouped())

 -Endre


 On Sun, Jul 26, 2015 at 9:12 PM, Derek Wyatt de...@derekwyatt.org
 wrote:

 Hi,

 I'm still trying to figure out the best way to work with TCP flows and,
 while I've got something working, this seems really quite wrong, so there's
 gotta be a better way.

 What I want to do is send an Iterable[Int] from the client to the
 server and have the server materialize that resulting flow in a
 Future[Iterable[Int]].


 val bytesStage = // elided... BidiFlow of serialization and framing

 val serverValuePromise = Promise[Seq[AnyRef]]()

 // Technically, the materialized value isn't important, since it's
 actually going to be pulled out
 // via the Promise
 val serverConsumerFlow: Flow[AnyRef, AnyRef, Future[Seq[AnyRef]]] = Flow
 .wrap(
   // Consume the client's stream and complete the serverValuePromise
 with its folded result
   Sink.fold(Vector.empty[AnyRef])((acc, v: AnyRef) = acc :+ v).
 mapMaterializedValue(v = { serverValuePromise.completeWith(v); v }),
   // We're not sending anything from this side
   Source.empty)(Keep.left)

 // The server
 val serverSide: Future[ServerBinding] = StreamTcp().bindAndHandle(
 serverConsumerFlow.join(bytesStage), 0.0.0.0, 0, halfClose = true)

 // We really want to stop listening once the client has successfully
 connected, but this is good
 // enough
 serverValuePromise.future.onComplete {
   case _ =
 serverSide.onSuccess {
   case binding = binding.unbind()
 }
 }

 // I need the endpoint where the client needs to connect
 val destination = Await.result(serverSide, 1.second).localAddress

 // Get the source running
 Source((1 to 10).map(new Integer(_))).via(bytesStage.joinMat(StreamTcp
 ().outgoingConnection(destination))(Keep.right)).to(Sink.ignore).run()

 // Print out what the client has sent to the server
 Await.result(serverValuePromise.future, 1.second).foreach(t = println(stt:
 $t))

 I tried doing this the other way around - where the server side supplies
 source - but this caused me issues with actually shutting down the socket.
 Having the client do it seems to make shutting down the socket on
 completion of the source, just naturally occur.  The problem with the
 server side providing the source was that the client source needed to
 finish properly.  If I created it as `empty` then it would kill things
 too quickly.  If I then created it as a n Actor source that just didn't do
 anything, I couldn't find a decent way to close it.

 There's gotta be a better way to do this, but I'm too much of a noob to
 see it.  Can anyone improve this code for me?

 Thanks,
 Derek



  --
  Read the docs: http://akka.io/docs/
  Check the FAQ:
 http://doc.akka.io/docs/akka/current/additional/faq.html
  Search the archives:
 https://groups.google.com/group/akka-user
 ---
 You received this 

[akka-user] [akka-streams] Consuming on TCP server side - is this really the best way?

2015-07-26 Thread Derek Wyatt
Hi,

I'm still trying to figure out the best way to work with TCP flows and, 
while I've got something working, this seems really quite wrong, so there's 
gotta be a better way.

What I want to do is send an Iterable[Int] from the client to the server 
and have the server materialize that resulting flow in a 
Future[Iterable[Int]]. 


val bytesStage = // elided... BidiFlow of serialization and framing

val serverValuePromise = Promise[Seq[AnyRef]]()

// Technically, the materialized value isn't important, since it's actually 
going to be pulled out
// via the Promise
val serverConsumerFlow: Flow[AnyRef, AnyRef, Future[Seq[AnyRef]]] = Flow.
wrap(
  // Consume the client's stream and complete the serverValuePromise with 
its folded result
  Sink.fold(Vector.empty[AnyRef])((acc, v: AnyRef) = acc :+ v).
mapMaterializedValue(v = { serverValuePromise.completeWith(v); v }),
  // We're not sending anything from this side
  Source.empty)(Keep.left)

// The server
val serverSide: Future[ServerBinding] = StreamTcp().bindAndHandle(
serverConsumerFlow.join(bytesStage), 0.0.0.0, 0, halfClose = true)

// We really want to stop listening once the client has successfully 
connected, but this is good
// enough
serverValuePromise.future.onComplete {
  case _ =
serverSide.onSuccess {
  case binding = binding.unbind()
}
}

// I need the endpoint where the client needs to connect
val destination = Await.result(serverSide, 1.second).localAddress

// Get the source running
Source((1 to 10).map(new Integer(_))).via(bytesStage.joinMat(StreamTcp().
outgoingConnection(destination))(Keep.right)).to(Sink.ignore).run()

// Print out what the client has sent to the server
Await.result(serverValuePromise.future, 1.second).foreach(t = println(stt: 
$t))

I tried doing this the other way around - where the server side supplies 
source - but this caused me issues with actually shutting down the socket. 
Having the client do it seems to make shutting down the socket on 
completion of the source, just naturally occur.  The problem with the 
server side providing the source was that the client source needed to 
finish properly.  If I created it as `empty` then it would kill things 
too quickly.  If I then created it as a n Actor source that just didn't do 
anything, I couldn't find a decent way to close it.

There's gotta be a better way to do this, but I'm too much of a noob to see 
it.  Can anyone improve this code for me?

Thanks,
Derek



-- 
  Read the docs: http://akka.io/docs/
  Check the FAQ: 
 http://doc.akka.io/docs/akka/current/additional/faq.html
  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups Akka 
User List group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.