Re: [DISCUSS] KIP-883: Add delete callback method to Connector API

2022-12-05 Thread Chris Egerton
Hi Hector,

Thanks for the updates!

RE 1: This doesn't have the same user-facing behavior, though. Today,
failures in Connector::stop can be surfaced via the status endpoints in the
REST API. But when connectors are deleted, they won't be visible at all in
these endpoints.

RE 3: It seems like this approach would only provide guarantees on a
per-worker basis; I was wondering more about trying to wait for tasks on
all workers in the cluster to stop. One potential approach for this could
be to handle connector deletes by breaking them up into two separate
rebalances: the first would revoke all of the connector's tasks, and the
second would revoke the connector itself. But this is just an example, and
would come with its own non-negligible overhead; we can and should explore
other approaches too.

RE 4: Thanks for the clarification, makes sense 👍 Worth noting for anyone
following along that KIP-419 is similar to this KIP, but on a much smaller
scale: it only focuses on the cleanup of resources allocated by single Task
instances, whereas this KIP focuses on the cleanup of resources that are
meant to be used across the entire lifetime of the connector, which may
encompass the lifetimes of several Connector and Task instances.
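
To make the ordering in RE 3 concrete, here is a minimal Java sketch of the "tasks first, connector second" idea. It is not Connect code: TwoPhaseDeleteSketch, deleteConnector, and the worker/task layout are hypothetical names made up for illustration, and a real implementation would drive both phases through the rebalance protocol rather than a local loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; none of these names exist in Kafka Connect.
class TwoPhaseDeleteSketch {

    /**
     * Returns the sequence of actions a two-phase delete would take.
     * tasksByWorker maps a worker name to the task ids it runs for the connector.
     * connectorOwner is the worker that runs the Connector instance itself.
     */
    static List<String> deleteConnector(String connector,
                                        Map<String, List<Integer>> tasksByWorker,
                                        String connectorOwner) {
        List<String> log = new ArrayList<>();

        // Phase 1 (first rebalance in the email): revoke and stop every task
        // of the connector, on every worker in the cluster.
        for (Map.Entry<String, List<Integer>> entry : tasksByWorker.entrySet()) {
            for (int taskId : entry.getValue()) {
                log.add(entry.getKey() + ": stop task " + connector + "-" + taskId);
            }
        }

        // Phase 2 (second rebalance): only now revoke the connector itself,
        // so the deletion-aware stop runs after all tasks are gone.
        log.add(connectorOwner + ": stop(true) on connector " + connector);
        return log;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> tasks = new TreeMap<>();
        tasks.put("worker-a", Arrays.asList(0, 1));
        tasks.put("worker-b", Arrays.asList(2));
        deleteConnector("my-connector", tasks, "worker-b").forEach(System.out::println);
    }
}
```

The only property the sketch demonstrates is that the connector-level stop is the last action taken; it says nothing about the cost of the extra rebalance mentioned above.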

Cheers,

Chris

On Wed, Nov 30, 2022 at 10:58 AM Hector Geraldino (BLOOMBERG/ 919 3RD A) <
hgerald...@bloomberg.net> wrote:

> Thanks for your feedback Chris,
>
> 1. I think the behavior should remain the same as it is today. The worker
> stops the connector when its configuration is updated, and if the update is
> a deletion, it won't start the connector again. If an error happens during
> stop() today, the statusListener will update the backing store with a
> FAILED state. The only thing that changes on this path is that the
> Connector#stop() method will include an additional boolean parameter, so
> the connector knows that the reason it is being stopped is because of a
> deletion, and can perform additional actions if necessary.
>
> 2. I agree; at first I thought it made sense, but after reading KIP-875
> and finding out that connectors can use custom offsets topics to store
> offsets, I think this idea needs more refinement. There's probably a way to
> reuse the work proposed by this KIP with the "Automatically delete offsets
> with connectors" feature mentioned on the "Future work" section of KIP-875,
> and am happy to explore it more.
>
> 3. I didn't consider that. There is some asymmetry here on how the
> StandaloneHerder handles this (tasks are stopped before the connector is)
> and the DistributedHerder. One option would be not to handle this on the
> #processConnectorConfigUpdates(...) method, but instead wait for the
> RebalanceListener#onRevoked(...) callback, which already stops the revoked
> connectors and tasks synchronously. The idea would be to enhance this to
> check the configState store and, if the configuration of the revoked
> connector(s) is gone, then we can let the connector know about that fact
> when stopping it (by the aforementioned mechanism). I'll update the KIP and
> PR if you think it is worth it.
>
> 4. That's correct. As the KIP motivates, we have connectors that need to
> do some provisioning/setup when they are deployed (we run connectors for
> internal clients), and when tenants delete a connector, we don't have a
> clear signal that allows us to clean up those resources. The goal is
> probably similar to
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-419%3A+Safely+notify+Kafka+Connect+SourceTask+is+stopped,
> just took a different approach.
>
>
> From: dev@kafka.apache.org At: 11/29/22 15:31:31 UTC-5:00To:
> dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-883: Add delete callback method to Connector API
>
> Hi Hector,
>
> Thanks for the KIP! Here are my initial thoughts:
>
> 1. I like the simplicity of an overloaded stop method, but there is some
> asymmetry between stopping a connector and deleting one. If a connector is
> stopped (for rebalance, to be reconfigured, etc.) and a failure occurs
> then, the failure will be clearly visible in the REST API via, e.g., the
> GET /connectors/{connector}/status endpoint. If a connector is deleted and
> a failure occurs, with the current proposal, users won't have the same
> level of visibility. How can we clearly surface failures caused during the
> "destroy" phase of a connector's lifecycle to users?
>
> 2. I don't think that this new feature should be used to control (delete)
> offsets for connectors. We're addressing that separately in KIP-875, and it
> could be a source of headaches for users if they discover that some
> connectors' offsets persist across deletion/recreation while others do not.
> If an

Re: [DISCUSS] KIP-883: Add delete callback method to Connector API

2022-11-30 Thread Hector Geraldino (BLOOMBERG/ 919 3RD A)
Thanks for your feedback Chris,

1. I think the behavior should remain the same as it is today. The worker stops 
the connector when its configuration is updated, and if the update is a 
deletion, it won't start the connector again. If an error happens during stop() 
today, the statusListener will update the backing store with a FAILED state. 
The only thing that changes on this path is that the Connector#stop() method 
will include an additional boolean parameter, so the connector knows that the 
reason it is being stopped is because of a deletion, and can perform additional 
actions if necessary. 

2. I agree; at first I thought it made sense, but after reading KIP-875 and 
finding out that connectors can use custom offsets topics to store offsets, I 
think this idea needs more refinement. There's probably a way to reuse the work 
proposed by this KIP with the "Automatically delete offsets with connectors" 
feature mentioned on the "Future work" section of KIP-875, and am happy to 
explore it more.

3. I didn't consider that. There is some asymmetry here on how the 
StandaloneHerder handles this (tasks are stopped before the connector is) and 
the DistributedHerder. One option would be not to handle this on the 
#processConnectorConfigUpdates(...) method, but instead wait for the 
RebalanceListener#onRevoked(...) callback, which already stops the revoked 
connectors and tasks synchronously. The idea would be to enhance this to check 
the configState store and, if the configuration of the revoked connector(s) is 
gone, then we can let the connector know about that fact when stopping it (by 
the aforementioned mechanism). I'll update the KIP and PR if you think it is 
worth it.

4. That's correct. As the KIP motivates, we have connectors that need to do 
some provisioning/setup when they are deployed (we run connectors for internal 
clients), and when tenants delete a connector, we don't have a clear signal 
that allows us to clean up those resources. The goal is probably similar to 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-419%3A+Safely+notify+Kafka+Connect+SourceTask+is+stopped,
 just took a different approach.
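
For point 3, a rough Java sketch of the revocation-time check might look like the following. All names here (RevocationSketch, StoppableConnector, and the configuredConnectors set standing in for the config state store) are assumptions for illustration and do not match the actual DistributedHerder or RebalanceListener code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; not the real Connect runtime classes.
class RevocationSketch {

    /** Stand-in for a connector exposing the proposed stop(boolean) overload. */
    interface StoppableConnector {
        void stop(boolean isDeleted);
    }

    /**
     * Stops every revoked connector that is running locally. A connector whose
     * configuration is no longer present is told that it was deleted.
     * Returns the names of the connectors that were stopped as deleted.
     */
    static List<String> onRevoked(Collection<String> revoked,
                                  Set<String> configuredConnectors,
                                  Map<String, StoppableConnector> running) {
        List<String> deleted = new ArrayList<>();
        for (String name : revoked) {
            StoppableConnector connector = running.remove(name);
            if (connector == null) {
                continue; // not running on this worker
            }
            // Config gone from the (assumed) config snapshot means "deleted".
            boolean isDeleted = !configuredConnectors.contains(name);
            connector.stop(isDeleted);
            if (isDeleted) {
                deleted.add(name);
            }
        }
        return deleted;
    }

    public static void main(String[] args) {
        Map<String, StoppableConnector> running = new HashMap<>();
        running.put("kept", d -> System.out.println("kept stopped, deleted=" + d));
        running.put("gone", d -> System.out.println("gone stopped, deleted=" + d));
        List<String> deleted = onRevoked(
                Arrays.asList("kept", "gone"), Collections.singleton("kept"), running);
        System.out.println("deleted: " + deleted);
    }
}
```

Note that this only covers the worker that owns the Connector instance; it does not by itself guarantee that tasks on other workers have already stopped.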


From: dev@kafka.apache.org At: 11/29/22 15:31:31 UTC-5:00To:  
dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-883: Add delete callback method to Connector API

Hi Hector,

Thanks for the KIP! Here are my initial thoughts:

1. I like the simplicity of an overloaded stop method, but there is some
asymmetry between stopping a connector and deleting one. If a connector is
stopped (for rebalance, to be reconfigured, etc.) and a failure occurs
then, the failure will be clearly visible in the REST API via, e.g., the
GET /connectors/{connector}/status endpoint. If a connector is deleted and
a failure occurs, with the current proposal, users won't have the same
level of visibility. How can we clearly surface failures caused during the
"destroy" phase of a connector's lifecycle to users?

2. I don't think that this new feature should be used to control (delete)
offsets for connectors. We're addressing that separately in KIP-875, and it
could be a source of headaches for users if they discover that some
connectors' offsets persist across deletion/recreation while others do not.
If anything, we should explicitly recommend against this kind of logic in
the Javadocs for the newly-introduced method.

3. Is it worth trying to give all of the connector's tasks a chance to shut
down before invoking "stop(true)" on the Connector? If so, any thoughts on
how we can accomplish that?

4. Just to make sure we're on the same page--this feature is not being
proposed so that connectors can try to delete the data that they've
produced (i.e., that sink connectors have written to the external system,
or that source connectors have written to Kafka), right?

Cheers,

Chris

On Thu, Nov 17, 2022 at 5:31 PM Hector Geraldino (BLOOMBERG/ 919 3RD A) <
hgerald...@bloomberg.net> wrote:

> Hi,
>
> I've updated the KIP with the new #stop(boolean isDeleted) overloaded
> method, and have also amended the PR and JIRA tickets. I also added a
> couple entries to the "Rejected alternatives" section with the reasons why
> I pivoted from introducing new callback methods to retrofit the existing
> one.
>
> Please let me know what your thoughts are.
>
> Cheers,
> Hector
>
> From: Hector Geraldino (BLOOMBERG/ 919 3RD A) At: 11/16/22 17:38:59
> UTC-5:00To:  dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-883: Add delete callback method to Connector API
>
> Hi Mickael,
>
> I agree that the new STOPPED state proposed in KIP-875 will improve the
> connector lifecycle. The changes proposed in this KIP aim to cover the gap
> where connectors need to actually be deleted, but because the API doesn't
> provide any hooks, external asse

Re: [DISCUSS] KIP-883: Add delete callback method to Connector API

2022-11-29 Thread Chris Egerton
Hi Hector,

Thanks for the KIP! Here are my initial thoughts:

1. I like the simplicity of an overloaded stop method, but there is some
asymmetry between stopping a connector and deleting one. If a connector is
stopped (for rebalance, to be reconfigured, etc.) and a failure occurs
then, the failure will be clearly visible in the REST API via, e.g., the
GET /connectors/{connector}/status endpoint. If a connector is deleted and
a failure occurs, with the current proposal, users won't have the same
level of visibility. How can we clearly surface failures caused during the
"destroy" phase of a connector's lifecycle to users?

2. I don't think that this new feature should be used to control (delete)
offsets for connectors. We're addressing that separately in KIP-875, and it
could be a source of headaches for users if they discover that some
connectors' offsets persist across deletion/recreation while others do not.
If anything, we should explicitly recommend against this kind of logic in
the Javadocs for the newly-introduced method.

3. Is it worth trying to give all of the connector's tasks a chance to shut
down before invoking "stop(true)" on the Connector? If so, any thoughts on
how we can accomplish that?

4. Just to make sure we're on the same page--this feature is not being
proposed so that connectors can try to delete the data that they've
produced (i.e., that sink connectors have written to the external system,
or that source connectors have written to Kafka), right?

Cheers,

Chris

On Thu, Nov 17, 2022 at 5:31 PM Hector Geraldino (BLOOMBERG/ 919 3RD A) <
hgerald...@bloomberg.net> wrote:

> Hi,
>
> I've updated the KIP with the new #stop(boolean isDeleted) overloaded
> method, and have also amended the PR and JIRA tickets. I also added a
> couple entries to the "Rejected alternatives" section with the reasons why
> I pivoted from introducing new callback methods to retrofit the existing
> one.
>
> Please let me know what your thoughts are.
>
> Cheers,
> Hector
>
> From: Hector Geraldino (BLOOMBERG/ 919 3RD A) At: 11/16/22 17:38:59
> UTC-5:00To:  dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-883: Add delete callback method to Connector API
>
> Hi Mickael,
>
> I agree that the new STOPPED state proposed in KIP-875 will improve the
> connector lifecycle. The changes proposed in this KIP aim to cover the gap
> where connectors need to actually be deleted, but because the API doesn't
> provide any hooks, external assets are left lingering where they shouldn't.
>
> I agree that this proposal is similar to KIP-419, maybe the main
> difference is their focus on Tasks whereas KIP-883 proposes changes to the
> Connector. My goal is to figure out the correct semantics for notifying
> connectors that they're being stopped because the connector has been
> deleted.
>
> Now, computing the "deleted" state in both the Standalone and Distributed
> herders is not hard, so the question is: when shall the connector be
> notified? The "easiest" option would be to do it by calling an overloaded
> Connector#stop(deleted) method, but there are other - more expressive -
> ways, like providing an 'onDelete()' or 'destroy()' method.
>
> I'm leaning towards adding an overload method (less complexity, known
> corner cases), and will amend the KIP with the reasoning behind that
> decision soon.
>
> Thanks for your feedback!
>
> From: dev@kafka.apache.org At: 11/16/22 11:13:17 UTC-5:00To:
> dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-883: Add delete callback method to Connector API
>
> Hi Hector,
>
> Thanks for the KIP.
>
> One tricky aspect is that currently there's no real way to stop a
> connector so to do so people often just delete them temporarily.
> KIP-875 proposes adding a mechanism to properly stop connectors which
> should reduce the need to delete them and avoid doing potentially
> expensive cleanup operations repetitively.
>
> This KIP also reminds me of KIP-419:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-419%3A+Safely+notify+Kafka+Connect+SourceTask+is+stopped.
> Is it guaranteed the new delete callback will be the last method
> called?
>
> Thanks,
> Mickael
>
>
> On Tue, Nov 15, 2022 at 5:40 PM Sagar  wrote:
> >
> > Hey Hector,
> >
> > Thanks for the KIP. I have a minor suggestion in terms of naming. Since
> > this is a callback method, would it make sense to call it onDelete()?
> >
> > Also, the failure scenarios discussed by Greg would need handling. Among
> > other things, I like the idea of having a timeout for graceful shutdown or
> > else try a force shutdown. What do you think

Re: [DISCUSS] KIP-883: Add delete callback method to Connector API

2022-11-17 Thread Hector Geraldino (BLOOMBERG/ 919 3RD A)
Hi,

I've updated the KIP with the new #stop(boolean isDeleted) overloaded method, 
and have also amended the PR and JIRA tickets. I also added a couple entries to 
the "Rejected alternatives" section with the reasons why I pivoted from 
introducing new callback methods to retrofit the existing one.

Please let me know what your thoughts are.

Cheers,
Hector 

From: Hector Geraldino (BLOOMBERG/ 919 3RD A) At: 11/16/22 17:38:59 UTC-5:00To: 
 dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-883: Add delete callback method to Connector API

Hi Mickael,

I agree that the new STOPPED state proposed in KIP-875 will improve the 
connector lifecycle. The changes proposed in this KIP aim to cover the gap 
where connectors need to actually be deleted, but because the API doesn't 
provide any hooks, external assets are left lingering where they shouldn't.

I agree that this proposal is similar to KIP-419, maybe the main difference is 
their focus on Tasks whereas KIP-883 proposes changes to the Connector. My goal 
is to figure out the correct semantics for notifying connectors that they're 
being stopped because the connector has been deleted. 

Now, computing the "deleted" state in both the Standalone and Distributed 
herders is not hard, so the question is: when shall the connector be notified? 
The "easiest" option would be to do it by calling an overloaded 
Connector#stop(deleted) method, but there are other - more expressive - ways, 
like providing an 'onDelete()' or 'destroy()' method. 

I'm leaning towards adding an overload method (less complexity, known corner 
cases), and will amend the KIP with the reasoning behind that decision soon.

Thanks for your feedback! 

From: dev@kafka.apache.org At: 11/16/22 11:13:17 UTC-5:00To:  
dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-883: Add delete callback method to Connector API

Hi Hector,

Thanks for the KIP.

One tricky aspect is that currently there's no real way to stop a
connector so to do so people often just delete them temporarily.
KIP-875 proposes adding a mechanism to properly stop connectors which
should reduce the need to delete them and avoid doing potentially
expensive cleanup operations repetitively.

This KIP also reminds me of KIP-419:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-419%3A+Safely+notify+Kafka+Connect+SourceTask+is+stopped.
Is it guaranteed the new delete callback will be the last method
called?

Thanks,
Mickael


On Tue, Nov 15, 2022 at 5:40 PM Sagar  wrote:
>
> Hey Hector,
>
> Thanks for the KIP. I have a minor suggestion in terms of naming. Since
> this is a callback method, would it make sense to call it onDelete()?
>
> Also, the failure scenarios discussed by Greg would need handling. Among
> other things, I like the idea of having a timeout for graceful shutdown or
> else try a force shutdown. What do you think about that approach?
>
> Thanks!
> Sagar.
>
> On Sat, Nov 12, 2022 at 1:53 AM Hector Geraldino (BLOOMBERG/ 919 3RD A) <
> hgerald...@bloomberg.net> wrote:
>
> > Thanks Greg for taking your time to review not just the KIP but also the
> > PR.
> >
> > 1. You made very valid points regarding the behavior of the destroy()
> > callback for connectors that don't follow the happy path. After thinking
> > about it, I decided to tweak the implementation a bit and have the
> > destroy() method be called during the worker shutdown: this means it will
> > share the same guarantees the connector#stop() method has. An alternative
> > implementation can be to have an overloaded connector#stop(boolean deleted)
> > method that signals a connector that it is being stopped due to deletion,
> > but I think that having a separate destroy() method provides clearer
> > semantics.
> >
> > I'll make sure to amend the KIP with these details.
> >
> > 3. Without going too deep on the types of operations that can be performed
> > by a connector when it's being deleted, I can imagine the
> > org.apache.kafka.connect.source.SourceConnector base class having a default
> > implementation that deletes the connector's offsets automatically
> > (controlled by a property); this is in the context of KIP-875 (first-class
> > offsets support in Kafka Connect). Similar behaviors can be introduced for
> > the SinkConnector, however I'm not sure if this KIP is the right place to
> > discuss all the possibilities, or if we should keep it more
> > narrow-focused on providing a callback mechanism for when connectors are
> > deleted, and what the expectations are around this newly introduced method.
> > What do you think?
> >
> >
> > From: dev@kafka.apache.org At: 11/09/22 16:55:04 UTC-5:00To:
> > dev@kafka.apache.org

Re: [DISCUSS] KIP-883: Add delete callback method to Connector API

2022-11-16 Thread Hector Geraldino (BLOOMBERG/ 919 3RD A)
Hi Mickael,

I agree that the new STOPPED state proposed in KIP-875 will improve the 
connector lifecycle. The changes proposed in this KIP aim to cover the gap 
where connectors need to actually be deleted, but because the API doesn't 
provide any hooks, external assets are left lingering where they shouldn't.

I agree that this proposal is similar to KIP-419, maybe the main difference is 
their focus on Tasks whereas KIP-883 proposes changes to the Connector. My goal 
is to figure out the correct semantics for notifying connectors that they're 
being stopped because the connector has been deleted. 

Now, computing the "deleted" state in both the Standalone and Distributed 
herders is not hard, so the question is: when shall the connector be notified? 
The "easiest" option would be to do it by calling an overloaded 
Connector#stop(deleted) method, but there are other - more expressive - ways, 
like providing an 'onDelete()' or 'destroy()' method. 

I'm leaning towards adding an overload method (less complexity, known corner 
cases), and will amend the KIP with the reasoning behind that decision soon.

Thanks for your feedback! 

From: dev@kafka.apache.org At: 11/16/22 11:13:17 UTC-5:00To:  
dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-883: Add delete callback method to Connector API

Hi Hector,

Thanks for the KIP.

One tricky aspect is that currently there's no real way to stop a
connector so to do so people often just delete them temporarily.
KIP-875 proposes adding a mechanism to properly stop connectors which
should reduce the need to delete them and avoid doing potentially
expensive cleanup operations repetitively.

This KIP also reminds me of KIP-419:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-419%3A+Safely+notify+Kafka+Connect+SourceTask+is+stopped.
Is it guaranteed the new delete callback will be the last method
called?

Thanks,
Mickael


On Tue, Nov 15, 2022 at 5:40 PM Sagar  wrote:
>
> Hey Hector,
>
> Thanks for the KIP. I have a minor suggestion in terms of naming. Since
> this is a callback method, would it make sense to call it onDelete()?
>
> Also, the failure scenarios discussed by Greg would need handling. Among
> other things, I like the idea of having a timeout for graceful shutdown or
> else try a force shutdown. What do you think about that approach?
>
> Thanks!
> Sagar.
>
> On Sat, Nov 12, 2022 at 1:53 AM Hector Geraldino (BLOOMBERG/ 919 3RD A) <
> hgerald...@bloomberg.net> wrote:
>
> > Thanks Greg for taking your time to review not just the KIP but also the
> > PR.
> >
> > 1. You made very valid points regarding the behavior of the destroy()
> > callback for connectors that don't follow the happy path. After thinking
> > about it, I decided to tweak the implementation a bit and have the
> > destroy() method be called during the worker shutdown: this means it will
> > share the same guarantees the connector#stop() method has. An alternative
> > implementation can be to have an overloaded connector#stop(boolean deleted)
> > method that signals a connector that it is being stopped due to deletion,
> > but I think that having a separate destroy() method provides clearer
> > semantics.
> >
> > I'll make sure to amend the KIP with these details.
> >
> > 3. Without going too deep on the types of operations that can be performed
> > by a connector when it's being deleted, I can imagine the
> > org.apache.kafka.connect.source.SourceConnector base class having a default
> > implementation that deletes the connector's offsets automatically
> > (controlled by a property); this is in the context of KIP-875 (first-class
> > offsets support in Kafka Connect). Similar behaviors can be introduced for
> > the SinkConnector, however I'm not sure if this KIP is the right place to
> > discuss all the possibilities, or if we should keep it more
> > narrow-focused on providing a callback mechanism for when connectors are
> > deleted, and what the expectations are around this newly introduced method.
> > What do you think?
> >
> >
> > From: dev@kafka.apache.org At: 11/09/22 16:55:04 UTC-5:00To:
> > dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-883: Add delete callback method to Connector API
> >
> > Hi Hector,
> >
> > Thanks for the KIP!
> >
> > This is certainly missing functionality from the native Connect framework,
> > and we should try to make it possible to inform connectors about this part
> > of their lifecycle.
> > However, as with most functionality that was left out of the initial
> > implementation of the framework, the details are more challenging to work
> > out.
> >
>

Re: [DISCUSS] KIP-883: Add delete callback method to Connector API

2022-11-16 Thread Hector Geraldino (BLOOMBERG/ 919 3RD A)
Hi Sagar,

Thanks for your feedback! I actually renamed the method from "deleted()" to 
"destroyed()", which I think conveys the intention more clearly. I can 
certainly rename it to be 'onDeleted()', although I feel any method named 
onXXX() belongs to a listener class :)

Regarding failure scenarios, an option I'm considering is to just provide an 
overloaded Connector#stop(boolean deleted) method that is called during 
WorkerConnector#doShutdown(). This has the advantage of providing the same 
semantics that the current Connector#stop() has, with the caveat that the API 
won't be as expressive. Also, the extra 'cleanup' bits that were supposed to 
happen when a connector is deleted might not happen at all if the connector 
doesn't stop before the configured timeout (and is therefore cancelled).

At this point I think the simplest option would be to provide an overloaded 
method (with a default implementation) that connectors can override. Wdyt?
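
As a minimal sketch of that option, assuming a simplified stand-in for the Connector base class: SketchConnector, ProvisioningConnector, and LegacyConnector below are hypothetical names, not the real Connect API, and the cleanup step is just a recorded string.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Connector base class; not the real API.
abstract class SketchConnector {

    /** Existing lifecycle hook: release runtime resources. */
    public abstract void stop();

    /**
     * Overload discussed in this thread. The default implementation delegates
     * to stop(), so connectors that do not override it behave as before.
     */
    public void stop(boolean isDeleted) {
        stop();
    }
}

/** A connector that provisions external assets and wants to remove them on delete. */
class ProvisioningConnector extends SketchConnector {
    final List<String> events = new ArrayList<>();

    @Override
    public void stop() {
        events.add("released runtime resources");
    }

    @Override
    public void stop(boolean isDeleted) {
        stop();
        if (isDeleted) {
            // Extra cleanup happens only when the stop is caused by a deletion.
            events.add("removed provisioned external assets");
        }
    }
}

/** An existing connector that knows nothing about the new overload. */
class LegacyConnector extends SketchConnector {
    int stopCalls = 0;

    @Override
    public void stop() {
        stopCalls++;
    }
}

class StopOverloadDemo {
    public static void main(String[] args) {
        ProvisioningConnector connector = new ProvisioningConnector();
        connector.stop(false); // e.g. rebalance or reconfiguration
        connector.stop(true);  // connector was deleted
        System.out.println(connector.events);
    }
}
```

The default implementation is what keeps the change backward compatible in this sketch: LegacyConnector compiles and behaves the same whether the runtime calls stop() or stop(true).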

From: dev@kafka.apache.org At: 11/15/22 11:40:26 UTC-5:00To:  
dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-883: Add delete callback method to Connector API

Hey Hector,

Thanks for the KIP. I have a minor suggestion in terms of naming. Since
this is a callback method, would it make sense to call it onDelete()?

Also, the failure scenarios discussed by Greg would need handling. Among
other things, I like the idea of having a timeout for graceful shutdown or
else try a force shutdown. What do you think about that approach?

Thanks!
Sagar.

On Sat, Nov 12, 2022 at 1:53 AM Hector Geraldino (BLOOMBERG/ 919 3RD A) <
hgerald...@bloomberg.net> wrote:

> Thanks Greg for taking your time to review not just the KIP but also the
> PR.
>
> 1. You made very valid points regarding the behavior of the destroy()
> callback for connectors that don't follow the happy path. After thinking
> about it, I decided to tweak the implementation a bit and have the
> destroy() method be called during the worker shutdown: this means it will
> share the same guarantees the connector#stop() method has. An alternative
> implementation can be to have an overloaded connector#stop(boolean deleted)
> method that signals a connector that it is being stopped due to deletion,
> but I think that having a separate destroy() method provides clearer
> semantics.
>
> I'll make sure to amend the KIP with these details.
>
> 3. Without going too deep on the types of operations that can be performed
> by a connector when it's being deleted, I can imagine the
> org.apache.kafka.connect.source.SourceConnector base class having a default
> implementation that deletes the connector's offsets automatically
> (controlled by a property); this is in the context of KIP-875 (first-class
> offsets support in Kafka Connect). Similar behaviors can be introduced for
> the SinkConnector, however I'm not sure if this KIP is the right place to
> discuss all the possibilities, or if we should keep it more
> narrow-focused on providing a callback mechanism for when connectors are
> deleted, and what the expectations are around this newly introduced method.
> What do you think?
>
>
> From: dev@kafka.apache.org At: 11/09/22 16:55:04 UTC-5:00To:
> dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-883: Add delete callback method to Connector API
>
> Hi Hector,
>
> Thanks for the KIP!
>
> This is certainly missing functionality from the native Connect framework,
> and we should try to make it possible to inform connectors about this part
> of their lifecycle.
> However, as with most functionality that was left out of the initial
> implementation of the framework, the details are more challenging to work
> out.
>
> 1. What happens when the destroy call throws an error, how does the
> framework respond?
>
> This is unspecified in the KIP, and it appears that your proposed changes
> could cause the herder to fail.
> From the perspective of operators & connector developers, what is a
> reasonable expectation to have for failure of a destroy?
> I could see operators wanting both a graceful-delete to make use of this
> new feature, and a force-delete for when the graceful-delete fails.
> A connector developer could choose to swallow all errors encountered, or
> fail-fast to indicate to the operator that there is an issue with the
> graceful-delete flow.
> If the alternative is crashing the herder, connector developers may choose
> to hide serious errors, which is undesirable.
>
> 2. What happens when the destroy() call takes a long time to complete, or
> is interrupted?
>
> It appears that your implementation serially destroy()s each appropriate
> connector, and may prevent the herder thread from making progress while the
> operation is ongoing

Re: [DISCUSS] KIP-883: Add delete callback method to Connector API

2022-11-16 Thread Mickael Maison
Hi Hector,

Thanks for the KIP.

One tricky aspect is that currently there's no real way to stop a
connector so to do so people often just delete them temporarily.
KIP-875 proposes adding a mechanism to properly stop connectors which
should reduce the need to delete them and avoid doing potentially
expensive cleanup operations repetitively.

This KIP also reminds me of KIP-419:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-419%3A+Safely+notify+Kafka+Connect+SourceTask+is+stopped.
Is it guaranteed the new delete callback will be the last method
called?

Thanks,
Mickael


On Tue, Nov 15, 2022 at 5:40 PM Sagar  wrote:
>
> Hey Hector,
>
> Thanks for the KIP. I have a minor suggestion in terms of naming. Since
> this is a callback method, would it make sense to call it onDelete()?
>
> Also, the failure scenarios discussed by Greg would need handling. Among
> other things, I like the idea of having a timeout for graceful shutdown or
> else try a force shutdown. What do you think about that approach?
>
> Thanks!
> Sagar.
>
> On Sat, Nov 12, 2022 at 1:53 AM Hector Geraldino (BLOOMBERG/ 919 3RD A) <
> hgerald...@bloomberg.net> wrote:
>
> > Thanks Greg for taking your time to review not just the KIP but also the
> > PR.
> >
> > 1. You made very valid points regarding the behavior of the destroy()
> > callback for connectors that don't follow the happy path. After thinking
> > about it, I decided to tweak the implementation a bit and have the
> > destroy() method be called during the worker shutdown: this means it will
> > share the same guarantees the connector#stop() method has. An alternative
> > implementation can be to have an overloaded connector#stop(boolean deleted)
> > method that signals a connector that it is being stopped due to deletion,
> > but I think that having a separate destroy() method provides clearer
> > semantics.
> >
> > I'll make sure to amend the KIP with these details.
> >
> > 3. Without going too deep on the types of operations that can be performed
> > by a connector when it's being deleted, I can imagine the
> > org.apache.kafka.connect.source.SourceConnector base class having a default
> > implementation that deletes the connector's offsets automatically
> > (controlled by a property); this is in the context of KIP-875 (first-class
> > offsets support in Kafka Connect). Similar behaviors can be introduced for
> > the SinkConnector, however I'm not sure if this KIP is the right place to
> > discuss all the possibilities, or if we should keep it more
> > narrow-focused on providing a callback mechanism for when connectors are
> > deleted, and what the expectations are around this newly introduced method.
> > What do you think?
> >
> >
> > From: dev@kafka.apache.org At: 11/09/22 16:55:04 UTC-5:00To:
> > dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-883: Add delete callback method to Connector API
> >
> > Hi Hector,
> >
> > Thanks for the KIP!
> >
> > This is certainly missing functionality from the native Connect framework,
> > and we should try to make it possible to inform connectors about this part
> > of their lifecycle.
> > However, as with most functionality that was left out of the initial
> > implementation of the framework, the details are more challenging to work
> > out.
> >
> > 1. What happens when the destroy call throws an error, how does the
> > framework respond?
> >
> > This is unspecified in the KIP, and it appears that your proposed changes
> > could cause the herder to fail.
> > From the perspective of operators & connector developers, what is a
> > reasonable expectation to have for failure of a destroy?
> > I could see operators wanting both a graceful-delete to make use of this
> > new feature, and a force-delete for when the graceful-delete fails.
> > A connector developer could choose to swallow all errors encountered, or
> > fail-fast to indicate to the operator that there is an issue with the
> > graceful-delete flow.
> > If the alternative is crashing the herder, connector developers may choose
> > to hide serious errors, which is undesirable.
> >
> > 2. What happens when the destroy() call takes a long time to complete, or
> > is interrupted?
> >
> > It appears that your implementation serially destroy()s each appropriate
> > connector, and may prevent the herder thread from making progress while the
> > operation is ongoing.
> > We have previously had to patch Connect to perform all connector and task
> > operations on a background thread, because some con

Re: [DISCUSS] KIP-883: Add delete callback method to Connector API

2022-11-15 Thread Sagar
Hey Hector,

Thanks for the KIP. I have a minor suggestion in terms of naming. Since
this is a callback method, would it make sense to call it onDelete()?

Also, the failure scenarios discussed by Greg would need handling. Among
other things, I like the idea of having a timeout for graceful shutdown or
else try a force shutdown. What do you think about that approach?

Thanks!
Sagar.

On Sat, Nov 12, 2022 at 1:53 AM Hector Geraldino (BLOOMBERG/ 919 3RD A) <
hgerald...@bloomberg.net> wrote:

> Thanks Greg for taking your time to review not just the KIP but also the
> PR.
>
> 1. You made very valid points regarding the behavior of the destroy()
> callback for connectors that don't follow the happy path. After thinking
> about it, I decided to tweak the implementation a bit and have the
> destroy() method be called during the worker shutdown: this means it will
> share the same guarantees the connector#stop() method has. An alternative
> implementation can be to have an overloaded connector#stop(boolean deleted)
> method that signals a connector that it is being stopped due to deletion,
> but I think that having a separate destroy() method provides clearer
> semantics.
>
> I'll make sure to amend the KIP with these details.
>
> 3. Without going too deep on the types of operations that can be performed
> by a connector when it's being deleted, I can imagine the
> org.apache.kafka.connect.source.SourceConnector base class having a default
> implementation that deletes the connector's offsets automatically
> (controlled by a property); this is in the context of KIP-875 (first-class
> offsets support in Kafka Connect). Similar behaviors can be introduced for
> the SinkConnector, however I'm not sure if this KIP is the right place to
> discuss all the possibilities, or if we should keep it more
> narrowly focused on providing a callback mechanism for when connectors are
> deleted, and what the expectations are around this newly introduced method.
> What do you think?
>
>
> From: dev@kafka.apache.org At: 11/09/22 16:55:04 UTC-5:00 To:
> dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-883: Add delete callback method to Connector API
>
> Hi Hector,
>
> Thanks for the KIP!
>
> This is certainly missing functionality from the native Connect framework,
> and we should try to make it possible to inform connectors about this part
> of their lifecycle.
> However, as with most functionality that was left out of the initial
> implementation of the framework, the details are more challenging to work
> out.
>
> 1. What happens when the destroy call throws an error, how does the
> framework respond?
>
> This is unspecified in the KIP, and it appears that your proposed changes
> could cause the herder to fail.
> From the perspective of operators & connector developers, what is a
> reasonable expectation to have for failure of a destroy?
> I could see operators wanting both a graceful-delete to make use of this
> new feature, and a force-delete for when the graceful-delete fails.
> A connector developer could choose to swallow all errors encountered, or
> fail-fast to indicate to the operator that there is an issue with the
> graceful-delete flow.
> If the alternative is crashing the herder, connector developers may choose
> to hide serious errors, which is undesirable.
>
> 2. What happens when the destroy() call takes a long time to complete, or
> is interrupted?
>
> It appears that your implementation serially destroy()s each appropriate
> connector, and may prevent the herder thread from making progress while the
> operation is ongoing.
> We have previously had to patch Connect to perform all connector and task
> operations on a background thread, because some connector method
> implementations can stall indefinitely.
> Connect also has the notion of "cancelling" a connector/task if a graceful
> shutdown timeout operation takes too long. Perhaps some of that design or
> machinery may be useful to protect this method call as well.
>
> More specific to the destroy() call itself, what happens when a connector
> completes part of a destroy operation and then cannot complete the
> remainder, either due to timing out or a worker crashing?
> What is the contract with the connector developer about this method? Is the
> destroy() only started exactly once during the lifetime of the connector,
> or may it be retried?
>
> 3. What should be considered a reasonable custom implementation of the
> destroy() call? What resources should it clean up by default?
>
> I think we can broadly categorize the state a connector mutates among the
> following
> * Framework-managed state (e.g. source offsets, consumer offsets)
> * Implementation detail 

Re: [DISCUSS] KIP-883: Add delete callback method to Connector API

2022-11-11 Thread Hector Geraldino (BLOOMBERG/ 919 3RD A)
Thanks Greg for taking your time to review not just the KIP but also the PR. 

1. You made very valid points regarding the behavior of the destroy() callback 
for connectors that don't follow the happy path. After thinking about it, I 
decided to tweak the implementation a bit and have the destroy() method be 
called during the worker shutdown: this means it will share the same guarantees 
the connector#stop() method has. An alternative implementation can be to have 
an overloaded connector#stop(boolean deleted) method that signals a connector 
that it is being stopped due to deletion, but I think that having a separate 
destroy() method provides clearer semantics.
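
To make the two alternatives above concrete, here is a minimal sketch of both API shapes side by side. SketchConnector is a hypothetical stand-in for org.apache.kafka.connect.connector.Connector, not the real class, and the call order in deleteConnector() is an assumption for illustration rather than what the KIP or the PR specifies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the Connector base class; only the lifecycle
// methods discussed in this thread are modeled.
abstract class SketchConnector {
    abstract void start(Map<String, String> props);

    abstract void stop();

    // Option A: a separate callback. The default is a no-op, so connectors
    // written against the old API keep working unchanged.
    void destroy() { }

    // Option B: an overloaded stop. The default ignores the flag and
    // delegates to the existing stop().
    void stop(boolean deleted) {
        stop();
    }
}

// Example connector that opts in to option A and records what was called.
class TempTableConnector extends SketchConnector {
    final List<String> events = new ArrayList<>();

    @Override
    void start(Map<String, String> props) {
        events.add("start");
    }

    @Override
    void stop() {
        events.add("stop");
    }

    @Override
    void destroy() {
        events.add("destroy");
    }
}

class LifecycleDemo {
    // Assumed worker-side sequence under option A: stop first, then destroy.
    static List<String> deleteConnector(TempTableConnector connector) {
        connector.stop();
        connector.destroy();
        return connector.events;
    }

    public static void main(String[] args) {
        TempTableConnector connector = new TempTableConnector();
        connector.start(Collections.emptyMap());
        System.out.println(deleteConnector(connector)); // prints [start, stop, destroy]
    }
}
```

With option B the connector would branch on the flag inside stop(boolean), which keeps a single entry point but mixes shutdown and cleanup logic in one method.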

I'll make sure to amend the KIP with these details.

3. Without going too deep on the types of operations that can be performed by a 
connector when it's being deleted, I can imagine the 
org.apache.kafka.connect.source.SourceConnector base class having a default 
implementation that deletes the connector's offsets automatically (controlled 
by a property); this is in the context of KIP-875 (first-class offsets support 
in Kafka Connect). Similar behaviors can be introduced for the SinkConnector, 
however I'm not sure if this KIP is the right place to discuss all the 
possibilities, or if we should keep it more narrowly focused on providing a 
callback mechanism for when connectors are deleted, and what the expectations 
are around this newly introduced method. What do you think?


From: dev@kafka.apache.org At: 11/09/22 16:55:04 UTC-5:00 To:
dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-883: Add delete callback method to Connector API

Hi Hector,

Thanks for the KIP!

This is certainly missing functionality from the native Connect framework,
and we should try to make it possible to inform connectors about this part
of their lifecycle.
However, as with most functionality that was left out of the initial
implementation of the framework, the details are more challenging to work
out.

1. What happens when the destroy call throws an error, how does the
framework respond?

This is unspecified in the KIP, and it appears that your proposed changes
could cause the herder to fail.
From the perspective of operators & connector developers, what is a
reasonable expectation to have for failure of a destroy?
I could see operators wanting both a graceful-delete to make use of this
new feature, and a force-delete for when the graceful-delete fails.
A connector developer could choose to swallow all errors encountered, or
fail-fast to indicate to the operator that there is an issue with the
graceful-delete flow.
If the alternative is crashing the herder, connector developers may choose
to hide serious errors, which is undesirable.

2. What happens when the destroy() call takes a long time to complete, or
is interrupted?

It appears that your implementation serially destroy()s each appropriate
connector, and may prevent the herder thread from making progress while the
operation is ongoing.
We have previously had to patch Connect to perform all connector and task
operations on a background thread, because some connector method
implementations can stall indefinitely.
Connect also has the notion of "cancelling" a connector/task if a graceful
shutdown timeout operation takes too long. Perhaps some of that design or
machinery may be useful to protect this method call as well.

More specific to the destroy() call itself, what happens when a connector
completes part of a destroy operation and then cannot complete the
remainder, either due to timing out or a worker crashing?
What is the contract with the connector developer about this method? Is the
destroy() only started exactly once during the lifetime of the connector,
or may it be retried?

3. What should be considered a reasonable custom implementation of the
destroy() call? What resources should it clean up by default?

I think we can broadly categorize the state a connector mutates among the
following
* Framework-managed state (e.g. source offsets, consumer offsets)
* Implementation detail state (e.g. debezium db history topic, audit
tables, temporary accounts)
* Third party system data (e.g. the actual data being written by a sink
connector)
* Third party system metadata (e.g. tables in a database, delivery
receipts, permissions)

I think it's apparent that the framework-managed state cannot/should not be
interacted with by the destroy() call. However, the framework could be
changed to clean up these resources at the same time that destroy() is
called. Is that out-of-scope of this proposal, and better handled by manual
intervention?
From the text of the KIP, I think it explicitly includes the Implementation
detail state, which should not be depended on externally and should be safe
to clean up during a destroy(). I think this is completely reasonable.
Are the third-party data and metadata out-of-scope for this proposal? Can
we officially recommend against it, 

Re: [DISCUSS] KIP-883: Add delete callback method to Connector API

2022-11-09 Thread Greg Harris
Hi Hector,

Thanks for the KIP!

This is certainly missing functionality from the native Connect framework,
and we should try to make it possible to inform connectors about this part
of their lifecycle.
However, as with most functionality that was left out of the initial
implementation of the framework, the details are more challenging to work
out.

1. What happens when the destroy call throws an error, how does the
framework respond?

This is unspecified in the KIP, and it appears that your proposed changes
could cause the herder to fail.
From the perspective of operators & connector developers, what is a
reasonable expectation to have for failure of a destroy?
I could see operators wanting both a graceful-delete to make use of this
new feature, and a force-delete for when the graceful-delete fails.
A connector developer could choose to swallow all errors encountered, or
fail-fast to indicate to the operator that there is an issue with the
graceful-delete flow.
If the alternative is crashing the herder, connector developers may choose
to hide serious errors, which is undesirable.
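
One way to picture the fail-fast option without crashing the herder is for the framework to catch whatever destroy() throws and record it as a status. The sketch below is only an illustration: DestroyErrorHandling, its State enum, and the in-memory maps are hypothetical names, standing in for whatever the worker and status store would actually do.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class DestroyErrorHandling {
    enum State { DESTROYED, FAILED }

    // Hypothetical in-memory status store, keyed by connector name.
    static final Map<String, State> status = new ConcurrentHashMap<>();
    static final Map<String, String> errors = new ConcurrentHashMap<>();

    // Runs the connector's cleanup and converts any runtime failure into a
    // FAILED status instead of letting it propagate to the calling thread.
    static State safeDestroy(String connectorName, Runnable destroy) {
        try {
            destroy.run();
            status.put(connectorName, State.DESTROYED);
        } catch (RuntimeException e) {
            status.put(connectorName, State.FAILED);
            errors.put(connectorName, String.valueOf(e.getMessage()));
        }
        return status.get(connectorName);
    }

    public static void main(String[] args) {
        System.out.println(safeDestroy("ok-connector", () -> { }));
        System.out.println(safeDestroy("bad-connector", () -> {
            throw new IllegalStateException("could not drop temp table");
        }));
    }
}
```

An operator could then see the FAILED state and decide whether to retry the graceful delete or fall back to a force-delete.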

2. What happens when the destroy() call takes a long time to complete, or
is interrupted?

It appears that your implementation serially destroy()s each appropriate
connector, and may prevent the herder thread from making progress while the
operation is ongoing.
We have previously had to patch Connect to perform all connector and task
operations on a background thread, because some connector method
implementations can stall indefinitely.
Connect also has the notion of "cancelling" a connector/task if a graceful
shutdown timeout operation takes too long. Perhaps some of that design or
machinery may be useful to protect this method call as well.

More specific to the destroy() call itself, what happens when a connector
completes part of a destroy operation and then cannot complete the
remainder, either due to timing out or a worker crashing?
What is the contract with the connector developer about this method? Is the
destroy() only started exactly once during the lifetime of the connector,
or may it be retried?
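
As a rough sketch of the background-thread-plus-timeout idea, the snippet below runs a cleanup callback on its own thread and gives up after a deadline. It uses only java.util.concurrent; DestroyWithTimeout and Outcome are hypothetical names, and this is not how Connect's existing cancellation machinery is implemented.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class DestroyWithTimeout {
    enum Outcome { COMPLETED, TIMED_OUT, FAILED, INTERRUPTED }

    // Runs destroy on a dedicated daemon thread so the caller (for example a
    // herder thread) is blocked for at most timeoutMs milliseconds.
    static Outcome destroyWithTimeout(Runnable destroy, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "connector-destroy");
            t.setDaemon(true);
            return t;
        });
        Future<?> future = executor.submit(destroy);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return Outcome.COMPLETED;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the stalled cleanup
            return Outcome.TIMED_OUT;
        } catch (ExecutionException e) {
            return Outcome.FAILED; // destroy itself threw
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt(); // preserve the caller's interrupt
            return Outcome.INTERRUPTED;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

A timeout only bounds how long the caller waits. It does not answer the partial-completion question above, so a cleanup that may be cut short or retried would still need to be idempotent.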

3. What should be considered a reasonable custom implementation of the
destroy() call? What resources should it clean up by default?

I think we can broadly categorize the state a connector mutates among the
following
* Framework-managed state (e.g. source offsets, consumer offsets)
* Implementation detail state (e.g. debezium db history topic, audit
tables, temporary accounts)
* Third party system data (e.g. the actual data being written by a sink
connector)
* Third party system metadata (e.g. tables in a database, delivery
receipts, permissions)

I think it's apparent that the framework-managed state cannot/should not be
interacted with by the destroy() call. However, the framework could be
changed to clean up these resources at the same time that destroy() is
called. Is that out-of-scope of this proposal, and better handled by manual
intervention?
From the text of the KIP, I think it explicitly includes the Implementation
detail state, which should not be depended on externally and should be safe
to clean up during a destroy(). I think this is completely reasonable.
Are the third-party data and metadata out-of-scope for this proposal? Can
we officially recommend against it, or should we accommodate users and
connector developers that wish to clean up data/metadata during destroy()?

4. How should connector implementations of destroy handle backwards
compatibility?

In terms of backward-compatibility for the framework vs connector versions,
I think the default-noop method is very reasonable.
However, what happens when someone upgrades from a version of a connector
without a destroy() implementation to one with an implementation, and
needs to maintain backwards compatibility?
To replicate the same behavior, the connector might include something like
an `enable.cleanup` config which allows users to opt-in to the new
behavior. This could mean the proliferation of many different
configurations to handle this behavior.
Maybe we can provide some recommendations to developers, or some mechanism
to standardize this opt-in behavior.
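
A minimal sketch of that opt-in pattern, assuming the `enable.cleanup` name from above and a default of false so that upgrading a connector does not change its behavior on delete. OptInCleanupConnector and its methods are hypothetical and do not extend the real Connector class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OptInCleanupConnector {
    // Config name taken from the discussion; the "false" default is an assumption.
    static final String ENABLE_CLEANUP = "enable.cleanup";

    private boolean cleanupEnabled;
    final List<String> actions = new ArrayList<>();

    void start(Map<String, String> props) {
        cleanupEnabled = Boolean.parseBoolean(props.getOrDefault(ENABLE_CLEANUP, "false"));
    }

    // Cleans up only when the user has explicitly opted in.
    void destroy() {
        if (!cleanupEnabled) {
            return; // same behavior as a connector version without destroy()
        }
        actions.add("dropped staging table");
    }

    public static void main(String[] args) {
        OptInCleanupConnector legacy = new OptInCleanupConnector();
        legacy.start(new HashMap<>());
        legacy.destroy();
        System.out.println(legacy.actions); // prints []

        Map<String, String> props = new HashMap<>();
        props.put(ENABLE_CLEANUP, "true");
        OptInCleanupConnector optedIn = new OptInCleanupConnector();
        optedIn.start(props);
        optedIn.destroy();
        System.out.println(optedIn.actions); // prints [dropped staging table]
    }
}
```

If every connector invents its own flag like this, the names and defaults will diverge, which is the proliferation concern raised above.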

I'm interested to hear if you have any experience with the above, if you've
experimented with this feature in your fork.

Thanks,
Greg


On Thu, Nov 3, 2022 at 11:55 AM Hector Geraldino (BLOOMBERG/ 919 3RD A) <
hgerald...@bloomberg.net> wrote:

> Hi everyone,
>
> I've submitted KIP-883, which introduces a callback to the public
> Connector API called when deleting a connector:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-883%3A+Add+delete+callback+method+to+Connector+API
>
> It adds a new `deleted()` method (open to better naming suggestions) to
> the org.apache.kafka.connect.connector.Connector abstract class, which will
> be invoked by connect Workers when a connector is being deleted.
>
> Feedback and comments are welcome.
>
> Thank you!
> Hector
>
>


[DISCUSS] KIP-883: Add delete callback method to Connector API

2022-11-03 Thread Hector Geraldino (BLOOMBERG/ 919 3RD A)
Hi everyone,

I've submitted KIP-883, which introduces a callback to the public Connector API 
called when deleting a connector:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-883%3A+Add+delete+callback+method+to+Connector+API

It adds a new `deleted()` method (open to better naming suggestions) to the 
org.apache.kafka.connect.connector.Connector abstract class, which will be 
invoked by connect Workers when a connector is being deleted. 

Feedback and comments are welcome.

Thank you!
Hector