Re: [akka-user] [akka-streams] Consuming on TCP server side - is this really the best way?
This is excellent feedback. I think there's room for improvement in the grokkability of Akka Streams. Thanks, Derek and Endre.

--
Cheers,
√

On 1 Aug 2015 13:12, Derek Wyatt de...@derekwyatt.org wrote:

Endre Varga endre.va...@typesafe.com July 31, 2015 at 4:47 AM

On Thu, Jul 30, 2015 at 9:07 PM, Derek Wyatt de...@derekwyatt.org wrote:

Endre Varga endre.va...@typesafe.com July 30, 2015 at 2:23 PM

Hi Derek,

On Thu, Jul 30, 2015 at 6:48 PM, Derek Wyatt de...@derekwyatt.org wrote:

These examples, and the new documentation page, are extremely helpful. Thank you *very* much. I’m continuing to read / digest / modify them to get a better understanding, but they’ve already cleared up a lot for me.

One thing that popped out was the use of Flow.transform(). The difference between map and transform is now clear to me, but I then started to question the difference between a Stage and a Flow, and they now confuse me a bit… (if this is covered somewhere, please forgive me). Why is a Stage not a derived type of Flow? Or, perhaps another way to say that is, why do Stages exist at all? It feels like there are Flows, BidiFlows and Graphs.

If you have read the documentation page that I linked before ( http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-composition.html), then you already know that a Flow is nothing else than a box with exactly one input and one output port, but it might contain a whole network inside. Source is similar but with exactly one output port, while Sink has exactly one input port. The modules can be composite, i.e. internally contain arbitrarily complex graphs. There are modules though that are not composite, but atomic, as they directly represent some logic not built from other modules.

I did read it :) And all that makes perfect sense (now).

A Stage is just an atomic Flow, if you prefer to look at it that way (strictly speaking it is the transform() op which is the atomic Flow and it is a factory for a Stage).
Whenever you are calling .filter(), .map(), etc. on a Source or Flow, you are attaching a Stage to the output port at the end. See https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala for most of the implementation of the built-in operators.

OK. That's good to know. Digging into the code certainly makes it clearer, and it's nice getting a direct pointer to something specific to look at.

Stage feels like it’s just another manifestation of a Flow, perhaps an earlier part of the design that may not need to be there?

No! Hell, no! It is actually one of the most solid parts of the internals. You can build a Flow by using the

So, you're saying, no, then? :) Fair enough... It just looks like a Stage has one input port and one output port, so I started to wonder why there was a distinction. Perhaps it's because it's a solid part of the internals, and as an internal component it makes perfect sense. When it comes out of the API, maybe it's a bit confusing... but... I'm still learning.

Well, somehow you need to build new stream processing logic when the built-in ones are not sufficient. Hence the Stage API.

built-in operators, but how do you build the built-in operators themselves? Well, they are Stages. What should you do when there is an operator missing? Build your Stage. You should familiarize yourself with stages and look at the doc page for them: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-customize.html

I've certainly read that too, and in-and-of-itself it makes perfect sense. Really what made me question it was the existence of `Flow.transform()`. Then from there it was a question as to why you would need to transform a Stage with one in and one out, into a Flow of one in and one out.

A Stage is an atomic processing entity, a Flow is a chain (network) of such entities.
Btw, the name transform means "transform this stream of elements into a different stream of elements by using this logic"; it is not "transform this Stage into a Flow".

Yup, that reads better.

When you call myFlow.map(f), what is actually called in the background is myFlow.transform(() => Map(f)), i.e. map is just sugar over the most generic transform API.

Stages are a way to express backpressured stream processing logic in an efficient manner.

And then when I find myself binding Stages together in the same-ish manner as what I would bind Flows together, it just seemed redundant.

You don't bind Stages together, where does that come from? There must be some source of confusion here,

You're right, you don't. I apologize... the code that I'm commenting on, I had to delete, so I'm working from memory. But in the end, it looked like what's on the stream-customization page. I believe I had Flows being transformed using Stages.

the only way to bind Stages together is via the Flow API. I.e. there is no redundancy.

To
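To make the relationship between map, transform and a Stage concrete, here is a toy model in plain Scala. It is only a sketch: `Stage`, `MapStage`, `FilterStage`, `ToyFlow` and `Flows` are names made up for this illustration and are not Akka's classes, and the model ignores backpressure and materialization entirely. What it tries to keep is the shape discussed in this message: transform takes a factory for a Stage, and map and filter are just sugar over it.

```scala
// Toy model only: none of these types are Akka's. No backpressure, no materialization.

// An atomic processing step: one element in, zero or more elements out.
trait Stage[In, Out] {
  def onPush(elem: In): List[Out]
}

// Hypothetical stand-ins for built-in stages such as Map and Filter.
final class MapStage[A, B](f: A => B) extends Stage[A, B] {
  def onPush(elem: A): List[B] = List(f(elem))
}

final class FilterStage[A](p: A => Boolean) extends Stage[A, A] {
  def onPush(elem: A): List[A] = if (p(elem)) List(elem) else Nil
}

// A flow is a chain of stage factories; nothing runs until runWith is called.
final class ToyFlow[In, Out](steps: Vector[() => Stage[Any, Any]]) {

  // The generic operation: append a stage, given as a factory so that every
  // run gets a fresh (possibly stateful) instance.
  def transform[T](mkStage: () => Stage[Out, T]): ToyFlow[In, T] =
    new ToyFlow[In, T](steps :+ mkStage.asInstanceOf[() => Stage[Any, Any]])

  // map and filter are only sugar over transform.
  def map[T](f: Out => T): ToyFlow[In, T] =
    transform[T](() => new MapStage[Out, T](f))

  def filter(p: Out => Boolean): ToyFlow[In, Out] =
    transform[Out](() => new FilterStage[Out](p))

  // Instantiate every stage once, then push all elements through the chain.
  def runWith(input: List[In]): List[Out] = {
    val stages = steps.map(mk => mk())
    val start: List[Any] = input
    val out = stages.foldLeft(start)((elems, stage) => elems.flatMap(e => stage.onPush(e)))
    out.asInstanceOf[List[Out]]
  }
}

object Flows {
  // A flow with no stages attached yet.
  def identity[A]: ToyFlow[A, A] = new ToyFlow[A, A](Vector.empty)
}
```

With these definitions, `Flows.identity[Int].map(_ + 1).filter(_ % 2 == 0).runWith(List(1, 2, 3))` evaluates to `List(2, 4)`. Note that the stages are only ever chained through the flow, never directly to each other.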
Re: [akka-user] [akka-streams] Consuming on TCP server side - is this really the best way?
Hi Derek,

On Thu, Jul 30, 2015 at 6:48 PM, Derek Wyatt de...@derekwyatt.org wrote:

These examples, and the new documentation page, are extremely helpful. Thank you *very* much. I’m continuing to read / digest / modify them to get a better understanding, but they’ve already cleared up a lot for me.

One thing that popped out was the use of Flow.transform(). The difference between map and transform is now clear to me, but I then started to question the difference between a Stage and a Flow, and they now confuse me a bit… (if this is covered somewhere, please forgive me). Why is a Stage not a derived type of Flow? Or, perhaps another way to say that is, why do Stages exist at all? It feels like there are Flows, BidiFlows and Graphs.

If you have read the documentation page that I linked before ( http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-composition.html), then you already know that a Flow is nothing else than a box with exactly one input and one output port, but it might contain a whole network inside. Source is similar but with exactly one output port, while Sink has exactly one input port. The modules can be composite, i.e. internally contain arbitrarily complex graphs. There are modules though that are not composite, but atomic, as they directly represent some logic not built from other modules. A Stage is just an atomic Flow, if you prefer to look at it that way (strictly speaking it is the transform() op which is the atomic Flow and it is a factory for a Stage). Whenever you are calling .filter(), .map(), etc. on a Source or Flow, you are attaching a Stage to the output port at the end. See https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala for most of the implementation of the built-in operators.

Stage feels like it’s just another manifestation of a Flow, perhaps an earlier part of the design that may not need to be there?

No! Hell, no!
It is actually one of the most solid parts of the internals. You can build a Flow by using the built-in operators, but how do you build the built-in operators themselves? Well, they are Stages. What should you do when there is an operator missing? Build your Stage. You should familiarize yourself with stages and look at the doc page for them: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-customize.html

P.S. It looks like the stream-composition doc is missing something in the attributes section. The diagram shows that nestedSource is connected to nestedSink but the code doesn’t do such a thing. Did you mean to add a nestedSource.to(nestedSink)?

yep, that line is missing at the end. Thanks!

-Endre

Akka Team akka.offic...@gmail.com July 29, 2015 at 9:28 AM

And now I added another version where the server just streams random numbers until the client disconnects, then it closes the connection. It needed a custom stage though to make emitting from an Iterable interruptible (mapConcat does not interrupt on completion, only on errors).

--
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam

--
Read the docs: http://akka.io/docs/
Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the Google Groups Akka User List group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/tng5CiUtfig/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
Endre Varga endre.va...@typesafe.com July 29, 2015 at 7:59 AM

I now updated the gist with the reverse direction: Now a client sends a String command and expects an Iterable[Int] back as a response. I currently limited the functionality to one request per connection, since otherwise I would need a bit more elaborate codec which would complicate the example (I would need to add a delimiter between the iterables on the wire. Not too hard to add it though). It still shows how these things are supposed to work.

-Endre
Re: [akka-user] [akka-streams] Consuming on TCP server side - is this really the best way?
Hey Endre,

Thanks so much for doing this. I've looked through it (and skimmed the new doc page) but haven't digested it well enough to say anything other than thanks. :) I'll get back to you in the next day or so, perhaps with a couple of questions.

Akka Team wrote:

And now I added another version where the server just streams random numbers until the client disconnects, then it closes the connection. It needed a custom stage though to make emitting from an Iterable interruptible (mapConcat does not interrupt on completion, only on errors).

On Wed, Jul 29, 2015 at 1:59 PM, Endre Varga endre.va...@typesafe.com wrote:

I now updated the gist with the reverse direction: Now a client sends a String command and expects an Iterable[Int] back as a response. I currently limited the functionality to one request per connection, since otherwise I would need a bit more elaborate codec which would complicate the example (I would need to add a delimiter between the iterables on the wire. Not too hard to add it though). It still shows how these things are supposed to work.

-Endre

On Wed, Jul 29, 2015 at 1:14 PM, Akka Team akka.offic...@gmail.com wrote:

Hi Derek,

It is not that hard, but you need to develop a certain kind of intuition to attack these problems. I very much recommend the new documentation page http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-composition.html as it helps to visualize the ideas.

I created a sample app that does what you want, you can find the gist here: https://gist.github.com/drewhk/25bf7472db04b5699b80

The features in that app:
- exposes the client API as a Source[Int, Unit]. Anytime you materialize that source and send it data, it will open a TCP connection and dump the integers to the server, then close the connection
- exposes the server API as a Source[(InetSocketAddress, Iterable[Int]), Future[ServerBinding]]. It will provide you with a continuous stream of (client address, client data iterable) pairs.
- includes a simple codec pair for encoding the Ints. It is kind of stupid for this use case, but it works.

Some notes:
- draining the client data to an Iterable might be suboptimal if the Iterables are large, in this case a Source[Int] would be a better abstraction
- the implementation caps the size of the Iterable but currently just silently ignores overflows (I was too lazy to build a stage or use fold for this sample, so I used grouped())

-Endre

On Sun, Jul 26, 2015 at 9:12 PM, Derek Wyatt de...@derekwyatt.org wrote:

Hi,

I'm still trying to figure out the best way to work with TCP flows and, while I've got something working, this seems really quite wrong, so there's gotta be a better way.

What I want to do is send an Iterable[Int] from the client to the server and have the server materialize that resulting flow in a Future[Iterable[Int]].

    val bytesStage = // elided... BidiFlow of serialization and framing
    val serverValuePromise = Promise[Seq[AnyRef]]()

    // Technically, the materialized value isn't important, since it's actually going to be pulled out
    // via the Promise
    val serverConsumerFlow: Flow[AnyRef, AnyRef, Future[Seq[AnyRef]]] = Flow.wrap(
      // Consume the client's stream and complete the serverValuePromise with its folded result
      Sink.fold(Vector.empty[AnyRef])((acc, v: AnyRef) => acc :+ v).mapMaterializedValue(v => { serverValuePromise.completeWith(v); v }),
      // We're not sending anything from this side
      Source.empty)(Keep.left)

    // The server
    val serverSide: Future[ServerBinding] = StreamTcp().bindAndHandle(
      serverConsumerFlow.join(bytesStage), "0.0.0.0", 0, halfClose = true)

    // We really want to stop listening once the client has successfully connected, but this is good
    // enough
    serverValuePromise.future.onComplete {
      case _ => serverSide.onSuccess {
        case binding => binding.unbind()
      }
    }

    // I need the endpoint where the client needs to connect
    val destination = Await.result(serverSide, 1.second).localAddress

    // Get the source running
    Source((1 to 10).map(new Integer(_))).via(bytesStage.joinMat(StreamTcp().outgoingConnection(destination))(Keep.right)).to(Sink.ignore).run()

    // Print out what the client has sent to the server
    Await.result(serverValuePromise.future, 1.second).foreach(t => println(s"tt: $t"))

I tried doing this the other way around - where the server side supplies the source - but this caused me issues with actually shutting down the socket. Having the client do it seems to make shutting down the socket on completion of the source just naturally occur.

The problem with the server side providing the source was that the client source needed to finish properly. If I created it as `empty` then it would kill things too quickly. If I then created it as an Actor source that just didn't do anything, I couldn't find a decent way to close it.

There's gotta be a better way to do this, but I'm too much of a noob to see it. Can anyone improve this code for me?
Thanks, Derek
Re: [akka-user] [akka-streams] Consuming on TCP server side - is this really the best way?
I now updated the gist with the reverse direction: Now a client sends a String command and expects an Iterable[Int] back as a response. I currently limited the functionality to one request per connection, since otherwise I would need a bit more elaborate codec which would complicate the example (I would need to add a delimiter between the iterables on the wire. Not too hard to add it though). It still shows how these things are supposed to work.

-Endre
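For readers wondering what separating several iterables on the wire could look like, here is a minimal sketch in plain Scala. It is not taken from the gist: instead of a delimiter it uses a length prefix (each group is preceded by its element count), which is a different but closely related technique, and `FramedInts`, `writeFrame` and `readFrames` are hypothetical names. It also assumes the bytes passed to `readFrames` contain only complete frames; a real decoder would have to buffer partial frames arriving from TCP.

```scala
import java.nio.ByteBuffer

object FramedInts {
  // Frame layout (an assumption of this sketch): [count: Int][count x Int],
  // all values as 4-byte big-endian integers (ByteBuffer's default order).
  def writeFrame(values: Seq[Int]): Array[Byte] = {
    val buf = ByteBuffer.allocate(4 + values.size * 4)
    buf.putInt(values.size)
    values.foreach(v => buf.putInt(v))
    buf.array()
  }

  // Split a byte sequence made of complete frames back into the original groups.
  def readFrames(bytes: Array[Byte]): Vector[Vector[Int]] = {
    val buf = ByteBuffer.wrap(bytes)
    val frames = Vector.newBuilder[Vector[Int]]
    while (buf.remaining() >= 4) {
      val count = buf.getInt()
      frames += Vector.fill(count)(buf.getInt())
    }
    frames.result()
  }
}
```

Concatenating `writeFrame(Seq(1, 2, 3))` and `writeFrame(Seq(7))` and passing the result to `readFrames` gives back the two groups separately, which is what would allow more than one request or response per connection.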
Re: [akka-user] [akka-streams] Consuming on TCP server side - is this really the best way?
Hi Derek,

It is not that hard, but you need to develop a certain kind of intuition to attack these problems. I very much recommend the new documentation page http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-composition.html as it helps to visualize the ideas.

I created a sample app that does what you want, you can find the gist here: https://gist.github.com/drewhk/25bf7472db04b5699b80

The features in that app:
- exposes the client API as a Source[Int, Unit]. Anytime you materialize that source and send it data, it will open a TCP connection and dump the integers to the server, then close the connection
- exposes the server API as a Source[(InetSocketAddress, Iterable[Int]), Future[ServerBinding]]. It will provide you with a continuous stream of (client address, client data iterable) pairs.
- includes a simple codec pair for encoding the Ints. It is kind of stupid for this use case, but it works.

Some notes:
- draining the client data to an Iterable might be suboptimal if the Iterables are large, in this case a Source[Int] would be a better abstraction
- the implementation caps the size of the Iterable but currently just silently ignores overflows (I was too lazy to build a stage or use fold for this sample, so I used grouped())

-Endre
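As an illustration of what a simple codec pair for Ints might involve, here is a sketch in plain Scala. The gist's actual codec is not reproduced in this thread, so everything here is an assumption: `IntCodec`, `encode` and `decode` are hypothetical names, and the wire format (4 big-endian bytes per Int) is just one plausible choice. The decoder carries leftover bytes between calls because TCP may deliver the byte stream in chunks that do not line up with Int boundaries.

```scala
import java.nio.ByteBuffer

object IntCodec {
  // Encode each Int as 4 big-endian bytes (ByteBuffer's default byte order).
  def encode(values: Iterable[Int]): Array[Byte] = {
    val buf = ByteBuffer.allocate(values.size * 4)
    values.foreach(v => buf.putInt(v))
    buf.array()
  }

  // Decode one incoming chunk, given the leftover bytes of the previous chunk.
  // Returns the decoded Ints plus the bytes that do not yet form a whole Int.
  def decode(leftover: Array[Byte], chunk: Array[Byte]): (Vector[Int], Array[Byte]) = {
    val data = leftover ++ chunk
    val whole = data.length / 4
    val buf = ByteBuffer.wrap(data)
    val ints = Vector.fill(whole)(buf.getInt())
    (ints, data.drop(whole * 4))
  }
}
```

In a stream, the leftover bytes would be the state of the decoding step, threaded from one chunk to the next; here the caller passes them in explicitly, starting with an empty array.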
Re: [akka-user] [akka-streams] Consuming on TCP server side - is this really the best way?
And now I added another version where the server just streams random numbers until the client disconnects, then it closes the connection. It needed a custom stage though to make emitting from an Iterable interruptible (mapConcat does not interrupt on completion, only on errors).
[akka-user] [akka-streams] Consuming on TCP server side - is this really the best way?
Hi,

I'm still trying to figure out the best way to work with TCP flows and, while I've got something working, this seems really quite wrong, so there's gotta be a better way.

What I want to do is send an Iterable[Int] from the client to the server and have the server materialize that resulting flow in a Future[Iterable[Int]].

    val bytesStage = // elided... BidiFlow of serialization and framing
    val serverValuePromise = Promise[Seq[AnyRef]]()

    // Technically, the materialized value isn't important, since it's actually going to be pulled out
    // via the Promise
    val serverConsumerFlow: Flow[AnyRef, AnyRef, Future[Seq[AnyRef]]] = Flow.wrap(
      // Consume the client's stream and complete the serverValuePromise with its folded result
      Sink.fold(Vector.empty[AnyRef])((acc, v: AnyRef) => acc :+ v).mapMaterializedValue(v => { serverValuePromise.completeWith(v); v }),
      // We're not sending anything from this side
      Source.empty)(Keep.left)

    // The server
    val serverSide: Future[ServerBinding] = StreamTcp().bindAndHandle(
      serverConsumerFlow.join(bytesStage), "0.0.0.0", 0, halfClose = true)

    // We really want to stop listening once the client has successfully connected, but this is good
    // enough
    serverValuePromise.future.onComplete {
      case _ => serverSide.onSuccess {
        case binding => binding.unbind()
      }
    }

    // I need the endpoint where the client needs to connect
    val destination = Await.result(serverSide, 1.second).localAddress

    // Get the source running
    Source((1 to 10).map(new Integer(_))).via(bytesStage.joinMat(StreamTcp().outgoingConnection(destination))(Keep.right)).to(Sink.ignore).run()

    // Print out what the client has sent to the server
    Await.result(serverValuePromise.future, 1.second).foreach(t => println(s"tt: $t"))

I tried doing this the other way around - where the server side supplies the source - but this caused me issues with actually shutting down the socket. Having the client do it seems to make shutting down the socket on completion of the source just naturally occur.

The problem with the server side providing the source was that the client source needed to finish properly. If I created it as `empty` then it would kill things too quickly. If I then created it as an Actor source that just didn't do anything, I couldn't find a decent way to close it.

There's gotta be a better way to do this, but I'm too much of a noob to see it. Can anyone improve this code for me?

Thanks,
Derek
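The Promise trick in the code above (handing a result that only exists after materialization to code outside the stream) can be looked at in isolation with nothing but the Scala standard library. This is a sketch under assumptions: `PromiseBridge`, `foldedResult` and `run` are hypothetical names, and `foldedResult` merely stands in for the Future that the fold Sink would produce.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PromiseBridge {
  // Stand-in for the Future that a folding sink would materialize (hypothetical).
  def foldedResult(): Future[Vector[Int]] = Future((1 to 10).toVector)

  def run(): Seq[Int] = {
    // Created up front, so other code can hold on to received.future
    // before the actual result exists.
    val received = Promise[Seq[Int]]()

    // Later, once the real Future is available, link it to the promise.
    received.completeWith(foldedResult())

    // Blocking is only for demonstration purposes.
    Await.result(received.future, 5.seconds)
  }
}
```

Calling `PromiseBridge.run()` returns the numbers 1 to 10. The point of the pattern is only the decoupling: whoever needs the result depends on the promise's future, not on where or when the stream is materialized.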