Hello, 

I have some basic questions regarding Sinks in Flink.

1) To implement our own Sink, which class should we extend: RichSinkFunction, 
RichOutputFormat, or something else?

2) We are trying to write batches in our Sink. To do that, our overridden 
invoke() calls a synchronized method:

// events = new ConcurrentLinkedQueue<>();

   @Override
   public void invoke(T value) throws Exception {
       this.addEventList(value);
   }

   private synchronized void addEventList(T event) {
       events.add(event);
       if ((new Date()).getTime() >= this.nextFlush.get() || events.size() > this.maxBatchSize) {
           Response response = null;
           try {
               response = nakadiClient.resources().events().send(eventName, events);
           } catch (final ClientException | IllegalStateException e) {
               logger.error("Error while sending to Nakadi. Error: {}", e.getMessage());
               throw new RuntimeException(e);
           } finally {
               if (response != null) {
                   try {
                       response.responseBody().close();
                       events = new ConcurrentLinkedQueue<>();
                       this.nextFlush.set((new Date()).getTime() + this.millisecondsWait);
                   } catch (final Exception ex) {
                       logger.error("Error happened while trying to close response. {}", ex);
                   }
               }
           }
       }
   }
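
To make the intent clearer, here is a minimal sketch of the size-or-time flush logic on its own, without Flink or the Nakadi client. BatchBuffer, the sender callback, and the clock parameter are hypothetical names used only for illustration. The sender stands in for the Nakadi send call. Unlike the code above, this sketch flushes as soon as the batch reaches maxBatchSize (>= rather than >) and resets the buffer only when the send did not throw. As in the code above, the time limit is only checked when a new event arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical helper for illustration; not part of Flink or the Nakadi client.
final class BatchBuffer<T> {
    private final int maxBatchSize;
    private final long maxWaitMillis;
    private final Consumer<List<T>> sender; // stands in for the real send call
    private final LongSupplier clock;       // e.g. System::currentTimeMillis
    private List<T> events = new ArrayList<>();
    private long nextFlush;

    BatchBuffer(int maxBatchSize, long maxWaitMillis,
                Consumer<List<T>> sender, LongSupplier clock) {
        this.maxBatchSize = maxBatchSize;
        this.maxWaitMillis = maxWaitMillis;
        this.sender = sender;
        this.clock = clock;
        this.nextFlush = clock.getAsLong() + maxWaitMillis;
    }

    // Buffers one event, then flushes if the batch is full or the wait time has elapsed.
    synchronized void add(T event) {
        events.add(event);
        if (events.size() >= maxBatchSize || clock.getAsLong() >= nextFlush) {
            flush();
        }
    }

    // Sends the buffered events. If the sender throws, the buffer is left untouched.
    synchronized void flush() {
        if (events.isEmpty()) {
            return;
        }
        sender.accept(events);
        events = new ArrayList<>();
        nextFlush = clock.getAsLong() + maxWaitMillis;
    }
}
```

Inside a sink, invoke() would then only call add(value), and close() would call flush() so that a partially filled batch is not lost on shutdown.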
