Re: [akka-user] Akka Stream Tcp Messaging Server POC
What is happening when you run the code and how is that different from what you expect? -- Johan Akka Team On Mon, Aug 7, 2017 at 2:16 PM, G J wrote: > I'm new Akka stream, I have to build on poc on Akka stream for instant > messaging. So I need a help to understand what I'm missing here. > > > import java.net.InetSocketAddress; > import java.util.concurrent.CompletionStage; > > import akka.Done; > import akka.actor.ActorRef; > import akka.actor.Props; > import akka.actor.UntypedActor; > import akka.stream.javadsl.Sink; > import akka.stream.javadsl.Tcp; > > /** > * Created by gaurav on 28/7/17. > */ > public class Server { >public static void main(String[] args) { > InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", > 6000); > final Sink handler = > Sink.foreach(conn -> { > System.out.println("Client connected from: " + conn.remoteAddress()); > conn.handleWith(ActorFlow.actorRef(out -> > MyWebSocketActor.props(out)), InternalWebSocketHelper.actorMaterializer()); > }); > final CompletionStage bindingFuture = > Tcp.get(InternalWebSocketHelper.actorSystem()) > .bind(serverAddress.getHostString(), > serverAddress.getPort()).to(handler).run(InternalWebSocketHelper.actorMaterializer()); > > bindingFuture.whenComplete((binding, throwable) -> { > System.out.println("Server started, listening on: " + > binding.localAddress()); > }); > > bindingFuture.exceptionally(e -> { > System.err.println("Server could not bind to " + serverAddress + " : > " + e.getMessage()); > InternalWebSocketHelper.actorSystem().terminate(); > return null; > }); > >} > >private static class MyWebSocketActor extends UntypedActor { > > private final ActorRef out; > > public MyWebSocketActor(ActorRef out) { > this.out = out; > } > > public static Props props(ActorRef out) { > return Props.create(MyWebSocketActor.class, () -> new > MyWebSocketActor(out)); > } > > public void onReceive(Object message) throws Exception { > out.tell(message, ActorRef.noSender()); > } >} > } > > > > > > > > 
> /** > * Created by gaurav on 1/8/17. > */ > > import java.util.function.Function; > > import org.reactivestreams.Publisher; > > import akka.NotUsed; > import akka.actor.*; > import akka.japi.Pair; > import akka.stream.Materializer; > import akka.stream.OverflowStrategy; > import akka.stream.javadsl.*; > > public class ActorFlow { > >public static Flow actorRef(Function Props> props) { > return actorRef(props, 1000, OverflowStrategy.dropNew(), > InternalWebSocketHelper.actorSystem(), > InternalWebSocketHelper.actorMaterializer()); >} > >public static Flow actorRef(Function Props> props, int bufferSize, OverflowStrategy overflowStrategy, > ActorRefFactory factory, Materializer mat) { > > Pair pair = Source. actorRef(bufferSize, > overflowStrategy) > .toMat(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), > Keep.both()).run(mat); > > return Flow.fromSinkAndSource( > > Sink.actorRef(factory.actorOf(Props.create(WebSocketFlowActor.class, () -> > new WebSocketFlowActor(props, pair.first(, > new Status.Success(new Object())), > Source.fromPublisher(pair.second())); >} > >private static class WebSocketFlowActor extends UntypedActor { > > private final ActorRef flowActor; > > public WebSocketFlowActor(Function props, ActorRef > ref) { > flowActor = context().watch(context().actorOf(props.apply(ref), > "flowActor")); > } > > @Override > public void onReceive(Object message) throws Throwable { > if (message instanceof Status.Success) { > flowActor.tell(PoisonPill.getInstance(), getSelf()); > } else if (message instanceof Terminated) { > context().stop(getSelf()); > } else { > flowActor.tell(message, getSelf()); > } > } > > @Override > public SupervisorStrategy supervisorStrategy() { > return SupervisorStrategy.stoppingStrategy(); > } >} > } > > > > > > > import akka.actor.ActorSystem; > import akka.stream.ActorMaterializer; > > public class InternalWebSocketHelper { > >static ActorSystem actorSystem = ActorSystem.create(); >static ActorMaterializer actorMaterializer = > 
ActorMaterializer.create(actorSystem); > >static ActorSystem actorSystem() { > return actorSystem; >} > >static ActorMaterializer actorMaterializer() { > return actorMaterializer; >} > } > > -- > >> Read the docs: http://akka.io/docs/ > >> Check the FAQ: http://doc.akka.io/docs/akka/ >
[akka-user] Akka Stream Tcp Messaging Server POC
I'm new to Akka Streams, and I have to build a POC on Akka Streams for instant messaging. I need help understanding what I'm missing here. import java.net.InetSocketAddress; import java.util.concurrent.CompletionStage; import akka.Done; import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.UntypedActor; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Tcp; /** * Created by gaurav on 28/7/17. */ public class Server { public static void main(String[] args) { InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 6000); final Sink handler = Sink.foreach(conn -> { System.out.println("Client connected from: " + conn.remoteAddress()); conn.handleWith(ActorFlow.actorRef(out -> MyWebSocketActor.props(out)), InternalWebSocketHelper.actorMaterializer()); }); final CompletionStage bindingFuture = Tcp.get(InternalWebSocketHelper.actorSystem()) .bind(serverAddress.getHostString(), serverAddress.getPort()).to(handler).run(InternalWebSocketHelper.actorMaterializer()); bindingFuture.whenComplete((binding, throwable) -> { System.out.println("Server started, listening on: " + binding.localAddress()); }); bindingFuture.exceptionally(e -> { System.err.println("Server could not bind to " + serverAddress + " : " + e.getMessage()); InternalWebSocketHelper.actorSystem().terminate(); return null; }); } private static class MyWebSocketActor extends UntypedActor { private final ActorRef out; public MyWebSocketActor(ActorRef out) { this.out = out; } public static Props props(ActorRef out) { return Props.create(MyWebSocketActor.class, () -> new MyWebSocketActor(out)); } public void onReceive(Object message) throws Exception { out.tell(message, ActorRef.noSender()); } } } /** * Created by gaurav on 1/8/17. 
*/ import java.util.function.Function; import org.reactivestreams.Publisher; import akka.NotUsed; import akka.actor.*; import akka.japi.Pair; import akka.stream.Materializer; import akka.stream.OverflowStrategy; import akka.stream.javadsl.*; public class ActorFlow { public static Flow actorRef(Function props) { return actorRef(props, 1000, OverflowStrategy.dropNew(), InternalWebSocketHelper.actorSystem(), InternalWebSocketHelper.actorMaterializer()); } public static Flow actorRef(Function props, int bufferSize, OverflowStrategy overflowStrategy, ActorRefFactory factory, Materializer mat) { Pair pair = Source. actorRef(bufferSize, overflowStrategy) .toMat(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), Keep.both()).run(mat); return Flow.fromSinkAndSource( Sink.actorRef(factory.actorOf(Props.create(WebSocketFlowActor.class, () -> new WebSocketFlowActor(props, pair.first(, new Status.Success(new Object())), Source.fromPublisher(pair.second())); } private static class WebSocketFlowActor extends UntypedActor { private final ActorRef flowActor; public WebSocketFlowActor(Function props, ActorRef ref) { flowActor = context().watch(context().actorOf(props.apply(ref), "flowActor")); } @Override public void onReceive(Object message) throws Throwable { if (message instanceof Status.Success) { flowActor.tell(PoisonPill.getInstance(), getSelf()); } else if (message instanceof Terminated) { context().stop(getSelf()); } else { flowActor.tell(message, getSelf()); } } @Override public SupervisorStrategy supervisorStrategy() { return SupervisorStrategy.stoppingStrategy(); } } } import akka.actor.ActorSystem; import akka.stream.ActorMaterializer; public class InternalWebSocketHelper { static ActorSystem actorSystem = ActorSystem.create(); static ActorMaterializer actorMaterializer = ActorMaterializer.create(actorSystem); static ActorSystem actorSystem() { return actorSystem; } static ActorMaterializer actorMaterializer() { return actorMaterializer; } } -- >> Read the docs: 
http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at