Hi!

(1) RichSinkFunction is the best class to extend for streaming sinks.
(2) The "invoke()" method is never called by multiple threads concurrently. No need to synchronize.

Stephan

On Thu, Mar 2, 2017 at 4:53 PM, Hussein Baghdadi <hussein.baghd...@zalando.de> wrote:

> Hello,
>
> I have some basic questions regarding Sinks in Flink.
>
> 1) To implement our own Sink, which class to implement: RichSinkFunction,
> RichOutputFormat, etc ..
>
> 2) We are trying to write batches in our Sink. For that, in overrided
> invoke() , we are calling a synchronised function:
>
> // events = new ConcurrentLinkedQueue<>();
>
> @Override
> public void invoke(T value) throws Exception {
>     this.addEventList(value);
> }
>
> private synchronized void addEventList(T event) {
>     events.add(event);
>     if ((new Date()).getTime() >= this.nextFlush.get() || events.size() > this.maxBatchSize) {
>         Response response = null;
>         try {
>             response = nakadiClient.resources().events().send(eventName, events);
>         } catch (final ClientException | IllegalStateException e) {
>             logger.error("Error while sending to Nakadi. Error: {}", e.getMessage());
>             throw new RuntimeException(e);
>         } finally {
>             if (response != null) {
>                 try {
>                     response.responseBody().close();
>                     events = new ConcurrentLinkedQueue<>();
>                     this.nextFlush.set((new Date()).getTime() + this.millisecondsWait);
>                 } catch (final Exception ex) {
>                     logger.error("Error happened while trying to close response. {}", ex);
>                 }
>             }
>         }
>     }
> }
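
To illustrate point (2): since invoke() is not called concurrently, the batching in the quoted code could probably be reduced to a plain list and a plain long, without "synchronized", ConcurrentLinkedQueue, or an atomic. Below is a rough sketch only, not a tested Flink sink. BatchBuffer and its "sender" callback are hypothetical names that stand in for the Nakadi client call, and the sketch uses ">=" for the size check where the quoted code uses ">". The assumption is that invoke() would call add(value) and close() would call flush().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical single-threaded batching helper; not part of Flink or Nakadi. */
class BatchBuffer<T> {
    private final int maxBatchSize;
    private final long maxWaitMillis;
    private final Consumer<List<T>> sender; // stand-in for the real client call
    private List<T> events = new ArrayList<>();
    private long nextFlush;

    BatchBuffer(int maxBatchSize, long maxWaitMillis, Consumer<List<T>> sender) {
        this.maxBatchSize = maxBatchSize;
        this.maxWaitMillis = maxWaitMillis;
        this.sender = sender;
        this.nextFlush = System.currentTimeMillis() + maxWaitMillis;
    }

    /** Intended to be called from invoke(value); no locking, single caller assumed. */
    void add(T event) {
        events.add(event);
        // As in the quoted code, the time limit is only checked when an element arrives.
        if (events.size() >= maxBatchSize || System.currentTimeMillis() >= nextFlush) {
            flush();
        }
    }

    /** Intended to be called from close() as well, so a partial batch is still sent. */
    void flush() {
        if (events.isEmpty()) {
            return;
        }
        List<T> batch = events;
        // Start a fresh buffer and restart the timer before sending.
        events = new ArrayList<>();
        nextFlush = System.currentTimeMillis() + maxWaitMillis;
        // An exception thrown here propagates to the caller; the batch is not retried.
        sender.accept(batch);
    }
}
```

Inside a RichSinkFunction this would presumably be created in open() and used as buffer.add(value) in invoke(). Note that a sketch like this keeps events only in memory, so anything buffered but not yet sent is lost if the task fails.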