Hello Tomoyuki,

1. This seems like a good use case for Streams.
2. You should actually set it to Long.MAX_VALUE to indicate "do not commit
at regular intervals"; setting it to "0" would instead mean committing on
every iteration. You can then rely on ProcessorContext.commit() to manually
request an offset commit after the batch is done.
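
Something along these lines should work (just a sketch: BulkInsertProcessor,
BATCH_SIZE, and bulkInsert() are placeholders for your own storage logic):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.processor.AbstractProcessor;

// In your StreamsConfig:
//   props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE);

// Buffer records, write them to external storage in bulk, and only then
// request an offset commit; that gives you at-least-once semantics.
public class BulkInsertProcessor extends AbstractProcessor<String, String> {

    private static final int BATCH_SIZE = 1000; // tune to your storage

    private final List<String> buffer = new ArrayList<>();

    @Override
    public void process(String key, String value) {
        buffer.add(value);
        if (buffer.size() >= BATCH_SIZE) {
            flush();
        }
    }

    @Override
    public void close() {
        // Drain any remainder; on a clean shutdown Streams commits the
        // offsets itself as part of closing the task.
        if (!buffer.isEmpty()) {
            bulkInsert(buffer);
            buffer.clear();
        }
    }

    private void flush() {
        bulkInsert(buffer);  // if this throws, nothing is committed and the
        buffer.clear();      // records will simply be re-processed (duplicates
                             // are possible, as at-least-once implies)
        // commit() only *requests* a commit; the stream thread performs it
        // on its next loop iteration.
        context().commit();
    }

    private void bulkInsert(List<String> records) {
        // ... call your external storage's bulk API here ...
    }
}
```

You can wire it in with builder.addProcessor("processor",
BulkInsertProcessor::new, "source"). You may also want a time-based flush
(e.g. via a scheduled punctuation) so that a low-traffic partition does not
hold on to a partial batch forever.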


Guozhang




On Wed, Oct 10, 2018 at 11:15 PM, Tomoyuki Saito <aocch...@gmail.com> wrote:

> Hi,
>
> I'm exploring whether it is possible to use Kafka Streams for batch
> processing with at-least-once semantics.
>
> What I want to do is to insert records into an external storage in bulk, and
> commit offsets only after the bulk insertion, to achieve at-least-once
> semantics.
> A processing topology can be very simple like:
> ```
> TopologyBuilder builder = new TopologyBuilder();
> builder.addSource("source", "topic");
> builder.addProcessor("processor", processorSupplier, "source");
> KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
> streams.start();
> ```
>
> My questions are:
>
> 1. Could you suggest how to achieve that? Would it be better to use
> KafkaConsumer instead of KafkaStreams?
>
> 2. From my understanding, when setting the StreamsConfig `commit.interval.ms`
> to 0, we can turn off the offset commits made by KafkaStreams' internal logic
> (in StreamThread#maybeCommit) and control when offsets are committed with
> `ProcessorContext#commit`. Is my understanding correct? Are there any expected
> issues with this approach?
>
> Thank you,
> Tomoyuki
>



-- 
-- Guozhang
