Yash Mayya created KAFKA-15113:
----------------------------------

             Summary: Gracefully handle cases where a sink connector's admin 
and consumer client config overrides target different Kafka clusters
                 Key: KAFKA-15113
                 URL: https://issues.apache.org/jira/browse/KAFKA-15113
             Project: Kafka
          Issue Type: Task
          Components: KafkaConnect
            Reporter: Yash Mayya


Background reading:
 * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]
 * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]

From [https://github.com/apache/kafka/pull/13434#discussion_r1144415671] -
{quote}Currently, admin clients are only instantiated for sink connectors to 
create the DLQ topic if required. So it seems like it could be technically 
possible for a sink connector's consumer client overrides to target a different 
Kafka cluster from its producer and admin client overrides. Such a setup won't 
work with this implementation of the get offsets API as it is using an admin 
client to get a sink connector's consumer group offsets. However, I'm not sure 
we want to use a consumer client to retrieve the offsets either as we shouldn't 
be disrupting the existing sink tasks' consumer group just to fetch offsets. 
Leveraging a sink task's consumer also isn't an option because fetching offsets 
for a stopped sink connector (where all the tasks will be stopped) should be 
allowed. I'm wondering if we should document that a connector's various client 
config overrides shouldn't target different Kafka clusters (side note - looks 
like we don't [currently document|https://kafka.apache.org/documentation/#connect] 
client config overrides for Connect beyond just the worker property 
{{connector.client.config.override.policy}}).
{quote}
 
{quote}I don't think we need to worry too much about this. I cannot imagine a 
sane use case that involves overriding a connector's Kafka clients with 
different Kafka clusters (not just bootstrap servers, but actually different 
clusters) for producer/consumer/admin. I'd be fine with adding a note to our 
docs that that kind of setup isn't supported but I really, really hope that 
it's not necessary and nobody's trying to do that in the first place. I also 
suspect that there are other places where this might cause issues, like with 
exactly-once source support or automatic topic creation for source connectors.

That said, there is a different case we may want to consider: someone may have 
configured consumer overrides for a sink connector, but not admin overrides. 
This may happen if they don't use a DLQ topic. I don't know if we absolutely 
need to handle this now, and we may consider filing a follow-up ticket to look 
into it, but one quick-and-dirty thought I've had is to configure the admin 
client used here with a combination of the configurations for the connector's 
admin client and its consumer, giving precedence to the latter.
{quote}
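
For concreteness, here is a minimal sketch of the quick-and-dirty idea quoted above: layer the connector's consumer overrides over its admin overrides when building the admin client used for offset operations, so that a consumer-only override still points the admin client at the right cluster. The class, method, and config maps are hypothetical stand-ins for configs the worker has already resolved; this is not the worker's actual code path.

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;

public class SinkOffsetAdminFactory {
    // adminConfigs: resolved admin configs for the connector (overrides may be absent)
    // consumerConfigs: resolved consumer configs for the connector
    public static Admin adminForSinkOffsets(Map<String, Object> adminConfigs,
                                            Map<String, Object> consumerConfigs) {
        Map<String, Object> merged = new HashMap<>(adminConfigs);
        // Consumer configs win on conflicts ("giving precedence to the latter"),
        // so bootstrap.servers and security settings follow the consumer's cluster
        merged.putAll(consumerConfigs);
        return Admin.create(merged);
    }
}
{code}

One wrinkle with this approach: consumer-only settings (e.g. {{max.poll.records}}) would be passed to the admin client too, which typically just produces "unknown config" warnings in the logs, but some filtering might still be desirable.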
 

Also from [https://github.com/apache/kafka/pull/13818#discussion_r1224138055] -
{quote}We will have undesirable behavior if the connector is targeting a Kafka 
cluster different from the Connect cluster's backing Kafka cluster and the user 
has configured the consumer overrides appropriately for their connector, but 
not the admin overrides (something we also discussed previously 
[here|https://github.com/apache/kafka/pull/13434#discussion_r1144415671]).

In the above case, if a user attempts to reset their sink connector's offsets 
via the {{DELETE /connectors/\{connector}/offsets}} endpoint, the following 
will occur:
 # We list the consumer group offsets via {{Admin::listConsumerGroupOffsets}}, 
which returns an empty partition offsets map for the sink connector's consumer 
group ID (the group exists on a different Kafka cluster from the one that the 
admin client connects to).
 # We call {{SinkConnector::alterOffsets}} with an empty offsets map, which 
could cause the sink connector to propagate offset-reset changes to the sink 
system.
 # We attempt to delete the consumer group via {{Admin::deleteConsumerGroups}}, 
which fails with {{GroupIdNotFoundException}}; we essentially swallow this 
error in order to keep offset reset operations idempotent, and return a success 
message to the user (even though the sink connector's real consumer group on 
the other Kafka cluster hasn't been deleted).

This will occur if the connector's admin overrides are missing OR the admin 
overrides are deliberately configured to target a Kafka cluster different from 
the consumer overrides (although like you pointed out in the other linked 
thread, this doesn't seem like a valid use case that we'd even want to support).

I guess we'd want to pursue the approach you suggested where we'd configure the 
admin client with a combination of the connector's admin overrides and consumer 
overrides?

Another option could be to verify that the admin client's resolved bootstrap 
servers ({{admin.override.bootstrap.servers}} in the connector's config, else 
{{admin.bootstrap.servers}} in the worker config, else {{bootstrap.servers}} in 
the worker config) correspond to the same Kafka cluster as the consumer's 
resolved bootstrap servers ({{consumer.override.bootstrap.servers}} in the 
connector's config, else {{consumer.bootstrap.servers}} in the worker config, 
else {{bootstrap.servers}} in the worker config), and fail the request if we 
can reliably determine that they aren't pointing to the same cluster. I'm not 
sure that this is a feasible approach, however.

Yet another option could be to remove the idempotency guarantee from the 
{{DELETE /connectors/\{connector}/offsets}} endpoint: if we encounter a 
{{GroupIdNotFoundException}} from {{Admin::deleteConsumerGroups}}, return an 
error message to the user indicating that either the offsets have already been 
reset previously or that they might need to check their connector's admin 
overrides (this does seem fairly ugly, though).

Edit: A more elegant way might be to switch the offset reset mechanism from 
deleting the consumer group to deleting the offsets for all topic partitions 
via {{Admin::deleteConsumerGroupOffsets}} (similar to what we do for the 
{{PATCH /connectors/\{connector}/offsets}} endpoint when the offset for a 
partition is specified as {{null}}). This way we could explicitly check for the 
existence of the sink connector's consumer group prior to listing its offsets, 
and fail requests if the consumer group doesn't exist (the minor downside is 
that this requires an additional admin client request).
{quote}
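
To make the three-step failure sequence in the quote above concrete, the sketch below condenses it into one hypothetical method. The class name, parameters, and control flow are illustrative and not the worker's actual code path; only the Admin and {{SinkConnector}} calls are real APIs.

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.connect.sink.SinkConnector;

public class MisdirectedResetSketch {
    public static void resetAgainstWrongCluster(Admin admin, SinkConnector connector,
                                                Map<String, String> connectorConfig,
                                                String groupId)
            throws ExecutionException, InterruptedException {
        // 1. Empty map: the group lives on a cluster other than the one the
        //    (consumer-override-less) admin client is pointed at
        Map<TopicPartition, OffsetAndMetadata> offsets =
                admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();

        // 2. The connector sees an empty "reset" and may propagate it to the sink system
        connector.alterOffsets(connectorConfig, offsets);

        // 3. The group doesn't exist on the targeted cluster; the exception is
        //    swallowed to keep resets idempotent, so the request "succeeds" anyway
        try {
            admin.deleteConsumerGroups(Collections.singleton(groupId)).all().get();
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof GroupIdNotFoundException)) {
                throw e;
            }
        }
    }
}
{code}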
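One way the bootstrap-server verification idea might be attempted is to compare cluster IDs via {{Admin::describeCluster}} rather than comparing server lists textually, since two different bootstrap lists can still point at the same cluster. A sketch, assuming the two maps stand in for the resolved admin and consumer client configs (connector override, else worker client config, else worker {{bootstrap.servers}}):

{code:java}
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;

public class ClusterIdCheck {
    public static boolean sameCluster(Map<String, Object> resolvedAdminConfigs,
                                      Map<String, Object> resolvedConsumerConfigs)
            throws ExecutionException, InterruptedException {
        try (Admin adminView = Admin.create(resolvedAdminConfigs);
             Admin consumerView = Admin.create(resolvedConsumerConfigs)) {
            String adminClusterId = adminView.describeCluster().clusterId().get();
            String consumerClusterId = consumerView.describeCluster().clusterId().get();
            // clusterId() can resolve to null (e.g. very old brokers); in that
            // case we can't reliably decide and shouldn't fail the request
            if (adminClusterId == null || consumerClusterId == null) {
                return true;
            }
            return adminClusterId.equals(consumerClusterId);
        }
    }
}
{code}

This still costs two extra round trips per request (and passes consumer-only configs to an admin client), which may be part of why the approach doesn't feel feasible.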
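And a minimal sketch of the suggestion in the edit above: check group existence, then delete per-partition offsets instead of the whole group, mirroring the {{PATCH}} endpoint's {{null}}-offset handling. The existence check here uses {{Admin::listConsumerGroups}} (the additional admin request mentioned above); the exception type and wording are illustrative.

{code:java}
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;

public class ResetViaOffsetDeletion {
    public static void resetOffsets(Admin admin, String groupId)
            throws ExecutionException, InterruptedException {
        // Explicit existence check: fail the request instead of silently "succeeding"
        boolean exists = admin.listConsumerGroups().all().get().stream()
                .anyMatch(listing -> listing.groupId().equals(groupId));
        if (!exists) {
            throw new ConnectException("Consumer group " + groupId + " was not found; "
                    + "check the connector's admin client overrides");
        }

        Map<TopicPartition, OffsetAndMetadata> offsets =
                admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
        if (!offsets.isEmpty()) {
            // Deletes committed offsets per partition instead of the whole group,
            // like PATCH /connectors/{connector}/offsets with null offsets
            admin.deleteConsumerGroupOffsets(groupId, offsets.keySet()).all().get();
        }
    }
}
{code}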


