Hi Li Peng,

I think what you're trying to do won't work. The problem is that you have
two TCP clients (sink and source) which are supposed to connect to each
other. Without a server which buffers the incoming data and forwards it to
the outgoing connections, it won't be possible to read the previously
written data.

The exception you're observing originates from the fact that the source
tries to connect to the TCP port 9000 which is not open (since there is no
server listening on this port). The same would happen to the sink.

Cheers,
Till

On Tue, Jan 31, 2017 at 9:04 PM, Li Peng <li.p...@elementanalytics.com>
wrote:

> Yes I did open a socket with netcat. Turns out my first error was due
> to a stream without a sink triggering the socket connect and (I
> thought that without a sink the stream wouldn't affect anything so I
> didn't comment it out, and I didn't open the socket for that port).
> However
>
> I did play with it some more and I think the real issue is that I'm
> trying to have two streams, one write to a port and another read from
> the same port. i.e.
>
> val y = executionEnvironment.socketTextStream("localhost", 9000)
> x.writeToSocket("localhost", 9000, new SimpleStringSchema())
>
> Once I tested just write or just the read it worked, but combined I
> get this error:
>
> java.net.SocketException: Connection reset
> at java.net.SocketInputStream.read(SocketInputStream.java:210)
> at java.net.SocketInputStream.read(SocketInputStream.java:141)
> at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
> at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
> at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
> at java.io.InputStreamReader.read(InputStreamReader.java:184)
> at java.io.BufferedReader.read1(BufferedReader.java:210)
> at java.io.BufferedReader.read(BufferedReader.java:286)
> at java.io.Reader.read(Reader.java:140)
> at org.apache.flink.streaming.api.functions.source.
> SocketTextStreamFunction.run(SocketTextStreamFunction.java:101)
> at org.apache.flink.streaming.api.operators.StreamSource.
> run(StreamSource.java:80)
> at org.apache.flink.streaming.api.operators.StreamSource.
> run(StreamSource.java:53)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(
> SourceStreamTask.java:56)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:267)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
> at java.lang.Thread.run(Thread.java:745)
>
> Is this operation not allowed?
>
> And I'm mainly writing to the same socket in order to pass work back
> and forth between streams.
>

Reply via email to