Yes, using the SinkTaskContext as a notification channel works as well, so
that's fine.

While a config value might be useful, it's probably not safe to assume that
a sink would always want the same number of msgs/records for each commit,
since the commit volume might be defined in bytes. E.g., accumulating
enough data to fill a 128 MB HDFS chunk could take any number of
msgs/records.
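
To illustrate the byte-based case, here is a minimal sketch of a sink-side
counter that signals a commit once a byte threshold is reached, however many
records that takes. The class name ByteThresholdBuffer and its methods are
hypothetical helpers for illustration only, not part of Kafka Connect.

```java
// Hypothetical helper, not part of Kafka Connect: tracks how much data a sink
// has buffered and reports when a byte-based commit threshold is reached.
final class ByteThresholdBuffer {
    private final long thresholdBytes;
    private long bufferedBytes = 0;
    private long bufferedRecords = 0;

    ByteThresholdBuffer(long thresholdBytes) {
        if (thresholdBytes <= 0) {
            throw new IllegalArgumentException("thresholdBytes must be positive");
        }
        this.thresholdBytes = thresholdBytes;
    }

    /** Accounts for one record; returns true once the sink should commit. */
    boolean add(int recordSizeBytes) {
        bufferedBytes += recordSizeBytes;
        bufferedRecords++;
        return bufferedBytes >= thresholdBytes;
    }

    /** Number of records buffered since the last reset. */
    long bufferedRecords() {
        return bufferedRecords;
    }

    /** Call after the sink has committed the buffered data. */
    void reset() {
        bufferedBytes = 0;
        bufferedRecords = 0;
    }
}
```

With a 128 MB threshold, the record count at commit time depends entirely on
record sizes, which is why a fixed record-count setting alone would not cover
this case.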

In fact, this mechanism is really more general than just volume-based
commits; it's really about providing sinks a flexible commit capability
(e.g., some sink event requires an early commit, or otherwise requires
modification of the commit interval).
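
As a rough sketch of that more general case, a sink could ask for an early
commit whenever some sink event calls for one. The FlushRequester interface
below is a hypothetical stand-in for the "requestFlush()" method suggested
for the context; EventDrivenSink and onSinkEvent() are likewise invented
names for illustration.

```java
// Hypothetical stand-in for the context method proposed in this thread;
// it is not an existing Kafka Connect interface.
interface FlushRequester {
    void requestFlush();
}

// Hypothetical sink-side logic: forwards a commit request to the framework
// when an event in the sink system makes an early commit desirable.
final class EventDrivenSink {
    private final FlushRequester context;

    EventDrivenSink(FlushRequester context) {
        this.context = context;
    }

    /** Called on a sink event, e.g. a completed chunk or a file roll. */
    void onSinkEvent(boolean requiresEarlyCommit) {
        if (requiresEarlyCommit) {
            // Only a notification: the framework would still decide how and
            // when to perform the actual offset commit.
            context.requestFlush();
        }
    }
}
```

The sink only signals intent here, so the existing interval-based commit
could keep running unchanged alongside it.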

On Tue, Sep 13, 2016 at 11:37 AM, Shikhar Bhushan <shik...@confluent.io>
wrote:

> Hi Dean,
>
> I agree, it would be good to support volume-based offset commits.
>
> For giving more control on flushes to a sink connector, rather than adding
> a new task.put() variant, I think it may be better to add an API like
> "requestFlush()" to the `SinkTaskContext` (and perhaps also
> `SourceTaskContext`).
>
> Another option could be to add something like "offset.flush.record.count"
> in addition to the existing "offset.flush.interval.ms". Both options could
> be configured but whichever happens first would reset the other.
>
> What do you think?
>
> Best,
>
> Shikhar
>
> On Fri, Sep 9, 2016 at 9:55 AM Dean Arnold <renodino...@gmail.com> wrote:
>
> > I have a need for volume-based commits in a few sink connectors, and the
> > current interval-only commit strategy creates some headaches. After
> > skimming the code, it appears that an alternate put() method that returned
> > a Map<TopicPartition, Long> might be used to allow a sink connector to keep
> > Kafka up to date wrt committed offsets in the sink system, so that Kafka
> > might defer or reset its commit interval for topics/partitions (at least,
> > for the consumer used for SinkTasks). It wouldn't replace interval-based
> > flush(), but hopefully flush() would be invoked much less frequently, and
> > permit the flush interval to be increased, so the sink connector can better
> > optimize its commit batches. E.g., the sink could almost always commit 5000
> > records, rather than whatever happened to be buffered up when flush() was
> > called, which might be very small or very large.
> >
> > I'm thinking of something like
> >
> > Map<TopicPartition, Long> putAndReport(Collection<SinkRecord> record)
> >
> > Put the records in the sink, and, if the operation results in committing
> > records to the sink system, report the largest committed offset of each
> > committed topic/partition. Returns null if no commit occurs.
> >
> > I'm not certain how a rebalance might affect the processing; since a sink
> > would still need to support existing interval-based commits, the rebalance
> > (and any sink recovery) would presumably work the same.
> >
> > Am I overlooking any systemic issues that would preclude such a feature?
> >
>
