[
https://issues.apache.org/jira/browse/KAFKA-4161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15491296#comment-15491296
]
Shikhar Bhushan commented on KAFKA-4161:
bq. Probably worth clarifying whether we're really talking about just flush
here or offset commit as well. Flush really only exists in order to support
offset commit (from the framework's perspective), but since you mention full
buffers I think you might be getting at a slightly different use case for
connectors.
Sorry I wasn't clear, flushing data & offset commit are currently coupled as
you pointed out. If we want to avoid unnecessary redelivery of records it is
best to commit offsets with the 'most current' knowledge of them, which we
currently have after calling {{flush()}}.
bq. In general, I think it'd actually be even better to just get rid of the
idea of having to flush as a common operation as it hurts throughput to have to
flush entirely to commit offsets (we are flushing the pipeline, which is never
good). Ideally we could do what the framework does with source connectors and
just track which data has been successfully delivered and use that for the
majority of offset commits. We'd still need it for cases like shutdown where we
want to make sure all data has been sent, but since the framework controls
delivery of data, maybe it's even better just to wait for that data to be
written.
Good points, I agree it would be better to make it so {{flush()}} is not
routine since it can hurt throughput. I think we can deprecate it altogether.
As a proposal:
{noformat}
abstract class SinkTask {
    ..
    // New method
    public Map<TopicPartition, OffsetAndMetadata> flushedOffsets() {
        throw new UnsupportedOperationException();
    }

    @Deprecated
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) { }
    ..
}
{noformat}
Then the periodic offset-commit logic would use {{flushedOffsets()}}, and if
that is not implemented, fall back to calling {{flush()}} as it does currently,
so it can commit the offset state as of the last {{put()}} call.
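To sketch the fallback roughly (simplified stand-in types, with offsets as plain {{Long}} rather than {{OffsetAndMetadata}}; detecting the unimplemented default by catching {{UnsupportedOperationException}} is just one possible mechanism):

```java
import java.util.Map;

// Simplified stand-ins for the Connect API types; illustration only.
class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

abstract class SinkTask {
    // Proposed new method: report offsets safely written to the sink.
    public Map<TopicPartition, Long> flushedOffsets() {
        throw new UnsupportedOperationException();
    }

    @Deprecated
    public void flush(Map<TopicPartition, Long> offsets) { }
}

class CommitLogic {
    // Periodic commit: prefer flushedOffsets() when the task implements it;
    // otherwise fall back to the legacy path of flushing the pipeline and
    // committing the offsets as of the last put().
    static Map<TopicPartition, Long> offsetsToCommit(
            SinkTask task, Map<TopicPartition, Long> currentOffsets) {
        try {
            return task.flushedOffsets();
        } catch (UnsupportedOperationException e) {
            task.flush(currentOffsets); // legacy path: flush everything
            return currentOffsets;
        }
    }
}
```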
I don't think {{flush()}} is needed even at shutdown. Tasks are already being
advised via {{close()}} and can choose to flush any buffered data from there.
We can do a final offset commit based on the {{flushedOffsets()}} after
{{close()}} (though this does imply a quirk that even after a
{{TopicPartition}} is closed we expect tasks to keep offset state around in the
map returned by {{flushedOffsets()}}).
Additionally, it would be good to have a {{context.requestCommit()}} in the
spirit of {{context.requestFlush()}} as I was originally proposing. The
motivation is that connectors can optimize for avoiding unnecessary redelivery
when recovering from failures. Connectors can choose whatever policies are best
like number-of-records or size-based batching/buffering for writing to the
destination system as part of the normal flow of calls to {{put()}}, and
request a commit when they have actually written data to the destination
system. There need not be a strong guarantee that an offset commit actually
happens right after such a request: to avoid committing offsets too often, the
framework can choose to only honor it after some minimum interval has elapsed,
e.g. in case a connector always requests a commit after every {{put()}}.
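To make the intended semantics concrete, a rough sketch of the weak guarantee (the {{CommitScheduler}} name and the single-flag mechanism are just illustrative, not a proposed implementation):

```java
// Hypothetical sketch: requestCommit() is a hint, and the framework honors
// it at most once per minimum interval, so a connector that requests a
// commit after every put() cannot force over-frequent commits.
class CommitScheduler {
    private final long minCommitIntervalMs;
    private long lastCommitMs;
    private boolean commitRequested;

    CommitScheduler(long minCommitIntervalMs, long nowMs) {
        this.minCommitIntervalMs = minCommitIntervalMs;
        this.lastCommitMs = nowMs;
    }

    // What context.requestCommit() would ultimately flip.
    void requestCommit() {
        commitRequested = true;
    }

    // Called by the framework after each put(). Commits only if a request
    // is pending AND the minimum interval has elapsed; a too-early request
    // stays pending rather than being dropped.
    boolean maybeCommit(long nowMs) {
        if (commitRequested && nowMs - lastCommitMs >= minCommitIntervalMs) {
            commitRequested = false;
            lastCommitMs = nowMs;
            return true; // framework would commit offsets here
        }
        return false;
    }
}
```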
bq. The main reason I think we even need the explicit flush() is that some
connectors may have very long delays between flushes (e.g. any object stores
like S3) such that they need to be told directly that they need to write all
their data (or discard it).
I don't believe it is currently possible for a connector to communicate that it
wants to discard data rather than write it out when {{flush()}} is called
(aside from I guess throwing an exception...). With the above proposal the
decision of when and whether to write data would be completely up to
connectors.
bq. Was there a specific connector & scenario you were thinking about here?
This came up in a thread on the user list ('Sink Connector feature request:
SinkTask.putAndReport()')
> Allow connectors to request flush via the context
> -
>
> Key: KAFKA-4161
> URL: https://issues.apache.org/jira/browse/KAFKA-4161
> Project: Kafka
> Issue Type: New Feature
> Components: KafkaConnect
> Reporter: Shikhar Bhushan
> Assignee: Ewen Cheslack-Postava
> Labels: needs-kip
>
> It is desirable to have, in addition to the time-based flush interval, volume
> or size-based commits. E.g. a sink connector which is buffering in terms of
> number of records may want to request a flush when the buffer is full, or
> when sufficient amount of data has been buffered in a file.
> Having a method like say {{requestFlush()}} on the {{SinkTaskContext}} would
> allow for connectors to have flexible policies around flushes. This would be
> in addition to the time-interval-based flushes.
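For illustration, the volume-based policy described in the issue might look like this rough sketch (all types are simplified stand-ins, and {{requestFlush()}} is the proposed API, not an existing one):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in; requestFlush() is the proposed API.
interface SinkTaskContext {
    void requestFlush();
}

class BufferingSinkTask {
    private final SinkTaskContext context;
    private final int maxBufferedRecords;
    private final List<String> buffer = new ArrayList<>();

    BufferingSinkTask(SinkTaskContext context, int maxBufferedRecords) {
        this.context = context;
        this.maxBufferedRecords = maxBufferedRecords;
    }

    // Volume-based policy: buffer records and ask the framework for a
    // flush as soon as the buffer is full, instead of waiting for the
    // time-based flush interval to fire.
    void put(List<String> records) {
        buffer.addAll(records);
        if (buffer.size() >= maxBufferedRecords) {
            context.requestFlush();
        }
    }

    void flush() {
        // Write the buffer to the destination system, then clear it.
        buffer.clear();
    }

    int buffered() {
        return buffer.size();
    }
}
```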