Taking a step back: why do you want to implement communication via sockets 
manually in the first place? With this approach you will not get any 
fault-tolerance guarantees, and I would guess that maintaining a custom 
solution is more difficult than using, say, Kafka.

Best,
Aljoscha

> On 16. Jan 2018, at 13:24, Nico Kruber <n...@data-artisans.com> wrote:
> 
> (back to the ml again)
> 
> If you implement the ParallelSourceFunction interface instead, Flink
> will run as many source instances as the configured parallelism. Each
> instance will run the same code and you'll thus have multiple sockets to
> connect to, if that is what you wanted.
> 
> 
> One more thing regarding your source: typically you'd want the
> checkpoint lock around the collect() call, i.e.
> 
> synchronized (ctx.getCheckpointLock()) {
>  ctx.collect(...)
> }
> 
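> Putting both points together, here is a minimal sketch of a parallel
> version (untested; it reuses your Event2 and EventDeSerializer2 types and
> assumes one listening port per subtask, starting at some basePort):
> 
> import java.net.InetSocketAddress
> import java.nio.ByteBuffer
> import java.nio.channels.ServerSocketChannel
> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
> import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
> 
> class ParallelSocketSource(basePort: Int)
>     extends RichParallelSourceFunction[Event2] {
>   @volatile private var isRunning = true
> 
>   override def run(ctx: SourceContext[Event2]): Unit = {
>     // each parallel instance binds its own port: basePort + subtask index
>     val port = basePort + getRuntimeContext.getIndexOfThisSubtask
>     val server = ServerSocketChannel.open()
>     server.bind(new InetSocketAddress("localhost", port))
>     val buffer = ByteBuffer.allocate(68)
>     val des = new EventDeSerializer2()
>     val channel = server.accept()
>     while (isRunning && channel.read(buffer) != -1) {
>       if (!buffer.hasRemaining()) {
>         val event = des.deserialize(buffer.array())
>         // emit under the checkpoint lock, as described above
>         ctx.getCheckpointLock.synchronized {
>           ctx.collect(event)
>         }
>         buffer.clear()
>       }
>     }
>     server.close()
>   }
> 
>   override def cancel(): Unit = {
>     isRunning = false
>   }
> }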
> 
> Nico
> 
> On 16/01/18 12:27, George Theodorakis wrote:
>> Thank you very much; indeed, this was my bottleneck.
>> 
>> My problem now is that my source is not parallel, so when I increase
>> parallelism, the system's throughput falls.
>> 
>> Is opening multiple sockets a quick solution to make the source parallel?
>> 
>> G.
>> 
>> 2018-01-16 10:51 GMT+00:00 Nico Kruber <n...@data-artisans.com
>> <mailto:n...@data-artisans.com>>:
>> 
>>    Hi George,
>>    I suspect issuing a read operation for every 68 bytes incurs too much
>>    overhead to perform as you would like it to. Instead, create a bigger
>>    buffer (64k?) and extract single events from sub-regions of this buffer
>>    instead.
>>    Please note, however, that then the first buffer will only be processed
>>    when this method returns (the details depend on the underlying channel
>>    [1]). This is a trade-off between latency and throughput at some point.
>>    If you set non-blocking mode for your channels, you will always get what
>>    the channel has available and continue immediately. You can set this up
>>    via this, for example:
>> 
>>    ==========
>>    socketChannel.configureBlocking(false);
>>    socketChannel.connect(new InetSocketAddress("jenkov.com", 80));
>> 
>>    while (!socketChannel.finishConnect()) {
>>        // wait, or do something else...
>>    }
>>    ==========
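>> 
>>    For the bigger-buffer variant, a rough sketch in Scala (untested,
>>    assuming fixed-size 68-byte events and reusing your Event2 and
>>    EventDeSerializer2 types):
>> 
>>    ==========
>>    val buffer = ByteBuffer.allocate(64 * 1024) // one large read buffer
>>    val eventSize = 68
>>    val record = new Array[Byte](eventSize)
>>    val des = new EventDeSerializer2()
>> 
>>    while (isRunning && socketChannel.read(buffer) != -1) {
>>        buffer.flip() // switch to draining what was just received
>>        while (buffer.remaining() >= eventSize) {
>>            buffer.get(record) // copy one complete event out of the buffer
>>            ctx.collect(des.deserialize(record))
>>        }
>>        buffer.compact() // keep a partial trailing event for the next read
>>    }
>>    ==========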
>> 
>> 
>>    Nico
>> 
>>    [1]
>>    https://docs.oracle.com/javase/7/docs/api/java/nio/channels/SocketChannel.html#read(java.nio.ByteBuffer)
>> 
>>    On 15/01/18 13:19, George Theodorakis wrote:
>>> Hello,
>>> 
>>> I am trying to separate the logic of my application by generating and
>>> processing data on different physical engines.
>>> 
>>> I have created my custom socket source class:
>>> 
>>> class SocketSourceFunction extends SourceFunction[Event2] {
>>>   @volatile private var isRunning: Boolean = true
>>>   @transient private var serverSocket: ServerSocketChannel = null
>>> 
>>>   override def run(ctx: SourceContext[Event2]) = {
>>>     val hostname = "localhost"
>>>     val port = 6667
>>>     println("listening:" + port)
>>>     // keep a reference so that cancel() can actually close the socket
>>>     serverSocket = ServerSocketChannel.open()
>>>     serverSocket.bind(new InetSocketAddress(hostname, port))
>>>     val buffer = ByteBuffer.allocate(68)
>>>     val des = new EventDeSerializer2()
>>> 
>>>     while (isRunning) {
>>>       println("waiting...")
>>>       val socketChannel = serverSocket.accept()
>>> 
>>>       if (socketChannel != null) {
>>>         println("accept:" + socketChannel)
>>>         // stop on cancel() and when the remote side closes the connection
>>>         var bytes = 0
>>>         while (isRunning && bytes != -1) {
>>>           bytes = socketChannel.read(buffer)
>>>           if (bytes > 0 && !buffer.hasRemaining()) {
>>>             buffer.rewind()
>>>             val event: Event2 = des.deserialize(buffer.array())
>>>             ctx.collect(event)
>>>             buffer.clear()
>>>           }
>>>         }
>>>       }
>>>     }
>>>   }
>>> 
>>>   override def cancel() = {
>>>     isRunning = false
>>>     val socket = this.serverSocket
>>>     if (socket != null) {
>>>       try {
>>>         socket.close()
>>>       } catch {
>>>         case e: Throwable =>
>>>           System.err.println(String.format("error: %s", e.getMessage()))
>>>           e.printStackTrace()
>>>           System.exit(1)
>>>       }
>>>     }
>>>   }
>>> }
>>> 
>>> I am sending data either with raw sockets using ByteBuffers or with a
>>> Flink generator (serializing my Events and using the writeToSocket()
>>> method). However, in both cases I am getting less than a tenth of the
>>> throughput of in-memory generation, even when using a 10 Gbit connection.
>>> 
>>> Is there any obvious defect in my implementation?
>>> 
>>> Thank you in advance,
>>> George
>> 
>> 
> 
