[jira] [Commented] (KAFKA-4161) Allow connectors to request flush via the context

2016-09-16 Thread Shikhar Bhushan (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-4161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15496820#comment-15496820 ]

Shikhar Bhushan commented on KAFKA-4161:


We could also implement KAFKA-3462 here by having the semantics that connectors 
that want to disable offset tracking by Connect can return an empty map from 
{{flushedOffsets()}}. Maybe {{flushedOffsets()}} isn't the best name - we really 
want a name implying {{committableOffsets()}}.
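To make those semantics concrete, here is a rough sketch in plain Java. All class names here are illustrative, and {{TopicPartition}}/{{OffsetAndMetadata}} are simplified to {{String}}/{{Long}} - this is not a finalized API. A task that manages its own offsets returns an empty map, which the framework would read as "skip offset tracking for me", while a normal task reports the offsets it considers committable:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed semantics; names and types are
// simplified stand-ins, not real Kafka Connect API.
class SelfManagedSinkTask {
    // Returning an empty map would mean "do not track offsets for me"
    // (the KAFKA-3462 use case).
    public Map<String, Long> flushedOffsets() {
        return Collections.emptyMap();
    }
}

class TrackingSinkTask {
    private final Map<String, Long> committable = new HashMap<>();

    public void put(String topicPartition, long offset) {
        // Pretend the record was durably written, so the next offset
        // becomes committable.
        committable.put(topicPartition, offset + 1);
    }

    public Map<String, Long> flushedOffsets() {
        return committable;
    }
}
```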

> Allow connectors to request flush via the context
> -------------------------------------------------
>
> Key: KAFKA-4161
> URL: https://issues.apache.org/jira/browse/KAFKA-4161
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Shikhar Bhushan
>Assignee: Ewen Cheslack-Postava
>  Labels: needs-kip
>
> It is desirable to have, in addition to the time-based flush interval, volume 
> or size-based commits. E.g. a sink connector which is buffering in terms of 
> number of records may want to request a flush when the buffer is full, or 
> when sufficient amount of data has been buffered in a file.
> Having a method like say {{requestFlush()}} on the {{SinkTaskContext}} would 
> allow for connectors to have flexible policies around flushes. This would be 
> in addition to the time interval based flushes that are controlled with 
> {{offset.flush.interval.ms}}, for which the clock should be reset when any 
> kind of flush happens.
> We should probably also support requesting flushes via the 
> {{SourceTaskContext}} for consistency though a use-case doesn't come to mind 
> off the bat.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4161) Allow connectors to request flush via the context

2016-09-14 Thread Shikhar Bhushan (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-4161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15491296#comment-15491296 ]

Shikhar Bhushan commented on KAFKA-4161:


bq. Probably worth clarifying whether we're really talking about just flush 
here or offset commit as well. Flush really only exists in order to support 
offset commit (from the framework's perspective), but since you mention full 
buffers I think you might be getting at a slightly different use case for 
connectors.

Sorry I wasn't clear: flushing data and offset commits are currently coupled, as 
you pointed out. If we want to avoid unnecessary redelivery of records, it is 
best to commit offsets with the 'most current' knowledge of them, which we 
currently have only after calling {{flush()}}.

bq. In general, I think it'd actually be even better to just get rid of the 
idea of having to flush as a common operation as it hurts throughput to have to 
flush entirely to commit offsets (we are flushing the pipeline, which is never 
good). Ideally we could do what the framework does with source connectors and 
just track which data has been successfully delivered and use that for the 
majority of offset commits. We'd still need it for cases like shutdown where we 
want to make sure all data has been sent, but since the framework controls 
delivery of data, maybe it's even better just to wait for that data to be 
written. 

Good points, I agree it would be better to make it so {{flush()}} is not 
routine since it can hurt throughput. I think we can deprecate it altogether. 
As a proposal:
{noformat}
abstract class SinkTask {
    ..
    // New method
    public Map<TopicPartition, OffsetAndMetadata> flushedOffsets() {
        throw new UnsupportedOperationException();
    }

    @Deprecated
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) { }
    ..
}
{noformat}

Then the periodic offset-commit logic would call {{flushedOffsets()}}, and if 
that is not implemented, fall back to calling {{flush()}} as it does currently, 
so it can commit the offset state as of the last {{put()}} call.
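That fallback could look roughly like the following sketch, again with simplified stand-in types ({{String}}/{{Long}} instead of {{TopicPartition}}/{{OffsetAndMetadata}}) and hypothetical names - not actual Connect framework code:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplified base class: the default flushedOffsets()
// signals "not implemented" so the framework can detect legacy tasks.
abstract class SinkTaskSketch {
    public Map<String, Long> flushedOffsets() {
        throw new UnsupportedOperationException();
    }

    @Deprecated
    public void flush(Map<String, Long> offsets) { }
}

// A task that only implements the deprecated flush() path.
class LegacyTask extends SinkTaskSketch {
    boolean flushed = false;
    @Override public void flush(Map<String, Long> offsets) { flushed = true; }
}

// A task that reports committable offsets directly, no pipeline flush needed.
class NewStyleTask extends SinkTaskSketch {
    @Override public Map<String, Long> flushedOffsets() {
        Map<String, Long> m = new HashMap<>();
        m.put("topic-0", 7L);
        return m;
    }
}

class CommitterSketch {
    // Returns the offsets the framework should commit for this task.
    static Map<String, Long> committableOffsets(SinkTaskSketch task,
                                                Map<String, Long> offsetsAsOfLastPut) {
        try {
            return task.flushedOffsets();       // new path: no pipeline flush
        } catch (UnsupportedOperationException e) {
            task.flush(offsetsAsOfLastPut);     // legacy path: flush, then commit
            return offsetsAsOfLastPut;
        }
    }
}
```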

I don't think {{flush()}} is needed even at shutdown. Tasks are already being 
advised via {{close()}} and can choose to flush any buffered data from there. 
We can do a final offset commit based on the {{flushedOffsets()}} after 
{{close()}} (though this does imply a quirk that even after a 
{{TopicPartition}} is closed we expect tasks to keep offset state around in the 
map returned by {{flushedOffsets()}}).

Additionally, it would be good to have a {{context.requestCommit()}} in the 
spirit of {{context.requestFlush()}} as I was originally proposing. The 
motivation is that connectors can optimize for avoiding unnecessary redelivery 
when recovering from failures. Connectors can choose whatever policies are best 
like number-of-records or size-based batching/buffering for writing to the 
destination system as part of the normal flow of calls to {{put()}}, and 
request a commit when they have actually written data to the destination 
system. There need not be a strong guarantee that an offset commit actually 
happens after every such request: to avoid committing offsets too often, the 
framework can choose to honor requests only after some minimum interval has 
elapsed, e.g. in case a connector requests a commit after every {{put()}}.
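The weak-guarantee idea could look something like this (hypothetical names, not a real API): {{requestCommit()}} only records intent, and the framework commits at most once per minimum interval, so a task requesting after every {{put()}} cannot force overly frequent commits.

```java
// Illustrative sketch of rate-limited commit requests; all names are
// hypothetical, not part of the actual SinkTaskContext API.
class CommitScheduler {
    private final long minIntervalMs;
    private long lastCommitMs;
    private boolean commitRequested = false;

    CommitScheduler(long minIntervalMs, long nowMs) {
        this.minIntervalMs = minIntervalMs;
        this.lastCommitMs = nowMs;
    }

    // Called from the task, e.g. via context.requestCommit(): records
    // intent only, gives no guarantee a commit happens immediately.
    public void requestCommit() {
        commitRequested = true;
    }

    // Polled by the framework loop; commits only if a request is pending
    // and the minimum interval has elapsed since the last commit.
    public boolean maybeCommit(long nowMs) {
        if (commitRequested && nowMs - lastCommitMs >= minIntervalMs) {
            commitRequested = false;
            lastCommitMs = nowMs;
            return true;
        }
        return false;
    }
}
```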

bq. The main reason I think we even need the explicit flush() is that some 
connectors may have very long delays between flushes (e.g. any object stores 
like S3) such that they need to be told directly that they need to write all 
their data (or discard it).

I don't believe it is currently possible for a connector to communicate that it 
wants to discard data rather than write it out when {{flush()}} is called 
(aside from, I guess, throwing an exception). With the above proposal the 
decision of when and whether to write data would be completely up to 
connectors.

bq. Was there a specific connector & scenario you were thinking about here?

This came up in a thread on the user list ('Sink Connector feature request: 
SinkTask.putAndReport()').


[jira] [Commented] (KAFKA-4161) Allow connectors to request flush via the context

2016-09-13 Thread Ewen Cheslack-Postava (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-4161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15489179#comment-15489179 ]

Ewen Cheslack-Postava commented on KAFKA-4161:
--

[~shikhar] Probably worth clarifying whether we're really talking about just 
flush here or offset commit as well. Flush really only exists in order to 
support offset commit (from the framework's perspective), but since you mention 
full buffers I think you might be getting at a slightly different use case for 
connectors.

In general, I think it'd actually be even better to just get rid of the idea of 
having to flush as a common operation as it hurts throughput to have to flush 
entirely to commit offsets (we are flushing the pipeline, which is never good). 
Ideally we could do what the framework does with source connectors and just 
track which data has been successfully delivered and use that for the majority 
of offset commits. We'd still need it for cases like shutdown where we want to 
make sure all data has been sent, but since the framework controls delivery of 
data, maybe it's even better just to wait for that data to be written. The main 
reason I think we even need the explicit flush() is that some connectors may 
have *very* long delays between flushes (e.g. any object stores like S3) such 
that they need to be told directly that they need to write all their data (or 
discard it).

Was there a specific connector & scenario you were thinking about here?



