Yes, using the SinkTaskContext as a notification channel works as well, so that's fine.
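
As a rough sketch of how a sink might use that notification channel (everything here is hypothetical: the class name, the byte-counting helpers, and the requestCommit() call, which just stands in for whatever notification method we settle on, e.g. the requestFlush() suggestion below):

    import java.util.Collection;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;

    // Hypothetical: buffer records until roughly one HDFS block worth of data
    // has accumulated, write the batch to the sink, then ask the framework to
    // commit offsets now rather than waiting for offset.flush.interval.ms.
    public abstract class VolumeCommitSinkTask extends SinkTask {
        private static final long TARGET_BYTES = 128L * 1024 * 1024;
        private long bufferedBytes = 0;

        @Override
        public void put(Collection<SinkRecord> records) {
            for (SinkRecord record : records) {
                bufferedBytes += estimateSize(record);
                buffer(record);
            }
            if (bufferedBytes >= TARGET_BYTES) {
                writeBufferToSink();      // sink-side commit of the batch
                bufferedBytes = 0;
                context.requestCommit();  // placeholder for the proposed notification API
            }
        }

        // connector-specific details, elided:
        protected abstract long estimateSize(SinkRecord record);
        protected abstract void buffer(SinkRecord record);
        protected abstract void writeBufferToSink();
    }
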
While a config value might be useful, it's probably not safe to assume that a sink would always want the same number of msgs/records for each commit, since the commit volume might be defined in bytes, e.g., accumulating enough data to fill a 128 MB HDFS chunk could take any number of msgs/records.

In fact, this mechanism is really more general than just volume-based commits; it's really about providing sinks a flexible commit capability (e.g., some sink event requires a premature commit, or otherwise requires modifying the commit interval).

On Tue, Sep 13, 2016 at 11:37 AM, Shikhar Bhushan <shik...@confluent.io> wrote:

> Hi Dean,
>
> I agree, it would be good to support volume-based offset commits.
>
> For giving more control on flushes to a sink connector, rather than adding
> a new task.put() variant, I think it may be better to add an API like
> "requestFlush()" to the `SinkTaskContext` (and perhaps also
> `SourceTaskContext`).
>
> Another option could be to add something like "offset.flush.record.count"
> in addition to the existing "offset.flush.interval.ms". Both options could
> be configured, but whichever happens first would reset the other.
>
> What do you think?
>
> Best,
>
> Shikhar
>
> On Fri, Sep 9, 2016 at 9:55 AM Dean Arnold <renodino...@gmail.com> wrote:
>
> > I have a need for volume-based commits in a few sink connectors, and the
> > current interval-only based commit strategy creates some headaches. After
> > skimming the code, it appears that an alternate put() method that returned
> > a Map<TopicPartition, Long> might be used to allow a sink connector to
> > keep Kafka up to date wrt committed offsets in the sink system, so that
> > Kafka might defer or reset its commit interval for topics/partitions (at
> > least, for the consumer used for SinkTasks). It wouldn't replace
> > interval-based flush(), but hopefully flush() would be invoked much less
> > frequently, and permit the flush interval to be increased, so the sink
> > connector can better optimize its commit batches. E.g., the sink could
> > almost always commit 5000 records, rather than whatever happened to be
> > buffered up when flush() was called, which might be very small or very
> > large.
> >
> > I'm thinking of something like
> >
> >     Map<TopicPartition, Long> putAndReport(Collection<SinkRecord> record)
> >
> > Put the records in the sink, and, if the operation results in committing
> > records to the sink system, report the largest committed offset of each
> > committed topic/partition. Returns null if no commit occurs.
> >
> > I'm not certain how a rebalance might affect the processing; since a sink
> > would still need to support existing interval-based commits, the rebalance
> > (and any sink recovery) would presumably work the same.
> >
> > Am I overlooking any systemic issues that would preclude such a feature?
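
For concreteness, the put() variant quoted above might look something like the following. This is purely a sketch of the idea; nothing like it exists in the current Connect API, and the class and method names are only illustrative:

    import java.util.Collection;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;

    // Hypothetical alternate SinkTask that reports sink-side commits back to
    // the framework, so it can advance consumer offsets without waiting for
    // the next interval-based flush().
    public abstract class ReportingSinkTask extends SinkTask {
        /**
         * Deliver records to the sink. If delivery causes the sink system to
         * commit, return the largest committed offset for each committed
         * topic/partition; return null if no commit occurred, in which case
         * the framework falls back to interval-based flush().
         */
        public abstract Map<TopicPartition, Long> putAndReport(Collection<SinkRecord> records);
    }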