Your "SocketWriter-Thread" code will run on your client. All code in
"main" runs on the client.

execute() itself runs on the client, too. Of course, it triggers the job
submission to the cluster. In this step, the assembled job from the
previous calls is translated into the JobGraph which is submitted to the
JobManager for execution.

You should start your SocketWriter-Thread manually on the cluster, ie,
if you use "localhost" in "env.socketTextStream", it must be the
TaskManager machine that executes this SocketStream-source task.

I guess, it would be better not to use "localhost", but start your
SocketWriter-Thread on a dedicated machine in the cluster, and connect
your SocketStream-source to this machine via its host name.

-Matthias



On 01/19/2016 03:57 PM, Saiph Kappa wrote:
> Hi,
> 
> This is a simple example that I found using Flink Stream. I changed it
> so the flink client can be executed on a remote cluster, and so that it
> can open a socket server to ship its results for any other consumer
> machine. It seems to me that the socket server is not being open in the
> remote cluster, but rather in my local machine (which I'm using to
> launch the app). How can I achieve that? I want to be able to ship
> results directly from the remote cluster, and through a socket server
> where clients can use as a tap.
> 
> Sorry about indentation:
> 
> |def main(args: Array[String]) { |
> 
>     val env =
> StreamExecutionEnvironment.createRemoteEnvironment("myhostname",
> DefaultFlinkMasterPort,
> 
> ||"myapp-assembly-0.1-SNAPSHOT.jar"); | //Read from a socket stream at
> map it to StockPrice objects val socketStockStream =
> env.socketTextStream("localhost", 9999).map(x => { val split =
> x.split(",") StockPrice(split(0), split(1).toDouble) }) //Generate other
> stock streams val SPX_Stream = env.addSource(generateStock("SPX")(10) _)
> val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _) val
> DJI_Stream = env.addSource(generateStock("DJI")(30) _) val BUX_Stream =
> env.addSource(generateStock("BUX")(40) _) //Merge all stock streams
> together val stockStream = socketStockStream.merge(SPX_Stream,
> FTSE_Stream, DJI_Stream, BUX_Stream) stockStream.print()
> |
> 
> // WHERE IS THE FOLLOWING CODE RUN?
> 
> |var out: PrintWriter = null
> new Thread {
> override def run(): Unit = {
> val serverSocket = new ServerSocket(12345)
> while (true) {
> val socket = serverSocket.accept()
> val hostname = socket.getInetAddress.getHostName.split('.').head
> println(s"Got a new connection from $hostname")
> out = new PrintWriter(socket.getOutputStream)
> }
> }
> }.start()
> 
> |||stockStream|.addSink(record => {
> if(out != null) {
> out.write(record)
> out.flush()
> }
> })
> 
> env.execute("Stock stream") }|
> 
> Thanks.

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to