Yash Mayya created KAFKA-15113:
----------------------------------

             Summary: Gracefully handle cases where a sink connector's admin and consumer client config overrides target different Kafka clusters
                 Key: KAFKA-15113
                 URL: https://issues.apache.org/jira/browse/KAFKA-15113
             Project: Kafka
          Issue Type: Task
          Components: KafkaConnect
            Reporter: Yash Mayya
Background reading -
 * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]
 * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]

From [https://github.com/apache/kafka/pull/13434#discussion_r1144415671] -

{quote}Currently, admin clients are only instantiated for sink connectors to create the DLQ topic if required. So it seems like it could be technically possible for a sink connector's consumer client overrides to target a different Kafka cluster from its producer and admin client overrides. Such a setup won't work with this implementation of the get offsets API, as it is using an admin client to get a sink connector's consumer group offsets. However, I'm not sure we want to use a consumer client to retrieve the offsets either, as we shouldn't be disrupting the existing sink tasks' consumer group just to fetch offsets. Leveraging a sink task's consumer also isn't an option because fetching offsets for a stopped sink connector (where all the tasks will be stopped) should be allowed. I'm wondering if we should document that a connector's various client config override policies shouldn't target different Kafka clusters (side note - it looks like we don't [currently document|https://kafka.apache.org/documentation/#connect] client config overrides for Connect beyond just the worker property {{connector.client.config.override.policy}}).
{quote}

{quote}I don't think we need to worry too much about this. I cannot imagine a sane use case that involves overriding a connector's Kafka clients with different Kafka clusters (not just bootstrap servers, but actually different clusters) for producer/consumer/admin. I'd be fine with adding a note to our docs that that kind of setup isn't supported, but I really, really hope that it's not necessary and nobody's trying to do that in the first place.
I also suspect that there are other places where this might cause issues, like with exactly-once source support or automatic topic creation for source connectors. That said, there is a different case we may want to consider: someone may have configured consumer overrides for a sink connector, but not admin overrides. This may happen if they don't use a DLQ topic. I don't know if we absolutely need to handle this now, and we may consider filing a follow-up ticket to look into it, but one quick-and-dirty thought I've had is to configure the admin client used here with a combination of the configurations for the connector's admin client and its consumer, giving precedence to the latter.
{quote}

Also from [https://github.com/apache/kafka/pull/13818#discussion_r1224138055] -

{quote}We will have undesirable behavior if the connector is targeting a Kafka cluster different from the Connect cluster's backing Kafka cluster and the user has configured the consumer overrides appropriately for their connector, but not the admin overrides (something we also discussed previously [here|https://github.com/apache/kafka/pull/13434#discussion_r1144415671]).

In the above case, if a user attempts to reset their sink connector's offsets via the {{DELETE /connectors/\{connector}/offsets}} endpoint, the following will occur:
 # We list the consumer group offsets via {{Admin::listConsumerGroupOffsets}}, which returns an empty partition offsets map for the sink connector's consumer group ID (the group exists on a different Kafka cluster from the one that the admin client is connecting to).
 # We call {{SinkConnector::alterOffsets}} with an empty offsets map, which could cause the sink connector to propagate offset-reset-related changes to the sink system.
 # We attempt to delete the consumer group via {{Admin::deleteConsumerGroups}}, which returns a {{GroupIdNotFoundException}} that we essentially swallow in order to keep offsets reset operations idempotent, and we return a success message to the user (even though the real consumer group for the sink connector on the other Kafka cluster hasn't been deleted).

This will occur if the connector's admin overrides are missing OR the admin overrides are deliberately configured to target a Kafka cluster different from the consumer overrides (although, like you pointed out in the other linked thread, this doesn't seem like a valid use case that we'd even want to support).

I guess we'd want to pursue the approach you suggested where we'd configure the admin client with a combination of the connector's admin overrides and consumer overrides?

Another option could potentially be to somehow verify that the {{admin.override.bootstrap.servers}} in the connector's config / {{admin.bootstrap.servers}} in the worker config / {{bootstrap.servers}} in the worker config (in order of preference) correspond to the same Kafka cluster as {{consumer.override.bootstrap.servers}} in the connector's config / {{consumer.bootstrap.servers}} in the worker config / {{bootstrap.servers}} in the worker config (in order of preference), and fail the request if we can reliably determine that they aren't pointing to the same Kafka cluster. I'm not sure that this is a feasible approach, however.

Yet another option could be to remove the idempotency guarantee from the {{DELETE /connectors/\{connector}/offsets}} endpoint: if we encounter a {{GroupIdNotFoundException}} from {{Admin::deleteConsumerGroups}}, return an error message to the user indicating that either the offsets have already been reset previously or else they might need to check their connector's admin overrides (this does seem fairly ugly, though).
Edit: A more elegant way might be to switch the offset reset mechanism from deleting the consumer group to deleting the offsets for all topic partitions via {{Admin::deleteConsumerGroupOffsets}} (similar to what we do for the {{PATCH /connectors/\{connector}/offsets}} endpoint when the offset for a partition is specified as {{null}}). This way we could explicitly check for the existence of the sink connector's consumer group prior to listing its offsets and fail requests if the consumer group doesn't exist (the minor downside is that this will require an additional admin client request).
{quote}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
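The quick-and-dirty idea quoted above - build the admin client config for offset operations by layering the connector's consumer overrides on top of its admin overrides, so the admin client follows the consumer's cluster even when no admin overrides were provided - amounts to simple config-map merging. A minimal sketch; the class and method names are hypothetical illustrations, not Connect's actual implementation, and a real version would also need to filter out consumer-only config keys that the admin client doesn't recognize:

```java
import java.util.HashMap;
import java.util.Map;

public class AdminConfigMerge {

    // Hypothetical helper: combine the connector's admin client overrides
    // with its consumer overrides, giving precedence to the consumer configs,
    // so that keys like bootstrap.servers end up targeting the same cluster
    // the sink tasks' consumer group actually lives on.
    static Map<String, Object> mergedAdminConfig(Map<String, Object> adminOverrides,
                                                 Map<String, Object> consumerOverrides) {
        Map<String, Object> merged = new HashMap<>(adminOverrides);
        merged.putAll(consumerOverrides); // consumer overrides win on conflict
        return merged;
    }

    public static void main(String[] args) {
        // Connector has consumer overrides for another cluster, but no admin
        // override for bootstrap.servers (e.g. because it uses no DLQ topic):
        Map<String, Object> admin = new HashMap<>();
        admin.put("request.timeout.ms", "30000");
        Map<String, Object> consumer = new HashMap<>();
        consumer.put("bootstrap.servers", "other-cluster:9092");

        Map<String, Object> merged = mergedAdminConfig(admin, consumer);
        System.out.println(merged.get("bootstrap.servers")); // other-cluster:9092
        System.out.println(merged.get("request.timeout.ms")); // 30000
    }
}
```

With this merge, the offset-reset admin client would see the consumer's {{bootstrap.servers}} whenever the admin overrides omit it, avoiding the empty-offsets/swallowed-{{GroupIdNotFoundException}} scenario described in the quoted discussion.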
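The bootstrap-server comparison option quoted above boils down to resolving each client type's effective {{bootstrap.servers}} through the same three-level precedence chain and comparing the results. A sketch under those assumptions (the helper is hypothetical, and as the quoted comment itself cautions, string comparison is only a heuristic - different bootstrap lists can reach the same cluster, so a reliable check would need something like comparing cluster IDs):

```java
public class BootstrapResolution {

    // Hypothetical helper: resolve the effective bootstrap.servers for one
    // client type in the order of preference described above - connector-level
    // override, then worker-level per-client default, then the worker's own
    // bootstrap.servers.
    static String effectiveBootstrapServers(String connectorOverride,
                                            String workerClientDefault,
                                            String workerBootstrap) {
        if (connectorOverride != null) return connectorOverride;
        if (workerClientDefault != null) return workerClientDefault;
        return workerBootstrap;
    }

    public static void main(String[] args) {
        // The problematic case from the discussion: consumer overrides point
        // at another cluster, admin overrides are absent entirely.
        String adminServers = effectiveBootstrapServers(null, null, "connect-backing:9092");
        String consumerServers = effectiveBootstrapServers("other-cluster:9092", null, "connect-backing:9092");

        // A naive string comparison flags this request as cross-cluster and
        // could fail it early, before any offsets are listed or deleted:
        System.out.println(adminServers.equals(consumerServers)); // false
    }
}
```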