I solved the problem by having the client send an initial message to the 
server automatically on start. 
It seems that either the server or the client needs to send an initial 
message to get the connection initialized correctly.

In order to quit the application when the user types 'q', I used the 
watchTermination() method to shut down the ActorSystem when the flow 
completes:

    def quit(f: Future[Done]): Unit = {
      f.onComplete {
        case Success(_) =>
          system.terminate()
        case Failure(ex) =>
          println(s"Error : $ex")
          system.terminate()
      }
    }


    val replParser = Flow[String]
      // Need to send an initial message to start off: (NO = No op)
      .merge(Source.single("NO"))
      // Type 'q' to quit
      .takeWhile(_ != "q")
      .watchTermination() { (_, f) => quit(f) }
      .map(elem => ByteString(s"$elem\r"))




On Sunday, September 17, 2017 at 12:45:00 AM UTC+2, Allan Brighton wrote:
>
> In the Akka docs 
> <http://doc.akka.io/docs/akka/2.5/scala/stream/stream-io.html>, in the 
> section titled "Working with streaming IO", there is an example TCP server:
>
> connections.runForeach { connection =>
>
>
>   // server logic, parses incoming commands
>   val commandParser = Flow[String].takeWhile(_ != "BYE").map(_ + "!")
>
>
>   import connection._
>   val welcomeMsg = s"Welcome to: $localAddress, you are: $remoteAddress!"
>   val welcome = Source.single(welcomeMsg)
>
>
>   val serverLogic = Flow[ByteString]
>     .via(Framing.delimiter(
>       ByteString("\n"),
>       maximumFrameLength = 256,
>       allowTruncation = true))
>     .map(_.utf8String)
>     .via(commandParser)
>     // merge in the initial banner after parser
>     .merge(welcome)
>     .map(_ + "\n")
>     .map(ByteString(_))
>
>
>   connection.handleWith(serverLogic)
> }
>
>
> and REPL client:
>
> val connection = Tcp().outgoingConnection("127.0.0.1", 8888)
>
>
> val replParser =
>   Flow[String].takeWhile(_ != "q")
>     .concat(Source.single("BYE"))
>     .map(elem => ByteString(s"$elem\n"))
>
>
> val repl = Flow[ByteString]
>   .via(Framing.delimiter(
>     ByteString("\n"),
>     maximumFrameLength = 256,
>     allowTruncation = true))
>   .map(_.utf8String)
>   .map(text => println("Server: " + text))
>   .map(_ => readLine("> "))
>   .via(replParser)
>
>
> connection.join(repl).run()
>
>
> In my case, the server does not send the welcome message at the start, 
> which is needed to get the flow started.
> I thought I could work around this in the client by inserting this as the 
> first line in the flow:
>
> .merge(Source.single(ByteString("Welcome\r\n:")))
>
> This sort of worked, but now it is not possible to have two clients 
> running at once, unless they each send a message to the server right away 
> on starting.
> (Otherwise I get the error: 
>
> [INFO] [09/17/2017 00:25:07.490] [default-akka.actor.default-dispatcher-2] 
> [akka://default/system/IO-TCP/selectors/$a/0] Message 
> [akka.io.Tcp$Register] from 
> Actor[akka://default/user/StreamSupervisor-0/$$a#-1654072721] to 
> Actor[akka://default/system/IO-TCP/selectors/$a/0#1152954885] was not 
> delivered. [1] dead letters encountered. This logging can be turned off or 
> adjusted with configuration settings 'akka.log-dead-letters' and 
> 'akka.log-dead-letters-during-shutdown'.
> [INFO] [09/17/2017 00:25:07.493] [default-akka.actor.default-dispatcher-2] 
> [akka://default/system/IO-TCP/selectors/$a/0] Message [akka.io.Tcp$Write] 
> from Actor[akka://default/user/StreamSupervisor-0/$$a#-1654072721] to 
> Actor[akka://default/system/IO-TCP/selectors/$a/0#1152954885] was not 
> delivered. [2] dead letters encountered. This logging can be turned off or 
> adjusted with configuration settings 'akka.log-dead-letters' and 
> 'akka.log-dead-letters-during-shutdown'.
>
>
> I assume this is because there was no initial message from the server to 
> the client, which causes the connection to be initialized.
>
> What is the correct way to handle this situation? 
>
> I also noticed that when you type 'q' in the client REPL, the process does 
> not exit. What is missing there?
>
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to