Hello,

I am trying to separate the logic of my application by generating and
processing data in different physical engines.

I have created my custom socket source class:

class SocketSourceFunction extends SourceFunction[Event2]{
      @volatile private var isRunning:Boolean = true;
      @transient private var serverSocket: ServerSocketChannel = null;

      override def run(ctx: SourceContext[Event2]) = {
  val hostname = "localhost"
  val port = 6667
          println("listening:" + port)
  val server = ServerSocketChannel.open();
  server.bind(new InetSocketAddress (hostname, port));
  var buffer = ByteBuffer.allocate (68);
  val des = new EventDeSerializer2()

  while (isRunning) {
            println("waiting...")
            var socketChannel = server.accept();

     if (socketChannel != null){
               println("accept:" + socketChannel)
               while (true) {
    var bytes = 0;
    bytes = socketChannel.read(buffer)
    if( bytes > 0) {
    if (!buffer.hasRemaining()) {
    buffer.rewind()
    var event: Event2 = des.deserialize(buffer.array())
    ctx.collect(event)
    buffer.clear()
    }
    }
                     }
}
          }
      }

      override def cancel() = {
        isRunning = false;
        val socket = this.serverSocket;
        if (socket != null) {
          try {
            socket.close();
           }catch { case e: Throwable => {
              System.err.println(String.format("error: %s",
e.getMessage()));
        e.printStackTrace();
        System.exit(1);
             }
           }
         }
      }
}

I am sending data with either raw sockets using ByteBuffers or with a Flink
generator (serializing my Events and using writeToSocket() method).
However, in both cases, I am experiencing less than 10x throughput in
comparison to in-memory generation, even when using a 10gbit connection
(the throughput is much lower).

Is there any obvious defect in my implementation?

Thank you in advance,
George

Reply via email to