Taking a step back: why do you want to implement communication via sockets manually in the first place? You will not get any fault-tolerance guarantees this way, and I would guess that maintaining a custom solution is more work than using, say, Kafka.
Best,
Aljoscha

> On 16. Jan 2018, at 13:24, Nico Kruber <n...@data-artisans.com> wrote:
>
> (back to the ml again)
>
> If you implement the ParallelSourceFunction interface instead, Flink
> will run as many source instances as the configured parallelism. Each
> instance will run the same code, and you'll thus have multiple sockets
> to connect to, if that is what you wanted.
>
> One more thing regarding your source: typically you'd want to hold the
> checkpoint lock around the collect() call, i.e.
>
>     synchronized (ctx.getCheckpointLock()) {
>       ctx.collect(...)
>     }
>
> Nico
>
> On 16/01/18 12:27, George Theodorakis wrote:
>> Thank you very much, indeed this was my bottleneck.
>>
>> My problem now is that my source is not parallel, so when I increase
>> the parallelism, the system's throughput falls.
>>
>> Is opening multiple sockets a quick way to make the source parallel?
>>
>> G.
>>
>> 2018-01-16 10:51 GMT+00:00 Nico Kruber <n...@data-artisans.com>:
>>
>>     Hi George,
>>     I suspect that issuing a read operation for every 68 bytes incurs too
>>     much overhead to perform as you would like it to. Instead, create a
>>     bigger buffer (e.g. 64 KiB) and extract the single events from
>>     sub-regions of this buffer.
>>     Please note, however, that the first event will then only be processed
>>     once the read() call returns (the details depend on the underlying
>>     channel [1]). This is a trade-off between latency and throughput.
>>     If you set non-blocking mode for your channels, you will always get
>>     whatever the channel has available and continue immediately. You can
>>     set this up like this, for example:
>>
>>     ==========
>>     socketChannel.configureBlocking(false);
>>     socketChannel.connect(new InetSocketAddress("jenkov.com", 80));
>>
>>     while (!socketChannel.finishConnect()) {
>>       // wait, or do something else...
>>     }
>>     ==========
>>
>>     Nico
>>
>>     [1]
>>     https://docs.oracle.com/javase/7/docs/api/java/nio/channels/SocketChannel.html#read(java.nio.ByteBuffer)
>>
>>     On 15/01/18 13:19, George Theodorakis wrote:
>>>     Hello,
>>>
>>>     I am trying to separate the logic of my application by generating
>>>     and processing data on different physical machines.
>>>
>>>     I have created my custom socket source class:
>>>
>>>     class SocketSourceFunction extends SourceFunction[Event2] {
>>>       @volatile private var isRunning: Boolean = true;
>>>       @transient private var serverSocket: ServerSocketChannel = null;
>>>
>>>       override def run(ctx: SourceContext[Event2]) = {
>>>         val hostname = "localhost"
>>>         val port = 6667
>>>         println("listening:" + port)
>>>         val server = ServerSocketChannel.open();
>>>         server.bind(new InetSocketAddress(hostname, port));
>>>         var buffer = ByteBuffer.allocate(68);
>>>         val des = new EventDeSerializer2()
>>>
>>>         while (isRunning) {
>>>           println("waiting...")
>>>           var socketChannel = server.accept();
>>>
>>>           if (socketChannel != null) {
>>>             println("accept:" + socketChannel)
>>>             while (true) {
>>>               var bytes = 0;
>>>               bytes = socketChannel.read(buffer)
>>>               if (bytes > 0) {
>>>                 if (!buffer.hasRemaining()) {
>>>                   buffer.rewind()
>>>                   var event: Event2 = des.deserialize(buffer.array())
>>>                   ctx.collect(event)
>>>                   buffer.clear()
>>>                 }
>>>               }
>>>             }
>>>           }
>>>         }
>>>       }
>>>
>>>       override def cancel() = {
>>>         isRunning = false;
>>>         val socket = this.serverSocket;
>>>         if (socket != null) {
>>>           try {
>>>             socket.close();
>>>           } catch {
>>>             case e: Throwable =>
>>>               System.err.println(String.format("error: %s", e.getMessage()));
>>>               e.printStackTrace();
>>>               System.exit(1);
>>>           }
>>>         }
>>>       }
>>>     }
>>>
>>>     I am sending data with either raw sockets using ByteBuffers, or with
>>>     a Flink generator (serializing my events and using the
>>>     writeToSocket() method).
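The batching that Nico suggests (read into one big buffer, then slice fixed-size events out of it) can be sketched in plain Java NIO, independent of Flink. The 68-byte record size comes from the thread; the class and method names here are illustrative, and the standard flip()/compact() pattern is used so that a partial record left by a short read survives until the next read:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class RecordExtractor {
    static final int RECORD_SIZE = 68; // event size from the thread

    // Extracts all complete 68-byte records currently in `buffer`
    // (which is in fill mode) and leaves any partial tail in place.
    static List<byte[]> drain(ByteBuffer buffer) {
        List<byte[]> records = new ArrayList<>();
        buffer.flip(); // switch from filling to draining
        while (buffer.remaining() >= RECORD_SIZE) {
            byte[] record = new byte[RECORD_SIZE];
            buffer.get(record);
            records.add(record);
        }
        buffer.compact(); // keep the partial tail, switch back to filling
        return records;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64 * 1024); // 64 KiB, as suggested

        // Simulate two partial reads: 100 bytes, then 104 bytes (3 records total).
        buffer.put(new byte[100]);
        int first = drain(buffer).size();  // 1 complete record; 32 bytes carried over
        buffer.put(new byte[104]);
        int second = drain(buffer).size(); // 32 + 104 = 136 bytes -> 2 records
        System.out.println(first + " " + second); // prints "1 2"
    }
}
```

In a source function, `drain` would be called after each `socketChannel.read(buffer)` and each extracted record handed to the deserializer, so one read() call can yield many events instead of one.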
>>>     However, in both cases I am seeing less than a tenth of the
>>>     throughput of in-memory generation, even over a 10 Gbit connection.
>>>
>>>     Is there any obvious defect in my implementation?
>>>
>>>     Thank you in advance,
>>>     George
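The non-blocking connect snippet quoted in the thread can be made self-contained by pointing it at a loopback server on an ephemeral port instead of a remote host; this is a minimal sketch (names are illustrative), not the thread's exact setup:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class NonBlockingConnect {
    // Connects a non-blocking SocketChannel to the given address,
    // spinning on finishConnect() as in the snippet from the thread.
    static SocketChannel connect(InetSocketAddress address) throws IOException {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        channel.connect(address);
        while (!channel.finishConnect()) {
            Thread.onSpinWait(); // wait, or do something else...
        }
        return channel;
    }

    public static void main(String[] args) throws IOException {
        // Loopback server on an ephemeral port so the example is self-contained.
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("localhost", 0));
            InetSocketAddress addr = (InetSocketAddress) server.getLocalAddress();
            try (SocketChannel client = connect(addr)) {
                System.out.println("connected=" + client.isConnected());
            }
        }
    }
}
```

After this, `client.read(buffer)` returns immediately with whatever bytes are available (possibly zero), which is the latency/throughput trade-off Nico describes.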