Re: Flink Stream: How to ship results through socket server

2016-01-19 Thread Matthias J. Sax
Your "SocketWriter-Thread" code will run on your client. All code in
"main" runs on the client.

execute() itself runs on the client, too, but it triggers the job
submission to the cluster: the job assembled by the previous calls is
translated into a JobGraph, which is submitted to the JobManager for
execution.

You should start your SocketWriter-Thread manually on the cluster, i.e.,
if you use "localhost" in "env.socketTextStream", the thread must run on
the TaskManager machine that executes this SocketStream-source task.

I guess it would be better not to use "localhost", but to start your
SocketWriter-Thread on a dedicated machine in the cluster and connect
your SocketStream-source to this machine via its host name.
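For illustration, a minimal sketch of what such a standalone SocketWriter process could look like, using only the Scala/Java standard library (no Flink dependency). This is an assumption, not part of Flink: the names LineBroadcastServer, broadcast and clientCount, the default port 9999, and reading the lines to forward from stdin are all hypothetical. Unlike the single `var out` in the original snippet, which every new connection overwrites, it keeps a list of all connected clients, so each connected consumer receives every line.

```scala
import java.io.{OutputStreamWriter, PrintWriter}
import java.net.{ServerSocket, SocketException}
import java.nio.charset.StandardCharsets
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical standalone "SocketWriter": accepts any number of clients
// and writes every line it is given to all of them.
final class LineBroadcastServer(port: Int) {
  private val serverSocket = new ServerSocket(port)
  // Thread-safe: the accept thread adds writers while broadcast() iterates.
  private val clients = new CopyOnWriteArrayList[PrintWriter]()

  /** Port actually bound (useful when `port` is 0, i.e. "any free port"). */
  def localPort: Int = serverSocket.getLocalPort

  /** Number of currently registered clients. */
  def clientCount: Int = clients.size()

  /** Starts a daemon thread that accepts connections until close() is called. */
  def start(): Unit = {
    val acceptor = new Thread(new Runnable {
      def run(): Unit =
        try {
          while (true) {
            val socket = serverSocket.accept()
            // autoFlush = true: every println is pushed to the client immediately
            clients.add(new PrintWriter(
              new OutputStreamWriter(socket.getOutputStream, StandardCharsets.UTF_8), true))
          }
        } catch {
          case _: SocketException => () // accept() throws once close() has been called
        }
    }, "line-broadcast-acceptor")
    acceptor.setDaemon(true)
    acceptor.start()
  }

  /** Writes one line to every client, dropping clients whose connection failed. */
  def broadcast(line: String): Unit = {
    val it = clients.iterator() // snapshot iterator, safe against concurrent adds
    while (it.hasNext) {
      val out = it.next()
      out.println(line)
      if (out.checkError()) { // PrintWriter reports I/O failures only via checkError()
        clients.remove(out)
        out.close()
      }
    }
  }

  /** Stops accepting and closes all client connections. */
  def close(): Unit = {
    serverSocket.close()
    val it = clients.iterator()
    while (it.hasNext) it.next().close()
    clients.clear()
  }
}

object LineBroadcastServer {
  /** Forwards each line read from stdin to every connected client. */
  def main(args: Array[String]): Unit = {
    val port = if (args.nonEmpty) args(0).toInt else 9999 // hypothetical default port
    val server = new LineBroadcastServer(port)
    server.start()
    scala.io.Source.stdin.getLines().foreach(server.broadcast)
    server.close()
  }
}
```

Started on a dedicated host (hypothetically named `writer-host`), a job could connect to it with `env.socketTextStream("writer-host", 9999)`, and other consumers could connect to the same port as taps.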

-Matthias



On 01/19/2016 03:57 PM, Saiph Kappa wrote:
> Hi,
> 
> This is a simple example that I found using Flink Stream. I changed it
> so the Flink client can be executed on a remote cluster, and so that it
> can open a socket server to ship its results to any other consumer
> machine. It seems to me that the socket server is not being opened on the
> remote cluster, but rather on my local machine (which I'm using to
> launch the app). How can I achieve that? I want to be able to ship
> results directly from the remote cluster through a socket server
> that clients can use as a tap.
> 
> Sorry about indentation:
> 
> def main(args: Array[String]): Unit = {
>   val env = StreamExecutionEnvironment.createRemoteEnvironment(
>     "myhostname", DefaultFlinkMasterPort, "myapp-assembly-0.1-SNAPSHOT.jar")
>
>   // Read from a socket stream and map it to StockPrice objects
>   val socketStockStream = env.socketTextStream("localhost", ).map(x => {
>     val split = x.split(",")
>     StockPrice(split(0), split(1).toDouble)
>   })
>
>   // Generate other stock streams
>   val SPX_Stream = env.addSource(generateStock("SPX")(10) _)
>   val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _)
>   val DJI_Stream = env.addSource(generateStock("DJI")(30) _)
>   val BUX_Stream = env.addSource(generateStock("BUX")(40) _)
>
>   // Merge all stock streams together
>   val stockStream = socketStockStream.merge(SPX_Stream, FTSE_Stream, DJI_Stream, BUX_Stream)
>   stockStream.print()
>
>   // WHERE IS THE FOLLOWING CODE RUN?
>   var out: PrintWriter = null
>   new Thread {
>     override def run(): Unit = {
>       val serverSocket = new ServerSocket(12345)
>       while (true) {
>         val socket = serverSocket.accept()
>         val hostname = socket.getInetAddress.getHostName.split('.').head
>         println(s"Got a new connection from $hostname")
>         out = new PrintWriter(socket.getOutputStream)
>       }
>     }
>   }.start()
>
>   stockStream.addSink(record => {
>     if (out != null) {
>       out.println(record) // PrintWriter has no write(StockPrice); println(Object) uses toString
>       out.flush()
>     }
>   })
>
>   env.execute("Stock stream")
> }
> 
> Thanks.





Re: Flink Stream: How to ship results through socket server

2016-01-19 Thread Saiph Kappa
Thanks for your reply, Matthias. So it is not possible to open a socket
server in the JobGraph and keep it open for the lifetime of the job,
is that what you are saying? Is an external process required to open
that socket server?
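For reference, the consumer side of such an external socket server (a "tap") needs only the standard library: connect by host name and read newline-delimited records. A minimal sketch; the object name SocketTap, the `tap` helper and its `maxLines` parameter are hypothetical, and the records are assumed to be UTF-8 text lines.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

// Hypothetical consumer ("tap") for a line-oriented socket server.
object SocketTap {
  /** Connects to host:port, passes up to maxLines received lines to onLine,
    * and returns how many lines were delivered. Stops early if the server
    * closes the connection. */
  def tap(host: String, port: Int, maxLines: Int)(onLine: String => Unit): Int = {
    val socket = new Socket(host, port)
    try {
      val in = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      // readLine() returns null at end of stream; take() stops after maxLines.
      val lines = Iterator.continually(in.readLine()).takeWhile(_ != null).take(maxLines)
      var count = 0
      lines.foreach { line => onLine(line); count += 1 }
      count
    } finally socket.close()
  }

  // Usage: SocketTap <host> <port>  (prints every record until the server disconnects)
  def main(args: Array[String]): Unit = {
    tap(args(0), args(1).toInt, Int.MaxValue)(println)
  }
}
```

Bounding the read with `maxLines` is only a convenience for testing; a long-running tap would pass `Int.MaxValue`, as `main` does.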

On Tue, Jan 19, 2016 at 5:38 PM, Matthias J. Sax  wrote:
