Hello,

I am trying to separate the logic of my application by generating the data and processing it on different physical machines.
I have created my own custom socket source class:

```scala
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.ServerSocketChannel

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

class SocketSourceFunction extends SourceFunction[Event2] {
  @volatile private var isRunning: Boolean = true
  @transient private var serverSocket: ServerSocketChannel = null

  override def run(ctx: SourceContext[Event2]): Unit = {
    val hostname = "localhost"
    val port = 6667
    println("listening:" + port)
    val server = ServerSocketChannel.open()
    server.bind(new InetSocketAddress(hostname, port))
    // The buffer holds exactly one serialized Event2 (68 bytes).
    var buffer = ByteBuffer.allocate(68)
    val des = new EventDeSerializer2()
    while (isRunning) {
      println("waiting...")
      var socketChannel = server.accept()
      if (socketChannel != null) {
        println("accept:" + socketChannel)
        while (true) {
          var bytes = 0
          bytes = socketChannel.read(buffer)
          if (bytes > 0) {
            // Deserialize and emit once a full event has been read.
            if (!buffer.hasRemaining()) {
              buffer.rewind()
              var event: Event2 = des.deserialize(buffer.array())
              ctx.collect(event)
              buffer.clear()
            }
          }
        }
      }
    }
  }

  override def cancel(): Unit = {
    isRunning = false
    val socket = this.serverSocket
    if (socket != null) {
      try {
        socket.close()
      } catch {
        case e: Throwable =>
          System.err.println(String.format("error: %s", e.getMessage()))
          e.printStackTrace()
          System.exit(1)
      }
    }
  }
}
```

I am sending the data either over raw sockets using ByteBuffers or with a Flink generator job (serializing my events and using the `writeToSocket()` method). In both cases, I get roughly an order of magnitude lower throughput than with in-memory generation, even over a 10 Gbit/s connection, where the achieved rate is far below the link capacity.

Is there any obvious defect in my implementation?

Thank you in advance,
George
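One plausible cause (an assumption, not something measured here): the source reads into a 68-byte buffer, so every `socketChannel.read()` call, and therefore every system call, returns at most one event. A common remedy is to read into a much larger buffer and slice fixed-size records out of it. The sketch below shows only that reading loop, in plain Scala with no Flink dependency. `BatchedRecordReader`, `readRecords`, the `emit` callback and the 64 KiB default buffer size are hypothetical names and values chosen for illustration; the 68-byte record size is taken from the original `ByteBuffer.allocate(68)`.

```scala
import java.nio.ByteBuffer
import java.nio.channels.ReadableByteChannel

// Hypothetical helper, not part of Flink: batched reading of fixed-size records.
object BatchedRecordReader {
  // Size of one serialized Event2, assumed from ByteBuffer.allocate(68) in the original source.
  val RecordSize = 68

  /** Reads fixed-size records from `channel` using one large buffer per read() call
    * and hands each complete record to `emit` (a stand-in for
    * `ctx.collect(des.deserialize(bytes))`). Stops at end of stream (read() == -1)
    * or when `isRunning()` returns false. Returns the number of records emitted. */
  def readRecords(channel: ReadableByteChannel,
                  isRunning: () => Boolean,
                  emit: Array[Byte] => Unit,
                  bufferSize: Int = 64 * 1024): Long = {
    val buffer = ByteBuffer.allocate(math.max(bufferSize, RecordSize))
    var count = 0L
    var eof = false
    while (!eof && isRunning()) {
      // Reads up to buffer.remaining() bytes in a single call.
      val n = channel.read(buffer)
      if (n < 0) eof = true
      // Switch the buffer to draining mode and emit every complete record.
      buffer.flip()
      while (buffer.remaining() >= RecordSize) {
        val record = new Array[Byte](RecordSize)
        buffer.get(record)
        emit(record)
        count += 1
      }
      // Move any trailing partial record to the front; it is completed by the next read.
      // A partial record left at end of stream is discarded.
      buffer.compact()
    }
    count
  }
}
```

Inside `run()`, one might call it as `readRecords(socketChannel, () => isRunning, bytes => ctx.getCheckpointLock.synchronized { ctx.collect(des.deserialize(bytes)) })`, assuming `EventDeSerializer2.deserialize` accepts an `Array[Byte]` as in the original. The loop also returns when `read()` yields -1, whereas the original inner `while (true)` ignores both end-of-stream and `isRunning`, so it spins once the sender disconnects. Separately, `serverSocket` is never assigned in the original, so `cancel()` cannot close the listening socket; assigning `serverSocket = server` in `run()` would address that.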