[jira] [Created] (KAFKA-10153) Documentation for the Errant Record Reporter

2020-06-11 Thread Aakash Shah (Jira)
Aakash Shah created KAFKA-10153:
---

 Summary: Documentation for the Errant Record Reporter
 Key: KAFKA-10153
 URL: https://issues.apache.org/jira/browse/KAFKA-10153
 Project: Kafka
  Issue Type: Task
  Components: documentation
Affects Versions: 2.6.0
Reporter: Aakash Shah
Assignee: Aakash Shah


Add documentation for the new Errant Record Reporter API.
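
For reference, the usage pattern the documentation needs to cover looks roughly
like the following sketch (process() is a placeholder for the sink-specific
write; the try/catch around errantRecordReporter() keeps the task working on
pre-2.6 workers):

    import java.util.Collection;
    import java.util.Map;
    import org.apache.kafka.connect.errors.ConnectException;
    import org.apache.kafka.connect.sink.ErrantRecordReporter;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;

    public abstract class ExampleSinkTask extends SinkTask {
        private ErrantRecordReporter reporter;

        @Override
        public void start(Map<String, String> props) {
            try {
                // Null when no error reporting is configured for the connector.
                reporter = context.errantRecordReporter();
            } catch (NoSuchMethodError | NoClassDefFoundError e) {
                reporter = null; // running on an older Connect worker
            }
        }

        @Override
        public void put(Collection<SinkRecord> records) {
            for (SinkRecord record : records) {
                try {
                    process(record); // placeholder for the sink-specific write
                } catch (Exception e) {
                    if (reporter != null) {
                        reporter.report(record, e); // asynchronous report
                    } else {
                        throw new ConnectException("Failed to process record", e);
                    }
                }
            }
        }

        protected abstract void process(SinkRecord record);
    }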





[jira] [Created] (KAFKA-10115) Incorporate errors.tolerance with the Errant Record Reporter

2020-06-07 Thread Aakash Shah (Jira)
Aakash Shah created KAFKA-10115:
---

 Summary: Incorporate errors.tolerance with the Errant Record 
Reporter
 Key: KAFKA-10115
 URL: https://issues.apache.org/jira/browse/KAFKA-10115
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 2.6.0
Reporter: Aakash Shah
Assignee: Aakash Shah
 Fix For: 2.6.0


The errors.tolerance config is currently not honored when using the Errant
Record Reporter. If errors.tolerance is none, the task should fail rather
than send the errant record to the DLQ in Kafka.
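
A hypothetical sketch of the requested behavior (this is not the actual
Connect implementation; ToleranceType and DlqWriter are illustrative names):

    import java.util.concurrent.Future;
    import org.apache.kafka.connect.errors.ConnectException;
    import org.apache.kafka.connect.sink.SinkRecord;

    class ToleranceAwareReporter {
        enum ToleranceType { NONE, ALL }

        private final ToleranceType tolerance; // from errors.tolerance
        private final DlqWriter dlq;           // wraps the DLQ producer

        ToleranceAwareReporter(ToleranceType tolerance, DlqWriter dlq) {
            this.tolerance = tolerance;
            this.dlq = dlq;
        }

        Future<Void> report(SinkRecord record, Throwable error) {
            if (tolerance == ToleranceType.NONE) {
                // errors.tolerance=none: fail the task instead of writing
                // the record to the DLQ.
                throw new ConnectException("Error tolerance exceeded", error);
            }
            return dlq.write(record, error);
        }

        interface DlqWriter {
            Future<Void> write(SinkRecord record, Throwable error);
        }
    }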





Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-19 Thread Aakash Shah
Hello all,

I'd actually like to retract the earlier additions to the KIP and point out
that since I've started the voting process and gotten some responses, I
will not be making any major changes to the KIP as it would require a
re-voting process.

Thanks,
Aakash

On Tue, May 19, 2020 at 2:31 PM Aakash Shah  wrote:

> Hi Chris and others,
>
> Yes, you are correct; I looked through KIP-298 to understand it better. I
> agree with your idea to handle "errors.tolerance=none."
>
> I see; you are basically saying you are in favor of standardizing what the
> reporter should be set to if it is not configured. I am on board with
> this proposal, especially if this is in line with previous behaviors as you
> mentioned.
>
> I will add both of these suggestions to the KIP.
>
> Lastly, unless anyone has any issues with Chris's suggestions, I believe
> the last part we have to come to a consensus on is using a Future as the
> return type. I am for giving extra guarantees to the user if they wish;
> however, I am not very familiar with the potential issues with the consumer
> heartbeat as Arjun pointed out. Does anyone have any thoughts on this?
>
> Thanks,
> Aakash
>
> On Tue, May 19, 2020 at 2:10 PM Chris Egerton  wrote:
>
>> Hi Aakash,
>>
>> > If "errors.tolerance=none", should it not be the case that the error
>> reporter does not even report any error; rather, the task just fails after
>> throwing the error? I do understand the point you are saying about
>> duplicates, though.
>>
>> I believe the "errors.tolerance" property dictates whether a task should
>> fail after a record that causes problems during conversion or
>> transformation is encountered and reported (for example, by writing to a
>> DLQ). If it is set to "none", then the task will fail immediately; if it
>> is
>> set to "all", then the task will continue running normally. So if we want
>> to preserve that behavior, we might want to immediately throw an exception
>> when an errant record is reported by a "SinkTask" instance and the user
>> has
>> configured "errors.tolerance = none", which unless caught will cause the
>> task to cease writing records to the sink. In addition to throwing that
>> exception, we should also still fail the task; the exception is just a way
>> to (hopefully) interrupt the task's processing of records in order to
>> prevent duplicates if/when the task is restarted later on.
>>
>> > Lastly, why do you say we should always provide an errant record
>> reporter?
>> Doesn't that change the contract of what functionality it is providing?
>>
>> I'm just thinking that instead of returning "null" when no errant record
>> reporter is configured, we could return one that always fails the task and
>> throws an exception. This seems in line with the default behavior of the
>> framework when no error handling configuration properties are specified
>> and
>> a record causes problems during conversion or transformation. We could
>> leave the choice in the hands of developers but this might make things
>> confusing for users who get different behavior from different connectors
>> under the same circumstances.
>>
>> Hope this helps!
>>
>> Cheers,
>>
>> Chris
>>
>> On Tue, May 19, 2020 at 1:50 PM Aakash Shah  wrote:
>>
>> > Hi Arjun,
>> >
>> > I am not very familiar with how the potential heartbeat failure would
>> cause
>> > more failures when consuming subsequent records. Can you elaborate on
>> this?
>> >
>> > Thanks,
>> > Aakash
>> >
>> > On Tue, May 19, 2020 at 10:03 AM Arjun Satish 
>> > wrote:
>> >
>> > > One more concern with the connector blocking on the Future's get() is
>> > that
>> > > it may cause the task's consumer to fail to heartbeat (since there is
>> no
>> > > independent thread to do this). That would then cause failures when we
>> > > eventually try to consume more records after returning from put(). The
>> > > developer would need to be cognizant of these bits before waiting on
>> the
>> > > future, which adds a reasonable amount of complexity.
>> > >
>> > > Even with preCommit() returning incomplete offsets, I suppose the
>> concern
>> > > would be that the put() method keeps giving the task more records,
>> and to
>> > > truly pause the "firehose", the task needs to pause all partitions?
>> >
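
Chris's suggestion of always providing a reporter could look like the
following sketch (hypothetical; FailFastReporter is an illustrative name):

    import java.util.concurrent.Future;
    import org.apache.kafka.connect.errors.ConnectException;
    import org.apache.kafka.connect.sink.ErrantRecordReporter;
    import org.apache.kafka.connect.sink.SinkRecord;

    class FailFastReporter implements ErrantRecordReporter {
        @Override
        public Future<Void> report(SinkRecord record, Throwable error) {
            // Mirrors the framework default when no error handling is
            // configured: the first reported record kills the task.
            throw new ConnectException("No error reporter configured; failing task", error);
        }
    }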

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-19 Thread Aakash Shah
Hi Chris and others,

Yes, you are correct; I looked through KIP-298 to understand it better. I
agree with your idea to handle "errors.tolerance=none."

I see; you are basically saying you are in favor of standardizing what the
reporter should be set to if it is not configured. I am on board with
this proposal, especially if this is in line with previous behaviors as you
mentioned.

I will add both of these suggestions to the KIP.

Lastly, unless anyone has any issues with Chris's suggestions, I believe
the last part we have to come to a consensus on is using a Future as the
return type. I am for giving extra guarantees to the user if they wish;
however, I am not very familiar with the potential issues with the consumer
heartbeat as Arjun pointed out. Does anyone have any thoughts on this?

Thanks,
Aakash

On Tue, May 19, 2020 at 2:10 PM Chris Egerton  wrote:

> Hi Aakash,
>
> > If "errors.tolerance=none", should it not be the case that the error
> reporter does not even report any error; rather, the task just fails after
> throwing the error? I do understand the point you are saying about
> duplicates, though.
>
> I believe the "errors.tolerance" property dictates whether a task should
> fail after a record that causes problems during conversion or
> transformation is encountered and reported (for example, by writing to a
> DLQ). If it is set to "none", then the task will fail immediately; if it is
> set to "all", then the task will continue running normally. So if we want
> to preserve that behavior, we might want to immediately throw an exception
> when an errant record is reported by a "SinkTask" instance and the user has
> configured "errors.tolerance = none", which unless caught will cause the
> task to cease writing records to the sink. In addition to throwing that
> exception, we should also still fail the task; the exception is just a way
> to (hopefully) interrupt the task's processing of records in order to
> prevent duplicates if/when the task is restarted later on.
>
> > Lastly, why do you say we should always provide an errant record
> reporter?
> Doesn't that change the contract of what functionality it is providing?
>
> I'm just thinking that instead of returning "null" when no errant record
> reporter is configured, we could return one that always fails the task and
> throws an exception. This seems in line with the default behavior of the
> framework when no error handling configuration properties are specified and
> a record causes problems during conversion or transformation. We could
> leave the choice in the hands of developers but this might make things
> confusing for users who get different behavior from different connectors
> under the same circumstances.
>
> Hope this helps!
>
> Cheers,
>
> Chris
>
> On Tue, May 19, 2020 at 1:50 PM Aakash Shah  wrote:
>
> > Hi Arjun,
> >
> > I am not very familiar with how the potential heartbeat failure would
> cause
> > more failures when consuming subsequent records. Can you elaborate on
> this?
> >
> > Thanks,
> > Aakash
> >
> > On Tue, May 19, 2020 at 10:03 AM Arjun Satish 
> > wrote:
> >
> > > One more concern with the connector blocking on the Future's get() is
> > that
> > > it may cause the task's consumer to fail to heartbeat (since there is
> no
> > > independent thread to do this). That would then cause failures when we
> > > eventually try to consume more records after returning from put(). The
> > > developer would need to be cognizant of these bits before waiting on
> the
> > > future, which adds a reasonable amount of complexity.
> > >
> > > Even with preCommit() returning incomplete offsets, I suppose the
> concern
> > > would be that the put() method keeps giving the task more records, and
> to
> > > truly pause the "firehose", the task needs to pause all partitions?
> > >
> > >
> > > On Tue, May 19, 2020 at 9:26 AM Arjun Satish 
> > > wrote:
> > >
> > > > Can we get a couple of examples that show the utility of waiting on the
> > > > Future<>? Also, in preCommit() we would report back on the incomplete
> > > > offsets. So that feedback mechanism will already exist for developers
> > > > who want to manually manage this.
> > > >
> > > > On Tue, May 19, 2020 at 8:03 AM Randall Hauch 
> > wrote:
> > > >
> > > >> Thanks, Aakash, for updating the KIP.
> > > >>
> > > >> On Tue, May 19, 2020 at 2:18 AM Arjun 
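
Arjun's "pause the firehose" point can be made concrete with a sketch like
the one below: a task that must block on reporter futures can pause its
assigned partitions first, so the consumer keeps polling (and heartbeating)
without delivering new records. Whether this is strictly necessary depends on
the worker's poll loop; this is an illustration, not a description of the
actual runtime.

    import java.util.List;
    import java.util.concurrent.Future;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.errors.ConnectException;
    import org.apache.kafka.connect.sink.SinkTaskContext;

    class FirehoseControl {
        static void awaitReports(SinkTaskContext context, List<Future<Void>> pending) {
            TopicPartition[] assigned = context.assignment().toArray(new TopicPartition[0]);
            context.pause(assigned); // stop receiving new records
            try {
                for (Future<Void> report : pending) {
                    report.get(); // wait for each errant record to be recorded
                }
            } catch (Exception e) {
                throw new ConnectException("Errant record report failed", e);
            } finally {
                context.resume(assigned); // reopen the firehose
            }
        }
    }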

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-19 Thread Aakash Shah
Hi Arjun,

I am not very familiar with how the potential heartbeat failure would cause
more failures when consuming subsequent records. Can you elaborate on this?

Thanks,
Aakash

On Tue, May 19, 2020 at 10:03 AM Arjun Satish 
wrote:

> One more concern with the connector blocking on the Future's get() is that
> it may cause the task's consumer to fail to heartbeat (since there is no
> independent thread to do this). That would then cause failures when we
> eventually try to consume more records after returning from put(). The
> developer would need to be cognizant of these bits before waiting on the
> future, which adds a reasonable amount of complexity.
>
> Even with preCommit() returning incomplete offsets, I suppose the concern
> would be that the put() method keeps giving the task more records, and to
> truly pause the "firehose", the task needs to pause all partitions?
>
>
> On Tue, May 19, 2020 at 9:26 AM Arjun Satish 
> wrote:
>
> > Can we get a couple of examples that show the utility of waiting on the
> > Future<>? Also, in preCommit() we would report back on the incomplete
> > offsets. So that feedback mechanism will already exist for developers
> > who want to manually manage this.
> >
> > On Tue, May 19, 2020 at 8:03 AM Randall Hauch  wrote:
> >
> >> Thanks, Aakash, for updating the KIP.
> >>
> >> On Tue, May 19, 2020 at 2:18 AM Arjun Satish 
> >> wrote:
> >>
> >> > Hi Randall,
> >> >
> >> > Thanks for the explanation! Excellent point about guaranteeing offsets
> >> in
> >> > the async case.
> >> >
> >> > If we can guarantee that the offsets will be advanced only after the
> bad
> >> > records are reported, then is there any value in the Future<> return
> >> type?
> >> > I feel we can declare the function with a void return type:
> >> >
> >> > void report(SinkRecord failedRecord, Throwable error)
> >> >
> >> > that works asynchronously, and advances offsets only after the DLQ
> >> producer
> >> > (and other reporters) complete successfully (as you explained).
> >> >
> >> > This actually alleviates my concern of what this Future<> actually
> >> means.
> >> > Since a failure to report should kill the tasks, there is no reason
> for
> >> the
> >> > connector to ever wait on the get().
> >>
> >>
> >> We should not say "there is no reason", because we don't know all of the
> >> requirements that might exist for sending records to external systems.
> The
> >> additional guarantee regarding error records being fully recorded before
> >> `preCommit(...)` is called is a minimal guarantee that Connect provides
> >> the
> >> sink task, and returning a Future allow a sink task to have *stronger*
> >> guarantees than what Connect provides by default.
> >>
> >> Once again:
> >> 1. we need an async API to allow the sink task to report problem records
> >> and then immediately continue doing more work.
> >> 2. Connect should guarantee to the sink task that all reported records
> >> will
> >> actually be recorded before `preCommit(...)` is called
> >> 3. a sink task *might* need stronger guarantees, and may need to block
> on
> >> the reported records some time before `preCommit(...)`, and we should
> >> allow
> >> them to do this.
> >> 4. Future and callbacks are common techniques, but there are significant
> >> runtime risks of using callbacks, whereas Future is a common/standard
> >> pattern that is straightforward to use.
> >>
> >> This *exactly* matches the current KIP, which is why I plan to vote for
> >> this valuable and well-thought out KIP.
> >>
> >>
> >> > And if we are guaranteeing that the
> >> > offsets are only advanced when the errors are reported, then this
> >> becomes a
> >> > double win:
> >> >
> >> > 1. connector developers can literally fire and forget failed records.
> >> > 2. offsets are correctly advanced on errors being reported. Failure to
> >> > report error will kill the task, and the last committed offset will be
> >> the
> >> > correct one.
> >>
> >>
> >> > The main contract would simply be to call report() before preCommit()
> or
> >> > before put() returns in the task, so the framework knows that that
> there
> >> > are error records reported, and those need to finish before the
> offsets
> >> can
> >> > be advanced.
> >> >
> >> > I think I'd be pretty excited about this API. and if we all agree,
> then
> >> > let's go ahead with this?
> >>
> >>
> >> > Best,
> >> >
> >> >
> >> >
> >>
> >
>
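
The "stronger guarantees" option Randall describes could look like this
sketch (send() is a placeholder for the sink-specific write): report
asynchronously while processing, then block on all returned futures before
the batch is done.

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.Future;
    import org.apache.kafka.connect.errors.ConnectException;
    import org.apache.kafka.connect.sink.ErrantRecordReporter;
    import org.apache.kafka.connect.sink.SinkRecord;

    class StrictBatchHandler {
        private final ErrantRecordReporter reporter; // assumed non-null here

        StrictBatchHandler(ErrantRecordReporter reporter) {
            this.reporter = reporter;
        }

        void handleBatch(Collection<SinkRecord> records) {
            List<Future<Void>> reports = new ArrayList<>();
            for (SinkRecord record : records) {
                try {
                    send(record); // placeholder for the sink-specific write
                } catch (Exception e) {
                    reports.add(reporter.report(record, e)); // keep processing
                }
            }
            // Block only at the end, for tasks that need every errant record
            // fully recorded before the batch completes.
            for (Future<Void> report : reports) {
                try {
                    report.get();
                } catch (Exception e) {
                    throw new ConnectException("Failed to report errant record", e);
                }
            }
        }

        private void send(SinkRecord record) { /* sink-specific */ }
    }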


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-19 Thread Aakash Shah
Hi Chris,

Thanks for the suggestions.

If "errors.tolerance=none", should it not be the case that the error
reporter does not even report any error; rather, the task just fails after
throwing the error? I do understand the point you are saying about
duplicates, though.

You raise a good point about "offset.flush.interval.ms" and I think we
should respect that. I will add this constraint to the KIP. Please let me
know if this extra constraint adds any other issues I am not aware of.

Lastly, why do you say we should always provide an errant record reporter?
Doesn't that change the contract of what functionality it is providing?

Thanks,
Aakash


On Tue, May 19, 2020 at 11:15 AM Chris Egerton  wrote:

> Hi Randall,
>
> First off, thank you for the incredibly detailed example. I don't mind
> walls of text. I found it very helpful. I especially liked the idea about
> modifying how the framework invokes "SinkTask::preCommit" to take most of
> the work out of developers' hands in the common case of a "fire-and-forget"
> but still provide flexibility to accommodate connectors with, for example,
> exactly-once delivery guarantees that involve committing offsets to the
> sink atomically with the actual records that they've received from Kafka.
>
> I have one point I'd like to raise about the stated advantage of an
> asynchronous API: that tasks can continue processing records and sending
> them to the sink destination without having to block on the completion of
> the error report.
>
> Wouldn't this actually be a disadvantage in the case that the user has
> configured the connector with "errors.tolerance = none"? In that case, the
> expectation is that the task should fail as soon as it hits a bad record;
> allowing it to possibly continue to produce records in that case (which
> would likely end up as duplicates in the sink if/when the task is
> restarted) doesn't seem optimal.
>
> I don't think that this makes an asynchronous API completely unusable; I
> just think that we'd want to synchronously throw some kind of exception
> when the error reporter is invoked and the connector is configured with
> "errors.tolerance = none", instead of causing one to be thrown wrapped in
> an ExecutionException if/when "Future::get" is called on the returned
> future.
>
> I'd also like to suggest a slight change to the logic for invoking
> "SinkTask::preCommit". The interval at which offsets are committed for sink
> tasks is configurable via the worker-level "offset.flush.interval.ms"
> property; I think it'd be nice to respect that property if we could. What
> would you think about calling "SinkTask::preCommit" at the normally
> scheduled times, but altering the offsets that are passed in to that call
> to not go beyond any offsets for errant records that have been reported but
> not fully processed yet?
>
> For example, imagine a task has been given records with offsets 0-10 on a
> single topic partition and reports records with offsets 2 and 7 to the
> framework. Then, the framework is able to process the record with offset 2
> but not the record with offset 7. When it comes time for an offset commit,
> the framework will call "SinkTask::preCommit" with an offset of 6 for that
> topic partition, since the record for offset 7 has not been completely
> taken care of yet.
>
> One more small suggestion: we may want to always provide an errant record
> reporter to connectors, even if one has not been configured. This reporter
> would simply fail the task and throw an exception as soon as it's invoked.
> This would provide a more uniform experience for users across different
> connectors and would establish expectations that, if a connector uses the
> features added by KIP-610 at all, it will fail by default on any invalid
> records (instead of doing something implementation-dependent).
>
> Cheers,
>
> Chris
>
> On Tue, May 19, 2020 at 10:03 AM Arjun Satish 
> wrote:
>
> > One more concern with the connector blocking on the Future's get() is
> that
> > it may cause the task's consumer to fail to heartbeat (since there is no
> > independent thread to do this). That would then cause failures when we
> > eventually try to consume more records after returning from put(). The
> > developer would need to be cognizant of these bits before waiting on the
> > future, which adds a reasonable amount of complexity.
> >
> > Even with preCommit() returning incomplete offsets, I suppose the concern
> > would be that the put() method keeps giving the task more records, and to
> > truly pause the "firehose", the task needs to pause all partitions?
> >
> >
> > On Tue, May 19, 2020 at 9:26 AM Arjun Satish 
> > wrote:
> >
> > > Can we get a couple of examples that show the utility of waiting on the
> > > Future<>? Also, in preCommit() we would report back on the incomplete
> > > offsets. So that feedback mechanism will already exist for developers
> > > who want to manually manage this.
> > >
> > > On Tue, May 19, 2020 at 8:03 AM Randall Hauch 
> w
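
Chris's offset-capping idea from the message above, as a hypothetical
framework-side sketch (this is not how the worker is actually implemented):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    class OffsetCapper {
        // Commit on the normal offset.flush.interval.ms schedule, but never
        // past the earliest reported-but-unprocessed errant record.
        static Map<TopicPartition, OffsetAndMetadata> cap(
                Map<TopicPartition, OffsetAndMetadata> scheduled,
                Map<TopicPartition, Long> earliestPendingReport) {
            Map<TopicPartition, OffsetAndMetadata> capped = new HashMap<>(scheduled);
            earliestPendingReport.forEach((tp, pendingOffset) -> {
                OffsetAndMetadata offset = capped.get(tp);
                if (offset != null && offset.offset() > pendingOffset) {
                    // e.g. records 0-10 received, record 7 reported but not
                    // yet recorded: commit no further than offset 7.
                    capped.put(tp, new OffsetAndMetadata(pendingOffset));
                }
            });
            return capped;
        }
    }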

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-18 Thread Aakash Shah
Q
> topic
> c) report record 11 and do NOT block until record 10 is written to the DLQ
> topic
> d) process records 12-102
> e) send batch of 100 records (offsets 1-102, minus 10 and 11) to external
> system
> f) process records 103-202
> g) send batch of 100 records (offsets 203-202) to external system
> ...
> s) send batch of 100 records (offsets 3007-4000) to external system
> t) respond to preCommit(4000) by returning the same offset
>
> Note that steps b, c, and similar steps for the other four problematic
> records do NOT wait until records are written to the DLQ before continuing.
> This means that the first batch of 100 records is sent to the external
> system whether or not the two records have been successfully written to the
> DLQ. The *asynchronous* `report(...)` method allows the sink task to
> continue processing subsequent records without delay, and the net lag is
> lower than the synchronous case. Plus, minor to moderate delays in
> reporting do not necessarily impact the sink task operation.
>
> This is an example of a sink task choosing to not block on the future
> returned from `report(...)`. Of course, sink tasks *can* use the future if
> they desire -- for example, maybe the sink task does block on the future
> before returning from each `put(...)` simply because of the guarantees it
> wants to provide. Maybe the sink task is managing offsets, and it wants to
> write offsets in the external system only after all error records in some
> sequence are fully processed.
>
> The bottom line is that an asynchronous `report(...)` method has
> significant advantages, is still easy to use, and when necessary allows
> sink task implementations to track when the errors have been "fully
> reported", all while not constraining/limiting sink task implementations
> that don't need those guarantees.
>
> However, here's the additional concern I mentioned at the outset of my
> email. Connect should not commit offsets for a topic partition only after
> the error reporter has "fully processed" all submitted records with that or
> earlier offsets. For the sink task developer, this means the framework must
> guarantee this happens before `preCommit(...)` is called. I think we fully
> describe this guarantee by adding the following to the KIP (perhaps in a
> new "Guarantees" section):
>
> "The Connect framework also guarantees that by the time `preCommit(...)` is
> called on the task, the error reporter will have successfully and fully
> recorded all reported records with offsets at or before those passed to the
> preCommit method. Sink task implementations that need more strict
> guarantees can use the futures returned by `report(...)` to wait for
> confirmation that reported records have been successfully recorded."
>
>
> IMO this gives sink task developers pretty clear guidance: sink tasks need
> only worry about the futures returned from `report(...)` if they require
> more strict guarantees that the errors have been fully reported. This still
> allows sink tasks to return different offsets from `preCommit(...)`. And
> any sink connectors that rely upon the framework committing consumer
> offsets based upon when records were fully-processed by the task will
> likely not have to use futures at all.
>
> To put in terms I used at the outset of this (way too long) email: the
> asynchronous API
>
>- is valuable because it allows sink tasks to continue doing work
>without having to wait for the reporter, and that only during the
> "commit"
>phase do we need to potentially wait for the reporter
>- is simple because for many sink tasks they only need to call
>`report(...)` and will not need to even worry about the future;
>- is flexible because any task that needs stricter guarantees can use
>the future to block on the reporter, including at some later point in
> time
>after the `report(...)` method is called; and
>- is not onerous because using the future is a common pattern and simple
>blocking, if needed, is trivial.
>
> It is true that when the last record on the last put before a commit is
> reported as an error, the framework may have to wait. But this is no worse
> and actually likely better than making the `report(...)` method
> synchronous.
>
> I hope this helps.
>
> Best regards,
>
> Randall
>
> On Mon, May 18, 2020 at 4:44 PM Aakash Shah  wrote:
>
> > Hi Chris,
> >
> > I agree with your point.
> >
> > Randall, Konstantine, do you guys mind weighing in on any benefit of
> adding
> > asynchronous functionality using a Future in the KIP right now? It seems
> to
> > me that it only provides user contro
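
A task that wants the stricter guarantee Randall mentions can defer all
blocking to commit time, roughly as below (a sketch; field and method names
are illustrative):

    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.errors.ConnectException;

    class CommitTimeWaiter {
        private final Queue<Future<Void>> outstanding = new ConcurrentLinkedQueue<>();

        void onReport(Future<Void> report) {
            outstanding.add(report);
        }

        // Called from the task's preCommit(): drain and wait, so every
        // reported errant record is fully recorded before offsets commit.
        Map<TopicPartition, OffsetAndMetadata> beforeCommit(
                Map<TopicPartition, OffsetAndMetadata> offsets) {
            Future<Void> report;
            while ((report = outstanding.poll()) != null) {
                try {
                    report.get();
                } catch (Exception e) {
                    throw new ConnectException("Errant record report failed", e);
                }
            }
            return offsets;
        }
    }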

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-18 Thread Aakash Shah
Hi Chris,

I agree with your point.

Randall, Konstantine, do you guys mind weighing in on any benefit of adding
asynchronous functionality using a Future in the KIP right now? It seems to
me that it only provides user control on when the thread will be blocked,
and if we are going to process all the futures at once in a batch at the
end, why not support batch processing in a future KIP, since it is not too
difficult now that we are adding an interface. I'm not sure I see any gain
beyond some user control that could increase throughput - but at the same
time, as I mentioned before, I don't think throughput is a factor we need
to consider much with error reporting. We don't really need or necessarily
want a higher throughput on error reporting, as ideally, there should not
be a high volume of errant records.

Thanks,
Aakash

On Mon, May 18, 2020 at 1:22 PM Chris Egerton  wrote:

> Hi Aakash,
>
> Yep, that's pretty much it. I'd also like to emphasize that we should be
> identifying practical use cases for whatever API we provide. Giving
> developers a future that can be made synchronous with little effort seems
> flexible, but if that's all that developers are going to do with it anyway,
> why make it a future at all? We should have some idea of how people would
> use a future that doesn't just hinge on them blocking on it immediately,
> and isn't more easily-addressed by a different API (such as one with batch
> reporting).
>
> Cheers,
>
> Chris
>
> On Mon, May 18, 2020 at 1:17 PM Aakash Shah  wrote:
>
> > Hi all,
> >
> > Chris, I see your points about whether Futures provide much benefit at
> all
> > as they are not truly fully asynchronous.
> >
> > Correct me if I am wrong, but I think what you are trying to point out is
> > that if we have the option to add additional functionality later (in a
> > simpler way too since we are introducing a new interface), we should
> > provide functionality that we know will provide value immediately and not
> > cause any developer/user burden.
> >
> > In that case, I think the main area we have to come to a consensus on is
> -
> > how much control do we want to provide to the developer/user in this KIP
> > considering that we can add the functionality relatively easily later?
> >
> > Randall, Konstantine, what do you think about adding it later vs now?
> >
> > Thanks,
> > Aakash
> >
> > On Mon, May 18, 2020 at 12:45 PM Chris Egerton 
> > wrote:
> >
> > > Hi Aakash,
> > >
> > > I asked this earlier about whether futures were the right way to go, if
> > we
> > > wanted to enable asynchronous behavior at all:
> > >
> > > > I'm still unclear on how futures are going to provide any benefit to
> > > developers, though. Blocking on the return of such a future slightly
> > later
> > > on in the process of handling records is still blocking, and to be done
> > > truly asynchronously without blocking processing of non-errant records,
> > > would have to be done on a separate thread. It's technically possible
> for
> > > users to cache all of these futures and instead of invoking "get" on
> > them,
> > > simply check whether they're complete or not via "isDone", but this
> seems
> > > like an anti-pattern.
> > >
> > > > What is the benefit of wrapping this in a future?
> > >
> > > As far as I can tell, there hasn't been a practical example yet where
> the
> > > flexibility provided by a future would actually be beneficial in
> writing
> > a
> > > connector. It'd be great if we could find one. One possible use case
> > might
> > > be processing records received in "SinkTask::put" without having to
> block
> > > for each errant record report before sending non-errant records to the
> > > sink. However, this could also be addressed by allowing for batch
> > reporting
> > > of errant records instead of accepting a single record at a time; the
> > task
> > > would track errant records as it processes them in "put" and report
> them
> > > all en-masse after all non-errant records have been processed.
> > >
> > > With regards to the precedent of using futures for asynchronous APIs, I
> > > think we should make sure that whatever API we decide on is actually
> > useful
> > > for the cases it serves. There's plenty of precedent for callback-based
> > > asynchronous APIs in Kafka with both "Producer::send" and
> > > "Consumer::com
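
The batch-reporting alternative Chris floats was discussed but never adopted;
a sketch of what such a (hypothetical) API could look like:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import org.apache.kafka.connect.sink.SinkRecord;

    // Hypothetical interface -- not part of the Connect API.
    interface BatchErrantRecordReporter {
        // Report all errant records from a put() batch at once, blocking
        // until every one of them is recorded.
        void reportAll(Map<SinkRecord, Throwable> errantRecords);
    }

    class BatchCollector {
        private final Map<SinkRecord, Throwable> errant = new LinkedHashMap<>();

        void collect(SinkRecord record, Throwable error) {
            errant.put(record, error); // track errant records during put()
        }

        void flush(BatchErrantRecordReporter reporter) {
            if (!errant.isEmpty()) {
                reporter.reportAll(errant); // report en-masse after the batch
                errant.clear();
            }
        }
    }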

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-18 Thread Aakash Shah
Hi all,

Chris, I see your points about whether Futures provide much benefit at all
as they are not truly fully asynchronous.

Correct me if I am wrong, but I think what you are trying to point out is
that if we have the option to add additional functionality later (in a
simpler way too since we are introducing a new interface), we should
provide functionality that we know will provide value immediately and not
cause any developer/user burden.

In that case, I think the main area we have to come to a consensus on is -
how much control do we want to provide to the developer/user in this KIP
considering that we can add the functionality relatively easily later?

Randall, Konstantine, what do you think about adding it later vs now?

Thanks,
Aakash

On Mon, May 18, 2020 at 12:45 PM Chris Egerton  wrote:

> Hi Aakash,
>
> I asked this earlier about whether futures were the right way to go, if we
> wanted to enable asynchronous behavior at all:
>
> > I'm still unclear on how futures are going to provide any benefit to
> developers, though. Blocking on the return of such a future slightly later
> on in the process of handling records is still blocking, and to be done
> truly asynchronously without blocking processing of non-errant records,
> would have to be done on a separate thread. It's technically possible for
> users to cache all of these futures and instead of invoking "get" on them,
> simply check whether they're complete or not via "isDone", but this seems
> like an anti-pattern.
>
> > What is the benefit of wrapping this in a future?
>
> As far as I can tell, there hasn't been a practical example yet where the
> flexibility provided by a future would actually be beneficial in writing a
> connector. It'd be great if we could find one. One possible use case might
> be processing records received in "SinkTask::put" without having to block
> for each errant record report before sending non-errant records to the
> sink. However, this could also be addressed by allowing for batch reporting
> of errant records instead of accepting a single record at a time; the task
> would track errant records as it processes them in "put" and report them
> all en-masse after all non-errant records have been processed.
>
> With regards to the precedent of using futures for asynchronous APIs, I
> think we should make sure that whatever API we decide on is actually useful
> for the cases it serves. There's plenty of precedent for callback-based
> asynchronous APIs in Kafka with both "Producer::send" and
> "Consumer::commitAsync"; the question here shouldn't be about what's done
> in different APIs, but what would work for this one in particular.
>
> Finally, it's also been brought up that if we're going to introduce a new
> error reporter interface, we can always modify that interface later on to
> go from asynchronous to synchronous behavior, or vice-versa, or even to add
> a callback- or future-based variant that didn't exist before. We have
> plenty of room to maneuver in the future here, so the pressure to get
> everything right the first time and provide maximum flexibility doesn't
> seem as pressing, and the goal of minimizing the kind of API that we have
> to support for future versions without making unnecessary additions is
> easier to achieve.
>
> Cheers,
>
> Chris
>
>
>
> On Mon, May 18, 2020 at 12:20 PM Aakash Shah  wrote:
>
> > Hi Arjun,
> >
> > Thanks for your feedback.
> >
> > I agree with moving to Future, those are good points.
> >
> > I believe an earlier point made for asynchronous functionality was that
> > modern APIs tend to be asynchronous as they result in more expressive and
> > better defined APIs.
> > Additionally, because a lot of Kafka Connect functionality is already
> > asynchronous, I am inclined to believe that customers will want an
> > asynchronous solution for this as well. And if it is relatively simple to
> > block with future.get() to make it synchronous, would you not say that
> > having an opt-in synchronous functionality rather than synchronous only
> > functionality allows for customer control while maintaining that not too
> > much burden of implementation is placed on the customer?
> > WDYT?
> >
> > Thanks,
> > Aakash
> >
> > On Sun, May 17, 2020 at 2:51 PM Arjun Satish 
> > wrote:
> >
> > > Thanks for all the feedback, folks.
> > >
> > > re: having a callback as a parameter, I agree that at this point, it
> > might
> > > not add much value to the proposal.
> > >
> > > re: synchronous vs asynchronous, is the m

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-18 Thread Aakash Shah
Hi Arjun,

Thanks for your feedback.

I agree with moving to Future, those are good points.

I believe an earlier point made for asynchronous functionality was that
modern APIs tend to be asynchronous as they result in more expressive and
better defined APIs.
Additionally, because a lot of Kafka Connect functionality is already
asynchronous, I am inclined to believe that customers will want an
asynchronous solution for this as well. And if it is relatively simple to
block with future.get() to make it synchronous, would you not say that
having an opt-in synchronous functionality rather than synchronous only
functionality allows for customer control while maintaining that not too
much burden of implementation is placed on the customer?
WDYT?

Thanks,
Aakash

On Sun, May 17, 2020 at 2:51 PM Arjun Satish  wrote:

> Thanks for all the feedback, folks.
>
> re: having a callback as a parameter, I agree that at this point, it might
> not add much value to the proposal.
>
> re: synchronous vs asynchronous, is the motivation performance/higher
> throughput? Taking a step back, calling report(..) in the new interface
> does a couple of things:
>
> 1. at a fundamental level, it is a signal to the framework that a failure
> occurred when processing records, specifically due to the given record.
> 2. depending on whether errors.log and errors.deadletterqueue has been set,
> some messages are written to zero or more destinations.
> 3. depending on the value of errors.tolerance (none or all), the task is
> failed after reporters have completed.
>
> for kip-610, the asynchronous method has the advantage of working with the
> internal dead letter queue (which has been transparent to the developer so
> far). but, how does async method help if the DLQ is not enabled? in this
> case RecordMetadata is not very useful, AFAICT? also, if we add more error
> reporters in the future (say, for example, a new reporter in a future that
> writes to a RDBMS), would the async version return success on all or
> nothing, and what about partial successes?
>
> overall, if we really need async behavior, I'd prefer to just use
> Future<Void>. but if we can keep it simple, then let's go with a
> synchronous function with the parameters Randall proposed above (with
> return type as void, and if any of the reporters fail, the task is failed
> if error.tolerance is none, and kept alive if tolerance is all), and maybe
> add asynchronous methods in a future KIP?
>
> Best,
>
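
The opt-in synchronous use Aakash describes is a one-liner over the
asynchronous API; as a sketch:

    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.connect.sink.ErrantRecordReporter;
    import org.apache.kafka.connect.sink.SinkRecord;

    class SyncReporting {
        // Blocking on the returned future turns the asynchronous report
        // into a synchronous one.
        static void reportSync(ErrantRecordReporter reporter, SinkRecord record,
                               Throwable error)
                throws InterruptedException, ExecutionException {
            reporter.report(record, error).get();
        }
    }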


[VOTE] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Aakash Shah
Hello all,

I'd like to open a vote for KIP-610:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors

Thanks,
Aakash


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Aakash Shah
My apologies, had a typo. Meant to say "I will now open up a vote."

Thanks,
Aakash

On Sun, May 17, 2020 at 4:55 PM Aakash Shah  wrote:

> Hi all,
>
> Thanks for all the feedback thus far. I've updated the KIP with all the
> suggestions. I will not open up a vote.
>
> Thanks,
> Aakash
>
> On Sun, May 17, 2020 at 3:45 PM Randall Hauch  wrote:
>
>> All good points regarding `Future<Void>` instead of
>> `Future<RecordMetadata>`, so +1 to that change.
>>
>> A few more nits. The following sentences should be removed because they
>> actually describe a change from the current DLQ functionality that already
>> sets `max.in.flight.requests.per.connection=1` by default:
>>
>> "In order to avoid error records being written out of order (for example,
>> due to retries), the developer should always use
>> max.in.flight.requests.per.connection=1 in their implementation for
>> writing
>> error records. If the developer determines that order is not important and
>> they want extreme performance, they can always increase this number."
>>
>> Another is a bit ambiguous, so I suggest changing:
>>
>> "The error reporting functionality is designed to be asynchronous but can
>> be made synchronous if desired. By default, error reporting will be
>> asynchronous; processing of the subsequent errant record will not be
>> blocked by the successful processing of the prior errant record. However,
>> if the developer prefers synchronous functionality, they can block
>> processing of the next record with future.get()."
>>
>> to:
>>
>> "The error reporting functionality is asynchronous. Tasks can use the
>> resulting future to wait for the record and exception to be written to
>> Kafka."
>>
>> Can we please move the example to a new "Example Usage" section that is
>> *after* the "Interface" section? That way the order of the sections is
>> "Method", "Interface", and "Example Usage", and it's more clear how the
>> API
>> is being changed. Also, the first sentence introducing the example is
>> currently:
>>
>> "The usage will look like the following:"
>>
>> but IMO it should actually say it's an example:
>>
>> "The following is an example of how a sink task can use this error
>> reporter
>> and support connectors being deployed in earlier versions of the Connect
>> runtime:"
>>
>> It seems we have pretty good consensus, so I think the KIP is ready for a
>> vote after the above minor corrections are made.
>>
>> Best regards,
>>
>> Randall
>>
>> On Sun, May 17, 2020 at 4:51 PM Arjun Satish 
>> wrote:
>>
>> > Thanks for all the feedback, folks.
>> >
>> > re: having a callback as a parameter, I agree that at this point, it
>> might
>> > not add much value to the proposal.
>> >
>> > re: synchronous vs asynchronous, is the motivation performance/higher
>> > throughput? Taking a step back, calling report(..) in the new interface
>> > does a couple of things:
>> >
>> > 1. at a fundamental level, it is a signal to the framework that a
>> failure
>> > occurred when processing records, specifically due to the given record.
>> > 2. depending on whether errors.log and errors.deadletterqueue has been
>> set,
>> > some messages are written to zero or more destinations.
>> > 3. depending on the value of errors.tolerance (none or all), the task is
>> > failed after reporters have completed.
>> >
>> > for kip-610, the asynchronous method has the advantage of working with
>> the
>> > internal dead letter queue (which has been transparent to the developer
>> so
>> > far). but, how does async method help if the DLQ is not enabled? in this
>> > case RecordMetadata is not very useful, AFAICT? also, if we add more
>> error
>> > reporters in the future (say, for example, a new reporter in a future
>> that
>> > writes to a RDBMS), would the async version return success on all or
>> > nothing, and what about partial successes?
>> >
>> > overall, if we really need async behavior, I'd prefer to just use
>> > Future<Void>. but if we can keep it simple, then let's go with a
>> > synchronous function with the parameters Randall proposed above (with
>> > return type as void, and if any of the reporters fail, the task is
>> failed
>> > if error.tolerance is none, and kept alive if tolerance is all), and
>> maybe
>> > add asynchronous methods in a future KIP?
>> >
>> > Best,
>> >
>>
>
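
For reference, the producer setting Randall cites (which the DLQ producer
already applies by default) looks like this when a developer configures their
own producer for error records; a sketch:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    class DlqProducerProps {
        static Properties orderedWrites(String bootstrapServers) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.ByteArraySerializer");
            // Prevent retries from reordering error records.
            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
            return props;
        }
    }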


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Aakash Shah
Hi all,

Thanks for all the feedback thus far. I've updated the KIP with all the
suggestions. I will not open up a vote.

Thanks,
Aakash

On Sun, May 17, 2020 at 3:45 PM Randall Hauch  wrote:

> All good points regarding `Future<Void>` instead of
> `Future<RecordMetadata>`, so +1 to that change.
>
> A few more nits. The following sentences should be removed because they
> actually describe a change from the current DLQ functionality that already
> sets `max.in.flight.requests.per.connection=1` by default:
>
> "In order to avoid error records being written out of order (for example,
> due to retries), the developer should always use
> max.in.flight.requests.per.connection=1 in their implementation for writing
> error records. If the developer determines that order is not important and
> they want extreme performance, they can always increase this number."
>
> Another is a bit ambiguous, so I suggest changing:
>
> "The error reporting functionality is designed to be asynchronous but can
> be made synchronous if desired. By default, error reporting will be
> asynchronous; processing of the subsequent errant record will not be
> blocked by the successful processing of the prior errant record. However,
> if the developer prefers synchronous functionality, they can block
> processing of the next record with future.get()."
>
> to:
>
> "The error reporting functionality is asynchronous. Tasks can use the
> resulting future to wait for the record and exception to be written to
> Kafka."
>
> Can we please move the example to a new "Example Usage" section that is
> *after* the "Interface" section? That way the order of the sections is
> "Method", "Interface", and "Example Usage", and it's more clear how the API
> is being changed. Also, the first sentence introducing the example is
> currently:
>
> "The usage will look like the following:"
>
> but IMO it should actually say it's an example:
>
> "The following is an example of how a sink task can use this error reporter
> and support connectors being deployed in earlier versions of the Connect
> runtime:"
>
> It seems we have pretty good consensus, so I think the KIP is ready for a
> vote after the above minor corrections are made.
>
> Best regards,
>
> Randall
>
> On Sun, May 17, 2020 at 4:51 PM Arjun Satish 
> wrote:
>
> > Thanks for all the feedback, folks.
> >
> > re: having a callback as a parameter, I agree that at this point, it
> might
> > not add much value to the proposal.
> >
> > re: synchronous vs asynchronous, is the motivation performance/higher
> > throughput? Taking a step back, calling report(..) in the new interface
> > does a couple of things:
> >
> > 1. at a fundamental level, it is a signal to the framework that a failure
> > occurred when processing records, specifically due to the given record.
> > 2. depending on whether errors.log and errors.deadletterqueue has been
> set,
> > some messages are written to zero or more destinations.
> > 3. depending on the value of errors.tolerance (none or all), the task is
> > failed after reporters have completed.
> >
> > for kip-610, the asynchronous method has the advantage of working with
> the
> > internal dead letter queue (which has been transparent to the developer
> so
> > far). but, how does async method help if the DLQ is not enabled? in this
> > case RecordMetadata is not very useful, AFAICT? also, if we add more
> error
> > reporters in the future (say, for example, a new reporter in a future
> that
> > writes to a RDBMS), would the async version return success on all or
> > nothing, and what about partial successes?
> >
> > overall, if we really need async behavior, I'd prefer to just use
> > Future<Void>. but if we can keep it simple, then let's go with a
> > synchronous function with the parameters Randall proposed above (with
> > return type as void, and if any of the reporters fail, the task is failed
> > if error.tolerance is none, and kept alive if tolerance is all), and
> maybe
> > add asynchronous methods in a future KIP?
> >
> > Best,
> >
>


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Aakash Shah
Hi all,

I've updated the KIP to reflect all the new agreed-upon suggestions.

Please let me know if you have any more suggestions.

Thanks,
Aakash

On Sun, May 17, 2020 at 12:06 PM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Hi all,
>
> I'm on board with adding an interface in the Connect API as Arjun
> suggested. Slightly higher commitment and maintenance but it also gives us
> an easier path to future extensions in this scope (error handling). The
> usage is equivalent to adding just a new method with known types to
> `SinkTaskContext` (the `NoClassDefFoundError` can be added for completeness
> in the connector code, but in both suggestions this would fail with
> `NoSuchMethodError` on older workers).
>
> With respect to the method signature, I also agree with Randall's latest
> suggestion, of a two argument method such as:
>
> Future<Void> report(SinkRecord, Throwable)
>
> Returning `Future<RecordMetadata>` can also be ok, but since this refers to
> the DLQ I'd slightly prefer to avoid exposing information that might
> confuse the users regarding what topic, partitions and offset this return
> value corresponds to. But both return types should be fine and will give
> plenty of flexibility to connector developers, making the sync use case
> straightforward. In any case, given the interface we can extend this in a
> compatible way in the future if we think we need to.
>
> Minor comments:
> Version will be 2.6 and not 2.9 (the latter was added by accident in a few
> places).
>
> Best,
> Konstantine
>
>
> On Sun, May 17, 2020 at 11:25 AM Magesh kumar Nandakumar <
> mage...@confluent.io> wrote:
>
> > If that's the case, I think the framework should not commit if there are
> > any outstanding records in the reporter. That would prevent the scenario
> > where we could potentially lose records from being sent either to the sink
> > or the reporter. WDYT about the KIP including that as part of the design?
> >
> > On Sun, May 17, 2020 at 11:13 AM Randall Hauch  wrote:
> >
> > > On Sun, May 17, 2020 at 12:42 PM Magesh kumar Nandakumar <
> > > mage...@confluent.io> wrote:
> > >
> > > > Randall
> > > >
> > > > Thanks a lot for your thoughts. I was wondering if we would ever have
> > to
> > > > make the API asynchronous, we could expose it as a new method right?
> If
> > > > that's a possibility would it be better if the API explicitly has
> > > semantics
> > > > of a synchronous API if the implementation is indeed going to be
> > > > synchronous.
> > > >
> > >
> > > Thanks, Magesh.
> > >
> > > I think it's likely that the implementation may need to be synchronous
> to
> > > some degree. For example, just to keep the implementation simple we
> might
> > > block the WorkerSinkTask after `put(Collection<SinkRecord>)` returns; we
> > > might latch until the reporter has received all acks, especially if it
> > > simplifies the offset management and commit logic.
> > >
> > > Even if that's the case, having each `report(...)` call be asynchronous
> > > means that the sink task doesn't *have* to wait until each failed
> record
> > > has been recorded to continue sending valid records to the external
> > system.
> > > Consider an example with 1000 records in a batch, where only the first
> > > record has an error. If `report(...)` were synchronous, the `put(...)`
> > > method would block reporting the first record and would then only send
> > the
> > > 999 after that's happened. With an asynchronous `report(...)` method,
> the
> > > `put(...)` method could report the first record, send the 999 records,
> > and
> > > then wait for the futures returned by the report method.
> > >
> > >
> > > >
> > > > On Sun, May 17, 2020, 9:27 AM Randall Hauch 
> wrote:
> > > >
> > > > > On Sun, May 17, 2020 at 11:01 AM Magesh kumar Nandakumar <
> > > > > mage...@confluent.io> wrote:
> > > > >
> > > > > > Thanks Randall. The suggestion I made also has a problem when
> > > reporter
> > > > > > isn't enabled where it could potentially write records after
> error
> > > > > records
> > > > > > to sink before failing.
> > > > > >
> > > > > > The other concern I had is with the reporter being asynchronous. For
> some
> > > > reason
> > > > > > if the reporter is taking longer because of say a specific broker
> > > > issue,
> > > > > > the connector might still move forward and commit if it's not
> > waiting
> > > > for
> > > > > > the reporter. During this, if the worker crashes we will now
> lose
> > > the
> > > > > bad
> > > > > > record. I don't think this is desirable behavior. I think the
> synchronous
> > > > > reporter
> > > > > > provides better guarantees for all connectors.
> > > > > >
> > > > > >
> > > > > Thanks, Magesh.
> > > > >
> > > > > That's a valid concern, and maybe that will affect how the feature
> is
> > > > > actually implemented. I expect it to be a bit tricky to ensure that
> > > > errant
> > > > > records are fully written to Kafka before the offsets are
> committed,
> > so
> > > > it
> > > > > might be simplest to start out with a synchronous 
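
The shape the thread converged on, sketched below, is what was ultimately
adopted for Kafka 2.6 (a new interface in the Connect API plus an accessor on
SinkTaskContext):

    import java.util.concurrent.Future;
    import org.apache.kafka.connect.sink.SinkRecord;

    public interface ErrantRecordReporter {
        // Asynchronously report a problematic record and the reason it
        // failed; the framework guarantees all reports complete before
        // preCommit() is invoked on the task.
        Future<Void> report(SinkRecord record, Throwable error);
    }

    // And on SinkTaskContext (returns null when error reporting is not
    // configured, unless the default-reporter suggestion is adopted):
    // ErrantRecordReporter errantRecordReporter();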

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-17 Thread Aakash Shah
nking phase, resolution of symbolic references is
> > optional.
> > > But the important bit is this
> > > <
> https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.4
> > >:
> > > Whichever strategy (lazy or eager) is followed, any error detected
> during
> > > resolution must be thrown at a point in the program that (directly or
> > > indirectly) uses a symbolic reference to the class or interface.
> > >
> > > So this means that any new classes that the connector originally relied
> > > upon, and that are missing now, will be in some internal "not found"
> > > state, but no errors will be thrown till the resolution stage.
> > >
> > > 3. Section 5.4.3
> > > <
> >
> https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.4.3
> > > >
> > > says execution of any of the (anewarray, checkcast etc) instructions
> > > requires resolution of its symbolic reference, which is good since the
> > > instructions are executed in order, and the connector developer has
> time
> > to
> > > execute some code before the program has to look for an unknown object.
> > >
> > > We are also safe in that the new interface will be used only during or
> > > after start() is called, so there should not be any fields that do
> > > something like:
> > >
> > > private final ErrantRecordReporter reporter =
> > > sinkTaskContext.failedRecordReporter();
> > >
> > > The initialization will only be valid in start(), where we can safely
> > wrap
> > > around try catch issues. The errors being thrown are very precise.
> > > NoSuchMethodError
> > > <
> > >
> >
> https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.4.3.3
> > > >
> > > for method not found, and NoClassDefFoundError for any classes loaded
> > > during resolution
> > > <
> https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html#jvms-5.3
> > >.
> > >
> > > BTW, one more example that comes to mind is the JDBC spec (the java.sql
> > > package has been changing with every major version of Java). Newer
> > classes
> > > are added (look at java.sql.DriverAction (in Java 8) and
> > > java.sql.ShardingKey (in java 9)). New methods to existing classes are
> > > being added as well (for example,
> > > java.sql.DriverManager#registerDriver(java.sql.Driver,
> > > java.sql.DriverAction)
> > > <
> > >
> >
> https://docs.oracle.com/javase/8/docs/api/java/sql/DriverManager.html#registerDriver-java.sql.Driver-java.sql.DriverAction-
> > > >).
> > > and the expectation is that the driver should handle any runtime
> > > inconsistencies.
> > >
> > > Overall, I think it should be safe to have new interfaces, and have
> them
> > > loaded safely, with correct checks.
> > >
> > > BTW, one more opportunity we have is that the connector can check if
> > these
> > > new methods are present or not. For example:
> > >
> > >
> > >
> >
> Class.forName("org.apache.kafka.connect.sink.SinkTaskContext").getMethod("failedRecordReporter");
> > >
> > > And based on this, choose to return a different task in
> > > SinkConnector#taskClass() if they want to be hyper-safe.
> > >
> > > Apologies for throwing in such a wrench at the last minute! But IMHO
> it'd
> > > be good to take this opportunity if we can.
> > >
> > > Best,
> > >
> > > On Sat, May 16, 2020 at 6:05 PM Randall Hauch 
> wrote:
> > >
> > > > Thanks for updating the KIP, Aakash. A few comments on the updated
> > > content
> > > > there:
> > > >
> > > > In order to avoid error records being written out of order (for
> > example,
> > > > > due to retries), the developer can use
> > > > > `max.in.flight.requests.per.connection=1` in their implementation
> for
> > > > > writing error records.
> > > > >
> > > >
> > > > IMO, the DLQ should always use this setting to prevent retries, and
> the
> > > > developer can always set this property for the DLQ to something
> larger
> > > than
> > > > 1 if order is not important and they need the extreme performance.
> > > >
> > > > Along with the original errant sink record, the exception thrown will
> > be
> > > &
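
The probe Arjun describes, spelled out as a sketch (the accessor name
failedRecordReporter follows the draft naming used in this thread; the final
name may differ):

    class ReporterProbe {
        // Returns true only if the worker's Connect API carries the new
        // accessor, so the connector can branch before touching it.
        static boolean workerSupportsErrantRecordReporter() {
            try {
                Class.forName("org.apache.kafka.connect.sink.SinkTaskContext")
                     .getMethod("failedRecordReporter");
                return true;
            } catch (ClassNotFoundException | NoSuchMethodException e) {
                return false;
            }
        }
    }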

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-16 Thread Aakash Shah
e acceptable?
> > >
> > > Best regards,
> > >
> > > Randall
> > >
> > > On Sat, May 16, 2020 at 12:24 PM Konstantine Karantasis <
> > > konstant...@confluent.io> wrote:
> > >
> > > > Thanks for the quick response Aakash.
> > > >
> > > > With respect to deprecation, this refers to deprecating this method
> in
> > > > newer versions of Kafka Connect (and eventually removing it).
> > > >
> > > > As a connector developer, if you want your connector to run across a
> > wide
> > > > spectrum of Connect versions, you'll have to take this into
> > consideration
> > > > and retain both methods in a functional state. The good news is that
> > both
> > > > methods can share a lot of code, so in reality both the old and the
> new
> > > put
> > > > will be thin shims over a `putRecord` method (or `process` method as
> > you
> > > > call it in the KIP).
> > > >
> > > > Given the above, there's no requirement to conditionally call one
> > method
> > > or
> > > > the other in the framework based on configuration. Once you implement
> > the
> > > > new `put` with something other than its default implementation, as a
> > > > connector developer, you'll know to adapt to the above.
> > > >
> > > > I definitely suggest extending our docs in a meaningful way in order
> to
> > > > make the upgrade path easy to follow. Maybe you'd like to add a note
> to
> > > > your compatibility section in this KIP as well.
> > > >
> > > > Regards,
> > > > Konstantine
> > > >
> > > > On Sat, May 16, 2020 at 10:13 AM Aakash Shah 
> > wrote:
> > > >
> > > > > +1
> > > > >
> > > > > On Sat, May 16, 2020 at 9:55 AM Konstantine Karantasis <
> > > > > konstant...@confluent.io> wrote:
> > > > >
> > > > > > Hi Arjun,
> > > > > >
> > > > > > I think I agree with you that subject is interesting. Yet, I feel
> > it
> > > > > > belongs to a separate future KIP. Reading the proposal in the KIP
> > > > format
> > > > > > will help, at least myself, to understand it better.
> > > > > >
> > > > > > Having said that, for the purpose of simplifying error handling
> for
> > > > sink
> > > > > > tasks, the discussion on KIP-610 has made some good progress on
> the
> > > > > mailing
> > > > > > list. If the few open items are reflected on the proposal, maybe
> > it'd
> > > > be
> > > > > > even worthwhile to consider it for inclusion in the upcoming
> > release
> > > > with
> > > > > > its current scope.
> > > > > >
> > > > > > Konstantine
> > > > > >
> > > > > >
> > > > > > On Fri, May 15, 2020 at 7:44 PM Arjun Satish <
> > arjun.sat...@gmail.com
> > > >
> > > > > > wrote:
> > > > > >
> > > > > > > I'm kinda hoping that we get to an approach on how to extend
> the
> > > > > Connect
> > > > > > > framework. Adding parameters in the put method is nice, and
> maybe
> > > > works
> > > > > > for
> > > > > > > now, but I'm not sure how scalable it is. It'd great to be able
> > to
> > > > add
> > > > > > more
> > > > > > > functionality in the future. Couple of examples:
> > > > > > >
> > > > > > > - make the metrics registry available to a task, so they can
> > report
> > > > > task
> > > > > > > level metrics or
> > > > > > > - be able to pass in a RestExtension handle to the task, so the
> > > task
> > > > > can
> > > > > > > provide a rest endpoint which users can hit to get some task
> > level
> > > > > > > information (about its status, health, for example)
> > > > > > >
> > > > > > > In such scenarios, maybe adding new parameters to existing
> > methods
> > > > may
> > > > > > not
> > > > > > > be immediately acceptable.
> > >

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-16 Thread Aakash Shah
how easy it is for a sink to maintain backward compatibility.
> (The use of `BiFunction` helps tremendously.) Another not insignificant
> advantage is that a sink task can use this reporter reference throughout
> the task's lifetime (after it's started and before it is stopped), making
> it less invasive for existing sink task implementations that want to use
> it.
>
> I hope we can all get behind this compromise, which IMO is actually super
> clean and makes a lot of sense. Thanks, Andrew, for originally suggesting
> it. I know we're all trying to improve the Connect API in a way that makes
> sense, and deliberate and constructive discussion is a healthy thing.
> Thanks again to everyone for participating!
>
> BTW, we've agreed upon a number of other changes, but I don't see any of
> those changes on the KIP. Aakash, can you please update the KIP quickly so
> we can make sure the other parts of the KIP are acceptable?
>
> Best regards,
>
> Randall
>
> On Sat, May 16, 2020 at 12:24 PM Konstantine Karantasis <
> konstant...@confluent.io> wrote:
>
> > Thanks for the quick response Aakash.
> >
> > With respect to deprecation, this refers to deprecating this method in
> > newer versions of Kafka Connect (and eventually removing it).
> >
> > As a connector developer, if you want your connector to run across a wide
> > spectrum of Connect versions, you'll have to take this into consideration
> > and retain both methods in a functional state. The good news is that both
> > methods can share a lot of code, so in reality both the old and the new
> put
> > will be thin shims over a `putRecord` method (or `process` method as you
> > call it in the KIP).
> >
> > Given the above, there's no requirement to conditionally call one method
> or
> > the other in the framework based on configuration. Once you implement the
> > new `put` with something other than its default implementation, as a
> > connector developer, you'll know to adapt to the above.
> >
> > I definitely suggest extending our docs in a meaningful way in order to
> > make the upgrade path easy to follow. Maybe you'd like to add a note to
> > your compatibility section in this KIP as well.
> >
> > Regards,
> > Konstantine
> >
> > On Sat, May 16, 2020 at 10:13 AM Aakash Shah  wrote:
> >
> > > +1
> > >
> > > On Sat, May 16, 2020 at 9:55 AM Konstantine Karantasis <
> > > konstant...@confluent.io> wrote:
> > >
> > > > Hi Arjun,
> > > >
> > > > I think I agree with you that the subject is interesting. Yet, I feel it
> > > > belongs to a separate future KIP. Reading the proposal in the KIP
> > format
> > > > will help me, at least, to understand it better.
> > > >
> > > > Having said that, for the purpose of simplifying error handling for
> > sink
> > > > tasks, the discussion on KIP-610 has made some good progress on the
> > > mailing
> > > > list. If the few open items are reflected on the proposal, maybe it'd
> > be
> > > > even worthwhile to consider it for inclusion in the upcoming release
> > with
> > > > its current scope.
> > > >
> > > > Konstantine
> > > >
> > > >
> > > > On Fri, May 15, 2020 at 7:44 PM Arjun Satish <arjun.sat...@gmail.com>
> > > > wrote:
> > > >
> > > > > I'm kinda hoping that we get to an approach on how to extend the
> > > Connect
> > > > > framework. Adding parameters in the put method is nice, and maybe
> > works
> > > > for
> > > > > now, but I'm not sure how scalable it is. It'd be great to be able to
> > add
> > > > more
> > > > > functionality in the future. Couple of examples:
> > > > >
> > > > > - make the metrics registry available to a task, so they can report
> > > task
> > > > > level metrics or
> > > > > - be able to pass in a RestExtension handle to the task, so the
> task
> > > can
> > > > > provide a rest endpoint which users can hit to get some task level
> > > > > information (about its status, health, for example)
> > > > >
> > > > > In such scenarios, maybe adding new parameters to existing methods
> > may
> > > > not
> > > > > be immediately acceptable.
> > > > >
> > > > > Since we are very close to a deadline, I wanted to check if the one

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-16 Thread Aakash Shah
+1

On Sat, May 16, 2020 at 9:55 AM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Hi Arjun,
>
> I think I agree with you that the subject is interesting. Yet, I feel it
> belongs to a separate future KIP. Reading the proposal in the KIP format
> will help me, at least, to understand it better.
>
> Having said that, for the purpose of simplifying error handling for sink
> tasks, the discussion on KIP-610 has made some good progress on the mailing
> list. If the few open items are reflected on the proposal, maybe it'd be
> even worthwhile to consider it for inclusion in the upcoming release with
> its current scope.
>
> Konstantine
>
>
> On Fri, May 15, 2020 at 7:44 PM Arjun Satish 
> wrote:
>
> > I'm kinda hoping that we get to an approach on how to extend the Connect
> > framework. Adding parameters in the put method is nice, and maybe works
> for
> > now, but I'm not sure how scalable it is. It'd be great to be able to add
> more
> > functionality in the future. Couple of examples:
> >
> > - make the metrics registry available to a task, so they can report task
> > level metrics or
> > - be able to pass in a RestExtension handle to the task, so the task can
> > provide a rest endpoint which users can hit to get some task level
> > information (about its status, health, for example)
> >
> > In such scenarios, maybe adding new parameters to existing methods may
> not
> > be immediately acceptable.
> >
> > Since we are very close to a deadline, I wanted to check if one more
> > possibility is acceptable :-)
> >
> > What if we created a library that could be independently integrated by
> > connector developers into their connectors? The library would be packaged
> > and shipped with their connector like any other library on Maven (and
> > other similar repositories). The new module would be in the AK project,
> > but its jars will *not* be added to the classpath of the Connect worker.
> >
> > The library would provide a public interface for an error reporter, which
> > provides both synchronous and asynchronous functionalities (as was
> brought
> > up above).
> >
> > This would be an independent library that can be easily bundled and
> > loaded with the other connectors. The Connect framework will be decoupled
> > from this utility.
> >
> > I understand that a similar option is in the rejected alternatives, mostly
> > because of configuration overhead, but the configuration required here can
> > come directly from the worker properties (and just be copy-pasted from
> > there, maybe with a prefix). And I wonder (maybe as part of a future KIP)
> > whether we can evaluate a strategy where certain worker configs can be
> > passed to a connector (for example, the producer/consumer/admin ones), so
> > end users do not have to.
> >
> > Overall, we would get clean APIs and contracts, and developers get the
> > freedom to use these libraries and functionalities however they want. The
> > only drawback is how this is configured (end users will have to add more
> > lines in the json/properties files). But since all configs can simply come
> > from the worker, I believe this is a relatively minor issue. We should be
> > able to work out
> > compatibility issues in the implementations, so that the library can
> safely
> > run (and degrade functionality if needed) with old workers.
> >
> >
> > On Fri, May 15, 2020 at 7:04 PM Aakash Shah  wrote:
> >
> > > Just wanted to clarify that I am on board with adding the overloaded
> > > put(...) method.
> > >
> > > Thanks,
> > > Aakash
> > >
> > > On Fri, May 15, 2020 at 7:00 PM Aakash Shah 
> wrote:
> > >
> > > > Hi Randall and Konstantine,
> > > >
> > > > As Chris and Arjun mentioned, I think the main concern is the
> potential
> > > > gap in which developers don't implement the deprecated method due to
> a
> > > > misunderstanding of use cases. Using the setter method approach
> ensures
> > > > that the developer won't break backwards compatibility when using the
> > new
> > > > method due to a mistake. That being said, I think the value added in
> > > > clarity of contract of when the error reporter will be invoked and
> > > overall
> > > > aesthetic while maintaining backwards compatibility outweighs the
> > > potential
> > > > mistake of a developer in not implementing the original put(...)
> > method.

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-16 Thread Aakash Shah
Hi Konstantine,

Thanks a lot for your feedback.

These are all good points, especially that we already have the threads we
need and that we'd rather not spin up additional ones. It is also true that we should
consider the level of control we want to provide to the developer rather
than overstating the burden. In that case, I like the idea of using Futures
to provide asynchronous functionality.

One last proposal/thought I have, please let me know if it is
feasible/viable: because of the concerns some of us have for deprecating
the current put(...), what if we were to add the overloaded
put(Collection, BiFunction) as mentioned before that gets
invoked *only* if the error reporter config is toggled. In other words,
connector developers would implement both, and based on the error reporter
configuration, one of the put(...) functions would be invoked. This would
remove the confusion as to which method should be implemented since both
are considered functions in use, but for different use cases. If the
connector is deployed on an older version of AK, the original put(...)
would be implemented anyway and there would be no compatibility issues. The
main drawback would be some duplication of code.

Please let me know what you think.

Thanks,
Aakash

On Sat, May 16, 2020 at 9:42 AM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> > I believe it is important to relieve as much of the burden of
> > implementation as possible from the developer in this case, and thus I
> > think using a Callback rather than a Future would be easier on the
> > developer, while adding asynchronous functionality with the ability to
> > opt-in synchronous functionality. I also believe making it opt-in
> > synchronous vs. the other way simplifies implementation for the developer
> > (blocking vs creating a new thread).
>
>
> What's probably important to highlight here is that, sync or async, the
> control needs to return to the connector developer at some point, because
> she'll decide what to do with the fact that the record for the failed sink
> record was reported (e.g. was produced to the DLQ). Before we overstate the
> burden to the connector developer as opposed to the level of control we
> want to offer to her, let's see how that burden would look like in a couple
> of examples:
>
> First, the exact equivalent of your initial proposal (this corresponds
> precisely to what a blocking BiConsumer would do in your example).
>
> public void put(Collection<SinkRecord> sinkRecords,
>     BiFunction<SinkRecord, Throwable, Future<Void>> failedRecordsReporter) {
>   for (SinkRecord record : sinkRecords) {
>     try {
>       process(record);
>     } catch (Throwable t) {
>       try {
>         failedRecordsReporter.apply(record, t).get();
>       } catch (InterruptedException | ExecutionException e) {
>         throw new ConnectException(e);
>       }
>     }
>   }
> }
>
> Essentially the same lines as your initial example, plus the exception
> handling (which I added for the sake of completeness; of course it'd be
> fleshed out into separate methods in production code). Basically the burden
> is: failedRecordsReporter.apply(record, t).get(); vs
> failedRecordsReporter.apply(record, t);
>
>
> Additionally, the async equivalent does not differ much:
>
> public void put(Collection<SinkRecord> sinkRecords,
>     BiFunction<SinkRecord, Throwable, Future<Void>> failedRecordsReporter) {
>   Map<SinkRecord, Future<Void>> futures = new HashMap<>();
>   for (SinkRecord record : sinkRecords) {
>     try {
>       process(record);
>     } catch (Throwable t) {
>       futures.put(record, failedRecordsReporter.apply(record, t));
>     }
>   }
>
>   futures.forEach((record, future) -> {
>     try {
>       future.get();
>     } catch (InterruptedException | ExecutionException e) {
>       throw new ConnectException(e);
>     }
>   });
> }
>
> And of course the Map doesn't even have to be processed in `put`. It can
> also be processed in `preCommit` or another place in the connector code.
> But let's not divert from the main use case too much.
>
> I don't easily see how an API definition based on callbacks would simplify
> things here. Keep in mind that we already have the threads we need and we'd
> rather not spin up additional ones: the Worker thread that runs the sink
> task and the Kafka producer thread. Also, as I said, we need to return
> control to the developer, so the above leads to a more intuitive
> implementation.
>
> I hope this helps.
>
> Konstantine
>
>
> On Fri, May 15, 2020 at 7:01 PM Aakash Shah  wrote:
>
> > Hi Randall and Konstantine,
> >
> > As Chris and Arjun mentioned, I think the main concern is the potential
> gap
> > in which developers don't implement the deprecated

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-15 Thread Aakash Shah
Just wanted to clarify that I am on board with adding the overloaded
put(...) method.

Thanks,
Aakash

On Fri, May 15, 2020 at 7:00 PM Aakash Shah  wrote:

> Hi Randall and Konstantine,
>
> As Chris and Arjun mentioned, I think the main concern is the potential
> gap in which developers don't implement the deprecated method due to a
> misunderstanding of use cases. Using the setter method approach ensures
> that the developer won't break backwards compatibility when using the new
> method due to a mistake. That being said, I think the value added in
> clarity of contract of when the error reporter will be invoked and overall
> aesthetic while maintaining backwards compatibility outweighs the potential
> mistake of a developer in not implementing the original put(...) method.
>
> With respect to synchrony, I agree with Konstantine's point that making
> the reporter synchronous should be an opt-in feature.
> At the same time, I believe it is important to relieve as much of the
> burden of implementation as possible from the developer in this case, and
> thus I think using a Callback rather than a Future would be easier on the
> developer, while adding asynchronous functionality with the ability to
> opt-in synchronous functionality. I also believe making it opt-in
> synchronous vs. the other way simplifies implementation for the developer
> (blocking vs creating a new thread).
>
> Please let me know your thoughts. I would like to come to a consensus soon
> due to the AK 2.6 deadlines; I will then shortly update the KIP and start a
> vote.
>
> Thanks,
> Aakash
>
> On Fri, May 15, 2020 at 2:24 PM Randall Hauch  wrote:
>
>> On Fri, May 15, 2020 at 3:13 PM Arjun Satish 
>> wrote:
>>
>> > Couple of thoughts:
>> >
>> > 1. If we add new parameters to put(..), and new connectors implement
>> only
>> > this method, it makes them backward incompatible with older workers. I
>> > think newer connectors may only choose to only implement the latest
>> method,
>> > and we are passing the compatibility problems back to the connector
>> > developers.
>> >
>>
>> New connectors would have to implement both if they want to run in older
>> runtimes.
>>
>>
>> > 2. if we deprecate the older put() method and eventually remove it, then
>> > old connectors are forward incompatible. If we are not going to remove
>> it,
>> > then maybe we should not deprecate it?
>> >
>>
>> I don't think we'll ever remove deprecated methods -- there's no reason to
>> cut off older connectors.
>>
>>
>> > 3. if a record is realized to be erroneous outside put() (say, in flush
>> or
>> > preCommit), how will it be reported?
>> >
>>
>> This is a concern no matter how the reporter is passed to the task.
>> Actually, I think it's more clear that the reporter passed through
>> `put(...)` should be used to record errors on the SinkRecords passed in
>> the
>> same method call.
>>
>>
>> >
>> > I do think the concern over aesthetics is an important one, but the
>> > trade-off here is to exclude many connectors that are out there from
>> > running on some worker versions. There may be production deployments that
>> need
>> > one old and one new connector that now cannot work on any version of a
>> > single worker. Building connectors is complex, and it's kinda unfair to
>> > expect folks to make changes for aesthetic reasons alone. This is
>> probably
>> > the reason why popular framework APIs very rarely (and probably never)
>> > change.
>> >
>>
>> I don't see how passing the reporter through an overloaded `put(...)` is
>> less backward compatible. Because the runtime provides the SinkTask base
>> class, the runtime has control over what the methods do by default.
>>
>>
>> >
>> > Overall, yes, the "public void
>> > errantRecordReporter(BiConsumer<SinkRecord, Throwable> reporter) {}"
>> > proposal in the original KIP is somewhat of a
>> > mouthful, but are there any simpler alternatives that do not exclude
>> > existing connectors or add operational burdens, and yet provide a clean
>> > contract?
>> >
>>
>> IMO, overloading `put(...)` is cleaner and easier to understand -- plus
>> the
>> other benefits in my earlier email.
>>
>>
>> >
>> > Best,
>> >
>> > PS: Apologies if the language is incorrect or some points are unclear.
>> >
>> > On Fri,

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-15 Thread Aakash Shah
> > > What you describe is easily an opt-in feature for the connector
> > > developer.
> > > > At the same time, the latest description above gives us better
> chances
> > > for
> > > > this API to remain like this for longer, because it covers both the
> > sync
> > > > and async per `put` use cases.
> > >
> > >
> > > +1
> > >
> > >
> > > > Given how simple the sync implementation
> > > > is, just by complying with the return type of the method, I still
> think
> > > the
> > > > BiFunction definition that returns a Future makes sense.
> > > >
> > > > Konstantine
> > > >
> > > >
> > > >
> > > > On Fri, May 15, 2020 at 11:27 AM Aakash Shah 
> > wrote:
> > > >
> > > > > Thanks for the additional feedback.
> > > > >
> > > > > I see the benefits of adding an overloaded put(...) over
> alternatives
> > > > and I
> > > > > am on board going forward with this approach. It will definitely
> set
> > > > forth
> > > > > a contract of where the reporter will be used with better
> aesthetics.
> > > > >
> > > > > The original idea of going with a synchronous approach for the
> error
> > > > > reporter was to ease the connector developer's job interacting with
> > and
> > > > > handling the error reporter. The tradeoff for having a
> > synchronous-only
> > > > > reporter would be lower throughput on the reporter; this was
> thought
> > to
> > > > be
> > > > > fine since arguably most circumstances would not include
> consistently
> > > > large
> > > > > amounts of records being sent to the error reporter. Even if this
> was
> > > the
> > > > > case, an argument can be made that the lower throughput would be of
> > > > > assistance in this case, as it would allow more time for the user
> to
> > > > > realize the connector is having records sent to the error reporter
> > > before
> > > > > many are sent. However, if we are strongly in favor of having the
> > > option
> > > > of
> > > > > asynchronous functionality available for the developer, then I am
> > fine
> > > > with
> > > > > that as well.
> > > > >
> > > > > Lastly, I am on board with changing the name to
> failedRecordReporter.
> > > > >
> > > > > Please let me know your thoughts.
> > > > >
> > > > > Thanks,
> > > > > Aakash
> > > > >
> > > > > On Fri, May 15, 2020 at 9:10 AM Randall Hauch 
> > > wrote:
> > > > >
> > > > > > Konstantine said:
> > > > > >
> > > > > > > I notice Randall also used BiFunction in his example, I wonder
> if
> > > > it's
> > > > > > for
> > > > > > > similar reasons.
> > > > > > >
> > > > > >
> > > > > > Nope. Just a typo on my part.
> > > > > >
> > > > > > There appear to be three outstanding questions.
> > > > > >
> > > > > > First, Konstantine suggested calling this
> "failedRecordReporter". I
> > > > think
> > > > > > this is minor, but using this new name may be a bit more precise
> > and
> > > > I'd
> > > > > be
> > > > > > fine with this.
> > > > > >
> > > > > > Second, should the reporter method be synchronous? I think the
> two
> > > > > options
> > > > > > are:
> > > > > >
> > > > > > 2a. Use `BiConsumer<SinkRecord, Throwable>` that returns nothing
> > > > > > and blocks (at this time).
> > > > > > 2b. Use `BiFunction<SinkRecord, Throwable, Future<Void>>` that
> > > > > > returns a future that the user can optionally use to be synchronous.
> > > > > >
> > > > > > I do agree with Konstantine that option 2b gives us more room for
> > > > future
> > > > > > semantic changes, and since the producer write is already
> > > asynchronous
> > > > > this
> > > > > > should be straightforward to implement. I think the concern here

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-15 Thread Aakash Shah
Hi Konstantine,

Thanks for the insight. In that case, I will make the appropriate changes
to the KIP to support asynchronous functionality with a BiFunction.

Best,
Aakash

On Fri, May 15, 2020 at 11:45 AM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Thanks for the quick response Aakash.
>
> To your last point, modern APIs like this tend to be asynchronous (see the
> admin client and producer in Kafka), and such a definition results in more
> expressive and well-defined APIs.
>
> What you describe is easily an opt-in feature for the connector developer.
> At the same time, the latest description above gives us better chances for
> this API to remain like this for longer, because it covers both the sync
> and async per `put` use cases. Given how simple the sync implementation
> is, just by complying with the return type of the method, I still think the
> BiFunction definition that returns a Future makes sense.
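
To make that concrete, a hedged sketch of a runtime-side reporter whose single Future-returning signature covers both cases: a task blocks on the Future for sync behavior, or collects it for async. The class, its fields, and the elided record serialization are assumptions for illustration, not the KIP's implementation:

```
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.sink.SinkRecord;

public class FutureReporterSketch {
    private final KafkaProducer<byte[], byte[]> dlqProducer; // assumed DLQ producer
    private final String dlqTopic;                           // assumed DLQ topic name

    public FutureReporterSketch(KafkaProducer<byte[], byte[]> dlqProducer, String dlqTopic) {
        this.dlqProducer = dlqProducer;
        this.dlqTopic = dlqTopic;
    }

    // One signature covers both cases: block on the Future for sync behavior,
    // or collect it and check it later for async behavior.
    public BiFunction<SinkRecord, Throwable, Future<Void>> reporter() {
        return (record, error) -> {
            CompletableFuture<Void> result = new CompletableFuture<>();
            // Re-serialization of the errant record is elided in this sketch.
            ProducerRecord<byte[], byte[]> dlqRecord =
                new ProducerRecord<>(dlqTopic, null, null);
            dlqProducer.send(dlqRecord, (metadata, exception) -> {
                if (exception != null) {
                    result.completeExceptionally(exception); // DLQ write failed
                } else {
                    result.complete(null); // errant record persisted
                }
            });
            return result;
        };
    }
}
```
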
>
> Konstantine
>
>
>
> On Fri, May 15, 2020 at 11:27 AM Aakash Shah  wrote:
>
> > Thanks for the additional feedback.
> >
> > I see the benefits of adding an overloaded put(...) over alternatives
> and I
> > am on board going forward with this approach. It will definitely set
> forth
> > a contract of where the reporter will be used with better aesthetics.
> >
> > The original idea of going with a synchronous approach for the error
> > reporter was to ease the connector developer's job interacting with and
> > handling the error reporter. The tradeoff for having a synchronous-only
> > reporter would be lower throughput on the reporter; this was thought to
> be
> > fine since arguably most circumstances would not include consistently
> large
> > amounts of records being sent to the error reporter. Even if this was the
> > case, an argument can be made that the lower throughput would be of
> > assistance in this case, as it would allow more time for the user to
> > realize the connector is having records sent to the error reporter before
> > many are sent. However, if we are strongly in favor of having the option
> of
> > asynchronous functionality available for the developer, then I am fine
> with
> > that as well.
> >
> > Lastly, I am on board with changing the name to failedRecordReporter.
> >
> > Please let me know your thoughts.
> >
> > Thanks,
> > Aakash
> >
> > On Fri, May 15, 2020 at 9:10 AM Randall Hauch  wrote:
> >
> > > Konstantine said:
> > >
> > > > I notice Randall also used BiFunction in his example, I wonder if
> it's
> > > for
> > > > similar reasons.
> > > >
> > >
> > > Nope. Just a typo on my part.
> > >
> > > There appear to be three outstanding questions.
> > >
> > > First, Konstantine suggested calling this "failedRecordReporter". I
> think
> > > this is minor, but using this new name may be a bit more precise and
> I'd
> > be
> > > fine with this.
> > >
> > > Second, should the reporter method be synchronous? I think the two
> > options
> > > are:
> > >
> > > 2a. Use `BiConsumer<SinkRecord, Throwable>` that returns nothing and
> > > blocks (at this time).
> > > 2b. Use `BiFunction<SinkRecord, Throwable, Future<Void>>` that returns
> > > a future that the user can optionally use to be synchronous.
> > >
> > > I do agree with Konstantine that option 2b gives us more room for
> future
> > > semantic changes, and since the producer write is already asynchronous
> > this
> > > should be straightforward to implement. I think the concern here is
> that
> > if
> > > the sink task does not *use* the future to make this synchronous, it is
> > > very possible that the error records could be written out of order (due
> > to
> > > retries). But this won't be an issue if the implementation uses
> > > `max.in.flight.requests.per.connection=1` for writing the error
> records.
> > > It's a little less clear, but honestly IMO passing the reporter in the
> > > `put(...)` method helps make this lambda easier to understand, for some
> > > strange reason. So unless there are good reasons to avoid this, I'd be
> in
> > > favor of 2b and returning a Future.
> > >
> > > Third, how do we pass the reporter lambda / method reference to the
> task?
> > > My proposal to pass the reporter via an overload `put(...)` still is
> the
> > > most attractive to me, for several reasons:
> > >
> > > 3a. There's no need to pass the reporter separately

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-15 Thread Aakash Shah
> that Connect will pass the same reporter instance to each call
> to `put(...)` on a single task instance.
>
> Best regards,
>
> Randall
>
> On Fri, May 15, 2020 at 6:59 AM Andrew Schofield <
> andrew_schofi...@live.com>
> wrote:
>
> > Hi,
> > Randall's suggestion is really good. I think it gives the flexibility
> > required and also
> > keeps the interface the right way round.
> >
> > Thanks,
> > Andrew Schofield
> >
> > On 15/05/2020, 02:07, "Aakash Shah"  wrote:
> >
> > > Hi Randall,
> > >
> > > Thanks for the feedback.
> > >
> > > 1. This is a great suggestion, but I find that adding an overloaded
> > > put(...) which essentially deprecates the old put(...) to only be used
> > when
> > > a connector is deployed on older versions of Connect adds enough of a
> > > complication that could cause connectors to break if the old put(...)
> > > doesn't correctly invoke the overloaded put(...); either that, or it
> will
> > > add duplication of functionality across the two put(...) methods. I
> think
> > > the older method simplifies things with the idea that a DLQ/error
> > reporter
> > > will or will not be passed into the method depending on the version of
> > AK.
> > > However, I also understand the aesthetic advantage of this method vs
> the
> > > setter method, so I am okay with going in this direction if others
> agree
> > > with adding the overloaded put(...).
> > >
> > > 2. Yes, your assumption is correct. Yes, we can remove the "Order of
> > > Operations" if we go with the overloaded put(...) direction.
> > >
> > > 3. Great point, I will remove them from the KIP.
> > >
> > > 4. Yeah, accept(...) will be synchronous. I will change it to be
> clearer,
> > > thanks.
> > >
> > > 5. This KIP will use existing metrics as well introduce new metrics. I
> > will
> > > update this section to fully specify the metrics.
> > >
> > > Please let me know what you think.
> > >
> > > Thanks,
> > > Aakash
> > >
> > > On Thu, May 14, 2020 at 3:52 PM Randall Hauch 
> wrote:
> > >
> > > > Hi, Aakash.
> > > >
> > > > Thanks for the KIP. Connect does need an improved ability for sink
> > > > connectors to report individual records as being problematic, and
> this
> > > > integrates nicely with the existing DLQ feature.
> > > >
> > > > I also appreciate the desire to maintain compatibility so that
> > connectors
> > > > can take advantage of this feature when deployed in a runtime that
> > supports
> > > > this feature, but can safely and easily do without the feature when
> > > > deployed to an older runtime. But I do understand Andrew's concern
> > about
> > > > the aesthetics. Have you considered overloading the `put(...)` method
> > and
> > > > adding the `reporter` as a second parameter? Essentially it would add
> > the
> > > > one method (with proper JavaDoc) to `SinkTask` only:
> > > >
> > > > ```
> > > > public void put(Collection<SinkRecord> records,
> > > >     BiFunction<SinkRecord, Throwable> reporter) {
> > > >   put(records);
> > > > }
> > > > ```
> > > > and the WorkerSinkTask would be changed to call `put(Collection,
> > > > BiFunction)` instead.
> > > >
> > > > Sink connector implementations that don't do anything different can
> > still
> > > > override `put(Collection)`, and it still works as before. Developers
> > that
> > > > want to change their sink connector implementations to support this
> new
> > > > feature would do the following, which would work in older and newer
> > Connect
> > > > runtimes:
> > > > ```
> > > > public void put(Collection<SinkRecord> records) {
> > > >   put(records, null);
> > > > }
> > > > public void put(Collection<SinkRecord> records,
> > > >     BiFunction<SinkRecord, Throwable> reporter) {
> > > >   // the normal `put(Collection)` logic goes here, but can optionally
> > > >   // use `reporter` if non-null
> > > > }
> > > > ```
> > > >
> > > > I think this has all the same benefits as the current KIP, but
> > > > it's noticeably simpler and hopefully more aesthetically pleasing.
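
For illustration, a hedged sketch of how a task might implement the proposed overload while degrading gracefully when no reporter is available (e.g. the one-arg shim passing null on older runtimes); it uses the Future-returning signature the thread later converged on, and process(...) stands in for the connector's real write path:

```
import java.util.Collection;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

public class GuardedPutSketch {
    // Stand-in for the connector's real write path (assumption, not KIP API).
    private void process(SinkRecord record) { /* write downstream */ }

    public void put(Collection<SinkRecord> records,
                    BiFunction<SinkRecord, Throwable, Future<Void>> reporter) {
        for (SinkRecord record : records) {
            try {
                process(record);
            } catch (Throwable t) {
                if (reporter != null) {
                    reporter.apply(record, t); // report the errant record, keep going
                } else {
                    // Older runtime: no reporter was provided, so fail fast.
                    throw new ConnectException("Errant record and no reporter available", t);
                }
            }
        }
    }
}
```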

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-14 Thread Aakash Shah
> reporter successfully
> sends the errant record to Kafka."
>
> This sentence is a bit difficult to understand, but IIUC this really just
> means that "accept(...)" will be synchronous and will block until the
> errant record has been successfully written to Kafka. If so, let's say
> that. The rest of the paragraph is fine.
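
Under that reading, a minimal sketch of a blocking reporter backed by a DLQ producer; the class, its fields, and the elided record serialization are assumptions for illustration, not the KIP's actual implementation:

```
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

public class BlockingReporterSketch {
    private final KafkaProducer<byte[], byte[]> dlqProducer; // assumed DLQ producer
    private final String dlqTopic;                           // assumed DLQ topic name

    public BlockingReporterSketch(KafkaProducer<byte[], byte[]> dlqProducer, String dlqTopic) {
        this.dlqProducer = dlqProducer;
        this.dlqTopic = dlqTopic;
    }

    // Blocks until the errant record has been written to Kafka, per the
    // reading above. Re-serialization of the record is elided in this sketch.
    public void accept(SinkRecord record, Throwable error) {
        ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(dlqTopic, null, null);
        try {
            dlqProducer.send(dlqRecord).get(); // synchronous: wait for the ack
        } catch (InterruptedException | ExecutionException e) {
            throw new ConnectException("Failed to write errant record to the DLQ", e);
        }
    }
}
```
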
>
> Finally, is this KIP proposing new metrics, or that existing metrics would
> be used to track the error reporter usage? If the former, then please
> fully-specify what these metrics will be, similarly to how metrics are
> specified in
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework
> .
>
> Thoughts?
>
> Best regards,
>
> Randall
>
> On Mon, May 11, 2020 at 4:49 PM Andrew Schofield <
> andrew_schofi...@live.com>
> wrote:
>
> > Hi Aakash,
> > Thanks for sorting out the replies to the mailing list.
> >
> > First, I do like the idea of improving error reporting in sink
> connectors.
> > I'd like a simple
> > way to put bad records onto the DLQ.
> >
> > I think this KIP is considerably more complicated than it seems. The
> > guidance on the
> > SinkTask.put() method is that it should send the records asynchronously
> > and immediately
> > return, so the task is likely to want to report errors asynchronously
> > too.  Currently the KIP
> > states that "the task can send errant records to it within put(...)" and
> > that's too restrictive.
> > The task ought to be able to report any unflushed records, but the
> > synchronisation of this is going
> > to be tricky. I suppose the connector author needs to make sure that all
> > errant records have
> > been reported before returning control from SinkTask.flush(...) or
> perhaps
> > SinkTask.preCommit(...).
> >
> > I think the interface is a little strange too. I can see that this was
> > done so it's possible to deliver a connector
> > that supports error reporting but it can also work in earlier versions of
> > the KC runtime. But, the
> > pattern so far is that the task uses the methods of SinkTaskContext to
> > access utilities in the Kafka
> > Connect runtime, and I suggest that reporting a bad record is such a
> > utility. SinkTaskContext has
> > changed before when the configs() method was added, so I think there is
> > precedent for adding a method.
> > The way the KIP adds a method to SinkTask that the KC runtime calls to
> > provide the error reporting utility
> > seems not to match what has gone before.
> >
> > Thanks,
> > Andrew
> >
> > On 11/05/2020, 19:05, "Aakash Shah"  wrote:
> >
> > I wasn't previously added to the dev mailing list, so I'd like to
> post
> > my
> > discussion with Andrew Schofield below for visibility and further
> > discussion:
> >
> > Hi Andrew,
> >
> > Thanks for the reply. The main concern with this approach would be
> its
> > backward compatibility. I’ve highlighted the thoughts around the
> > backwards
> > compatibility of the initial approach, please let me know what you
> > think.
> >
> > Thanks,
> > Aakash
> >
> >
> >
> 
> >
> > Hi,
> > By adding a new method to the SinkTaskContext interface in, say, Kafka
> 2.6, a
> > connector that calls it would require a Kafka 2.6 connect runtime. I
> > don't
> > quite see how that's a backward compatibility problem. It's just that
> > new
> > connectors need the latest interface. I might not quite be
> > understanding,
> > but I think it would be fine.
> >
> > Thanks,
> > Andrew
> >
> >
> >
> 
> >
> > Hi Andrew,
> >
> > I apologize for the way the reply was sent. I just subscribed to the
> > dev
> > mailing list so it should be resolved now.
> >
> > You are correct, new connectors would simply require the latest
> > interface.
> > However, we want to remove that requirement - in other words, we want
> > to
> > allow the possibility that someone wants the latest connector/to
> > upgrade to
> > the latest version, but deploys it on an older version of AK.
> > Basically, we
> > don't want to enforce the necessity of upgrading AK to get the latest
> > interface. In the current approach, there would be no issue of
> > deploying a
> > new connector on an older version of AK, as the Connect framework
> would
> > simply not invoke the new method.
> >
> > Please let me know what you think and if I need to clarify anything.
> >
> > Thanks,
> > Aakash
> >
> >
>


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-14 Thread Aakash Shah
Hi Andrew,

Thanks for your comments.

Based on my understanding from
https://kafka.apache.org/25/javadoc/org/apache/kafka/connect/sink/SinkTask.html,
put(...) can be asynchronous but does not have to be, as described
in: "As records are fetched from Kafka, they will be passed to the sink
task using the put(Collection) API, which should either write them to the
downstream system or batch them for later writing. Periodically, Connect
will call flush(Map) to ensure that batched records are actually pushed to
the downstream system."

With respect to sending errant records within put(...) being too
restrictive, as Randall pointed out, this was more of an attempt at
describing the basic behavior rather than requiring the reporter only be
called within put(...).

I see your point about the previous pattern of adding methods that the task
uses to SinkTaskContext. The main concerns I have with this procedure are:

1. The added effort for a connector developer who wants to use this
functionality: checking whether the method exists and implementing that
check across all connectors (see the sketch below)
2. Since the addition of that method two years ago, the Connect ecosystem
has grown a lot, which causes other compatibility issues and inertia against
upgrading to newer versions of Connect
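
A hedged sketch of the existence check mentioned in the first point above; "errantRecordReporter" is a hypothetical method name used purely to illustrate the probe:

```
import org.apache.kafka.connect.sink.SinkTaskContext;

public final class ReporterProbeSketch {
    private ReporterProbeSketch() { }

    // "errantRecordReporter" is a hypothetical method name used only to
    // illustrate the probe; the KIP had not settled on a context method.
    public static boolean reporterSupported(SinkTaskContext context) {
        try {
            context.getClass().getMethod("errantRecordReporter");
            return true;
        } catch (NoSuchMethodException e) {
            return false; // older Connect runtime without the new API
        }
    }
}
```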

Please let me know what you think.

Thanks,
Aakash

On Mon, May 11, 2020 at 2:49 PM Andrew Schofield 
wrote:

> Hi Aakash,
> Thanks for sorting out the replies to the mailing list.
>
> First, I do like the idea of improving error reporting in sink connectors.
> I'd like a simple
> way to put bad records onto the DLQ.
>
> I think this KIP is considerably more complicated than it seems. The
> guidance on the
> SinkTask.put() method is that it should send the records asynchronously
> and immediately
> return, so the task is likely to want to report errors asynchronously
> too.  Currently the KIP
> states that "the task can send errant records to it within put(...)" and
> that's too restrictive.
> The task ought to be able to report any unflushed records, but the
> synchronisation of this is going
> to be tricky. I suppose the connector author needs to make sure that all
> errant records have
> been reported before returning control from SinkTask.flush(...) or perhaps
> SinkTask.preCommit(...).
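
One hedged sketch of that synchronisation, assuming a Future-returning reporter: track the futures as records are reported, then drain them before preCommit(...) returns, so that every errant record has been reported by then. The class name and field are illustrative:

```
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class DrainingTaskSketch extends SinkTask {
    // Futures returned by the (hypothetical) reporter as records fail.
    private final List<Future<Void>> pendingReports = new ArrayList<>();

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(
            Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        for (Future<Void> report : pendingReports) {
            try {
                report.get(); // wait until this errant record has been reported
            } catch (InterruptedException | ExecutionException e) {
                throw new ConnectException(e);
            }
        }
        pendingReports.clear();
        return super.preCommit(currentOffsets); // default: flush and return offsets
    }
}
```
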
>
> I think the interface is a little strange too. I can see that this was
> done so it's possible to deliver a connector
> that supports error reporting but it can also work in earlier versions of
> the KC runtime. But, the
> pattern so far is that the task uses the methods of SinkTaskContext to
> access utilities in the Kafka
> Connect runtime, and I suggest that reporting a bad record is such a
> utility. SinkTaskContext has
> changed before when the configs() method was added, so I think there is
> precedent for adding a method.
> The way the KIP adds a method to SinkTask that the KC runtime calls to
> provide the error reporting utility
> seems not to match what has gone before.
>
> Thanks,
> Andrew
>
> On 11/05/2020, 19:05, "Aakash Shah"  wrote:
>
> I wasn't previously added to the dev mailing list, so I'd like to post
> my
> discussion with Andrew Schofield below for visibility and further
> discussion:
>
> Hi Andrew,
>
> Thanks for the reply. The main concern with this approach would be its
> backward compatibility. I’ve highlighted the thoughts around the
> backwards
> compatibility of the initial approach, please let me know what you
> think.
>
> Thanks,
> Aakash
>
>
> 
>
> Hi,
> By adding a new method to the SinkTaskContext interface in, say, Kafka 2.6, a
> connector that calls it would require a Kafka 2.6 connect runtime. I
> don't
> quite see how that's a backward compatibility problem. It's just that
> new
> connectors need the latest interface. I might not quite be
> understanding,
> but I think it would be fine.
>
> Thanks,
> Andrew
>
>
> 
>
> Hi Andrew,
>
> I apologize for the way the reply was sent. I just subscribed to the
> dev
> mailing list so it should be resolved now.
>
> You are correct, new connectors would simply require the latest
> interface.
> However, we want to remove that requirement - in other words, we want
> to
> allow the possibility that someone wants the latest connector/to
> upgrade to
> the latest version, but deploys it on an older version of AK.
> Basically

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-11 Thread Aakash Shah
Hi Chris,

Thanks for the feedback!

1. Great point, this is the more correct general aim of the proposal.

2. Thanks for the suggestions on points a and b, they are both great. I
will incorporate them.

3. Yep, I'll add this to the sample code and add an explanation.

4. Great point about the addition of the extra configuration properties. By
"If we decide to include these properties, we should also update the
"Synchrony" section to be agnostic about what the error reporter is doing
under the hood since there won't necessarily be a Kafka producer involved
in handling records given to the error reporter," are you referring to the
fact that if people only choose to enable logging and not sending to a DLQ,
there won't necessarily be a producer involved?

5. Yeah, this is correct; I'll update to correctly state this.

6. Thanks, I'll take a look into making the metrics section more robust.

7. Yep, I'll add an explanation to the "Rejected Alternatives."

Best,
Aakash


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-11 Thread Aakash Shah
I wasn't previously added to the dev mailing list, so I'd like to post my
discussion with Andrew Schofield below for visibility and further
discussion:

Hi Andrew,

Thanks for the reply. The main concern with this approach would be its
backward compatibility. I’ve highlighted the thoughts around the backwards
compatibility of the initial approach, please let me know what you think.

Thanks,
Aakash



Hi,
By adding a new method to the SinkTaskContext interface in, say, Kafka 2.6, a
connector that calls it would require a Kafka 2.6 connect runtime. I don't
quite see how that's a backward compatibility problem. It's just that new
connectors need the latest interface. I might not quite be understanding,
but I think it would be fine.

Thanks,
Andrew



Hi Andrew,

I apologize for the way the reply was sent. I just subscribed to the dev
mailing list so it should be resolved now.

You are correct, new connectors would simply require the latest interface.
However, we want to remove that requirement - in other words, we want to
allow the possibility that someone wants the latest connector/to upgrade to
the latest version, but deploys it on an older version of AK. Basically, we
don't want to enforce the necessity of upgrading AK to get the latest
interface. In the current approach, there would be no issue of deploying a
new connector on an older version of AK, as the Connect framework would
simply not invoke the new method.

Please let me know what you think and if I need to clarify anything.

Thanks,
Aakash


[jira] [Created] (KAFKA-9971) Error Reporting in Sink Connectors

2020-05-07 Thread Aakash Shah (Jira)
Aakash Shah created KAFKA-9971:
--

 Summary: Error Reporting in Sink Connectors
 Key: KAFKA-9971
 URL: https://issues.apache.org/jira/browse/KAFKA-9971
 Project: Kafka
  Issue Type: New Feature
  Components: KafkaConnect
Affects Versions: 2.6.0
Reporter: Aakash Shah


Currently, 
[KIP-298|https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect]
 provides error handling in Kafka Connect that includes functionality such as 
retrying, logging, and sending errant records to a dead letter queue. However, 
the dead letter queue functionality from KIP-298 only supports error reporting 
within the contexts of the transform operation and the key, value, and header 
converter operations. Within the context of the {{put(...)}} method in sink 
connectors, there is no support for dead letter queue/error reporting 
functionality. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-07 Thread Aakash Shah
Hello all,

I've created a KIP to handle error reporting for records in sink
connectors, specifically within the context of put(...):

https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors

I would appreciate any kind of feedback.

Thanks,

Aakash


Permission to create a KIP

2020-05-02 Thread Aakash Shah
Hello,

I would like to request permission to create a KIP.

My Wiki ID is aakash33 and my email is as...@confluent.io.

Thank you!

Best,

Aakash Shah