Hi!

(1) For streaming sinks, RichSinkFunction is the right class to implement.

(2) Flink never calls "invoke()" from multiple threads concurrently on the
same sink instance, so there is no need to synchronize.
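
To illustrate the point about synchronization, here is a minimal sketch of the batching logic with no locks and no concurrent collections. It assumes single-threaded access, as described above. The class name BatchBuffer, its constructor parameters, and the Consumer that stands in for the real client call are all hypothetical and not part of Flink or of the Nakadi client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: collects events and hands them to a sender in batches. */
class BatchBuffer<T> {
    private final int maxBatchSize;
    private final long maxWaitMillis;
    private final Consumer<List<T>> sender;          // stands in for the real client call
    private final List<T> events = new ArrayList<>(); // plain list: one thread only
    private long nextFlush;

    BatchBuffer(int maxBatchSize, long maxWaitMillis,
                Consumer<List<T>> sender, long startMillis) {
        this.maxBatchSize = maxBatchSize;
        this.maxWaitMillis = maxWaitMillis;
        this.sender = sender;
        this.nextFlush = startMillis + maxWaitMillis;
    }

    /** Adds one event; flushes when the batch is full or the wait time has elapsed. */
    void add(T event, long nowMillis) {
        events.add(event);
        if (nowMillis >= nextFlush || events.size() >= maxBatchSize) {
            flush(nowMillis);
        }
    }

    /** Sends the pending events, if any, and schedules the next time-based flush. */
    void flush(long nowMillis) {
        if (!events.isEmpty()) {
            // Send a copy; the buffer is cleared only if the sender did not throw,
            // so a failed send keeps the events for the next attempt.
            sender.accept(new ArrayList<>(events));
            events.clear();
        }
        nextFlush = nowMillis + maxWaitMillis;
    }
}
```

In a RichSinkFunction, invoke() could then simply call add(value, System.currentTimeMillis()) on such a buffer, and close() could call flush() so that a partially filled batch is not dropped on shutdown. Note that the time condition is only checked when an event arrives, as in the original code.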

Stephan


On Thu, Mar 2, 2017 at 4:53 PM, Hussein Baghdadi <
hussein.baghd...@zalando.de> wrote:

> Hello,
>
> I have some basic questions regarding Sinks in Flink.
>
> 1) To implement our own Sink, which class should we implement:
> RichSinkFunction, RichOutputFormat, etc.?
>
> 2) We are trying to write batches in our Sink. For that, in the overridden
> invoke(), we are calling a synchronized function:
>
> // events = new ConcurrentLinkedQueue<>();
>
>    @Override
>    public void invoke(T value) throws Exception {
>        this.addEventList(value);
>    }
>
>    private synchronized void addEventList(T event) {
>        events.add(event);
>            if ((new Date()).getTime() >= this.nextFlush.get() || events.size() > this.maxBatchSize) {
>            Response response = null;
>            try {
>                response = nakadiClient.resources().events().send(eventName, events);
>            } catch (final ClientException | IllegalStateException e) {
>                logger.error("Error while sending to Nakadi. Error: {}", e.getMessage());
>                throw new RuntimeException(e);
>            } finally {
>                if (response != null) {
>                    try {
>                        response.responseBody().close();
>                        events = new ConcurrentLinkedQueue<>();
>                        this.nextFlush.set((new Date()).getTime() + this.millisecondsWait);
>                    } catch (final Exception ex) {
>                        logger.error("Error happened while trying to close response. {}", ex);
>                    }
>                }
>            }
>        }
>    }
