[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
yashmayya commented on code in PR #12984: URL: https://github.com/apache/kafka/pull/12984#discussion_r1093620557 ## clients/src/main/java/org/apache/kafka/common/utils/Timer.java: ## @@ -161,7 +161,7 @@ public long remainingMs() { /** * Get the current time in milliseconds. This will return the same cached value until the timer * has been updated using one of the {@link #update()} methods or {@link #sleep(long)} is used. - * + * Review Comment: Makes sense, but wouldn't it also be slightly strange to create a dedicated PR for such a trivial change? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
yashmayya commented on code in PR #12984: URL: https://github.com/apache/kafka/pull/12984#discussion_r1093606834 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -711,9 +768,35 @@ KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final Wo return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier); } -private void sendPrivileged(String key, byte[] value) { +/** + * Send a single record to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be + * successfully invoked before calling this method if this store is configured to use a fencable writer. + * @param key the record key + * @param value the record value + * @param timer Timer bounding how long this method can block. The timer is updated before the method returns. + */ +private void sendPrivileged(String key, byte[] value, Timer timer) throws ExecutionException, InterruptedException, TimeoutException { +sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, value)), timer); +} + +/** + * Send one or more records to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be + * successfully invoked before calling this method if this store is configured to use a fencable writer. + * @param keyValues the list of producer record key/value pairs + * @param timer Timer bounding how long this method can block. The timer is updated before the method returns. + */ +private void sendPrivileged(List keyValues, Timer timer) throws ExecutionException, InterruptedException, TimeoutException { if (!usesFencableWriter) { -configLog.send(key, value); +List> producerFutures = new ArrayList<>(); +keyValues.forEach( +keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value)) +); + +for (Future future : producerFutures) { Review Comment: Done, but just curious about when the producer send might block? 
The doc says that it should just essentially add the record to the record buffer and return?
[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
yashmayya commented on code in PR #12984: URL: https://github.com/apache/kafka/pull/12984#discussion_r1093090686 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -711,9 +752,32 @@ KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final Wo return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier); } -private void sendPrivileged(String key, byte[] value) { +/** + * Send a single record to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be + * successfully invoked before calling this method if this store is configured to use a fencable writer. + * @param key the record key + * @param value the record value + */ +private void sendPrivileged(String key, byte[] value) throws ExecutionException, InterruptedException, TimeoutException { +sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, value))); +} + +/** + * Send one or more records to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be + * successfully invoked before calling this method if this store is configured to use a fencable writer. + * @param keyValues the list of producer record key/value pairs + */ +private void sendPrivileged(List keyValues) throws ExecutionException, InterruptedException, TimeoutException { if (!usesFencableWriter) { -configLog.send(key, value); +List> producerFutures = new ArrayList<>(); +keyValues.forEach( +keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value)) +); + +for (Future future : producerFutures) { +future.get(READ_WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS); Review Comment: > Considering this is all taking place on the herder's tick thread, we should probably care about the difference. Makes sense. 
> We might be able to use the [Timer class](https://github.com/apache/kafka/blob/eb7f490159c924ca0f21394d58366c257998f52e/clients/src/main/java/org/apache/kafka/common/utils/Timer.java) to simplify some of this logic. Thanks, that's a great suggestion and it looks like a perfect fit for the use case here. One thing I'd like to call out is that the existing `putTaskConfigs` implementation had multiple (3) reads to the end of the log with each having a 30 second timeout (thus potentially blocking for up to 90 seconds in total). With the latest pushed changes in this PR, the overall timeout across all reads and writes done by `putTaskConfigs` will now be 30 seconds. While I do believe that this is the right thing to do, I just wanted to call it out explicitly to make sure that we're on the same page!
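To illustrate the difference between a per-call timeout and one overall budget, here is a minimal sketch using only the JDK. `SharedDeadline` and `awaitAll` are hypothetical names standing in for the role the `Timer` plays in the PR; this is not the PR's code.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: one time budget shared by several blocking waits.
final class SharedDeadline {
    private final long deadlineNanos;

    SharedDeadline(long timeoutMs) {
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    }

    // Time left in the overall budget, never negative.
    long remainingMs() {
        return Math.max(0L, TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime()));
    }

    // Wait for every future in turn, charging each wait against the same budget,
    // so the total blocking time is bounded by totalTimeoutMs rather than
    // totalTimeoutMs multiplied by the number of futures.
    static void awaitAll(List<? extends Future<?>> futures, long totalTimeoutMs)
            throws ExecutionException, InterruptedException, TimeoutException {
        SharedDeadline deadline = new SharedDeadline(totalTimeoutMs);
        for (Future<?> future : futures) {
            future.get(deadline.remainingMs(), TimeUnit.MILLISECONDS);
        }
    }
}
```

With three waits and a 30 second budget, the worst case under this scheme is roughly 30 seconds in total, instead of the 90 seconds possible when each wait gets its own 30 second timeout.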
[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
yashmayya commented on code in PR #12984: URL: https://github.com/apache/kafka/pull/12984#discussion_r1084960389 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -712,8 +733,16 @@ KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final Wo } private void sendPrivileged(String key, byte[] value) { +sendPrivileged(key, value, null); +} + +private void sendPrivileged(String key, byte[] value, Callback callback) { if (!usesFencableWriter) { -configLog.send(key, value); Review Comment: Yeah, that's right, the API response will only include the generic top level message. > I think it'd be nice to include more detail on the cause of the failure I strongly agree, and this was discussed in some more detail on the [other thread](https://github.com/apache/kafka/pull/12984#discussion_r1064077119). > We wouldn't be making it more vague. The message would state that the write to the config topic failed which is the cause for failure. Since the exception mapper used by Connect's REST server only writes the [top level exception's message](https://github.com/apache/kafka/blob/d798ec779c25dba31fa5ee9384d159ed54c6e07b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java#L72) to the response (i.e. nested exceptions aren't surfaced via the REST API response), I think it makes sense to keep the top level exception's message generic and allow users to debug further via the worker logs (where the entire exception chain's stack trace will be visible). ... The reasoning here is that since a Connect user may not even know that Connect uses a producer under the hood to write certain requests to the config topic for asynchronous processing, it would make more sense to have an informative Connect specific exception message rather than directly throwing the producer exception which may or may not contain enough details to be relevant to a Connect user. 
> Another option for the above issue could be changing the exception mapper to concatenate all the exception messages from the exception chain. > Yet another option for this could be to simply append a "Check the worker logs for more details on the error" to the top level exception's message in the REST API response (the worker logs will have the entire exception chain). Thoughts? What do you think about modifying the exception mapper to be more informative (either in this PR or a separate one)? ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -711,9 +742,9 @@ KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final Wo return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier); } -private void sendPrivileged(String key, byte[] value) { +private void sendPrivileged(String key, byte[] value) throws ExecutionException, InterruptedException { if (!usesFencableWriter) { -configLog.send(key, value); +configLog.send(key, value).get(); Review Comment: Thanks Chris, both great points. The `get` without timeout here was definitely a miss on my part. I've addressed both of your raised concerns in the latest patch (including batching multiple sends in a single transaction for the EOS enabled case). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
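For the option of concatenating messages from the exception chain, a rough sketch is shown here. `ExceptionChainMessages` and `describe` are hypothetical names, the separator and depth limit are arbitrary choices, and nothing here reflects how `ConnectExceptionMapper` is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: joins the messages of an exception and its causes.
final class ExceptionChainMessages {
    private static final int MAX_DEPTH = 16; // arbitrary guard against very long chains

    static String describe(Throwable error) {
        List<String> messages = new ArrayList<>();
        int depth = 0;
        for (Throwable t = error; t != null && depth < MAX_DEPTH; t = t.getCause(), depth++) {
            String message = t.getMessage();
            // Skip links in the chain that carry no message of their own.
            if (message != null && !message.isEmpty()) {
                messages.add(message);
            }
        }
        return String.join("; caused by: ", messages);
    }
}
```

The trade-off discussed in the thread still applies: this exposes lower-level producer messages to REST users, whereas appending a pointer to the worker logs keeps the response generic.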
[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
yashmayya commented on code in PR #12984: URL: https://github.com/apache/kafka/pull/12984#discussion_r1081145849 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -712,8 +733,16 @@ KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final Wo } private void sendPrivileged(String key, byte[] value) { +sendPrivileged(key, value, null); +} + +private void sendPrivileged(String key, byte[] value, Callback callback) { if (!usesFencableWriter) { -configLog.send(key, value); Review Comment: I've gone ahead and made the changes to convert the `KafkaConfigBackingStore` APIs to be synchronous even when EOS is disabled (thus making the behavior more in-line with the EOS case). This simplifies things significantly while also providing consistency w.r.t error handling across all the `KafkaConfigBackingStore` APIs without making invasive changes. ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -723,7 +752,11 @@ private void sendPrivileged(String key, byte[] value) { try { fencableProducer.beginTransaction(); -fencableProducer.send(new ProducerRecord<>(topic, key, value)); +fencableProducer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> { Review Comment: I've removed the usage of the producer callback here since we're moving to synchronous usage of producer send in the non-EOS case as well anyway (aside from the earlier point that it doesn't really make sense to handle errors via both a callback as well as `commitTransaction`). The behavior of surfacing exceptions synchronously is similar in both cases now; one through calling `get()` on the returned future from `Producer::send` and the other through `Producer::commitTransaction`. > Another option for the above issue could be changing the exception mapper to concatenate all the exception messages from the exception chain. 
Yet another option for this could be to simply append a "Check the worker logs for more details on the error" to the top level exception's message in the REST API response (the worker logs will have the entire exception chain). Thoughts?
[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
yashmayya commented on code in PR #12984: URL: https://github.com/apache/kafka/pull/12984#discussion_r1064077119 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -723,7 +752,11 @@ private void sendPrivileged(String key, byte[] value) { try { fencableProducer.beginTransaction(); -fencableProducer.send(new ProducerRecord<>(topic, key, value)); +fencableProducer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> { Review Comment: I'm not sure I follow what benefit we'd be getting here by handling both the producer callback error as well as the one thrown by `commitTransaction`? The control flow would be more straightforward by removing the producer callback and just relying on `commitTransaction` to throw exceptions, if any. The producer's Javadoc itself also suggests that callbacks need not be defined when using the transactional producer since `commitTransaction` will throw the error from the last failed send in a transaction. > making them more vague to compensate We wouldn't be making it more vague. The message would state that the write to the config topic failed which is the cause for failure. Since the exception mapper used by Connect's REST server only writes the [top level exception's message](https://github.com/apache/kafka/blob/d798ec779c25dba31fa5ee9384d159ed54c6e07b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java#L72) to the response (i.e. nested exceptions aren't surfaced via the REST API response), I think it makes sense to keep the top level exception's message generic and allow users to debug further via the worker logs (where the entire exception chain's stack trace will be visible). Note that I'm suggesting a similar change for the non-EOS enabled case as well - i.e. 
don't use the producer error directly [here](https://github.com/apache/kafka/pull/12984/files#diff-c346b7b90fe30ac08b4211375fe36208139a90e40838dd3a7996021a8c4c5b13R1064), instead wrapping it in a `ConnectException` which says that the write to the config topic failed. The reasoning here is that since a Connect user may not even know that Connect uses a producer under the hood to write certain requests to the config topic for asynchronous processing, it would make more sense to have an informative Connect specific exception message rather than directly throwing the producer exception which may or may not contain enough details to be relevant to a Connect user. > If we're hiding the result from the REST calls, are we not also hiding the error from the herder tick thread? Hm no, the hiding issue was only for non-EOS enabled workers. Like I've pointed out above, for workers that have EOS enabled, the REST API does return a `500` response. Edit: Another option for the above issue could be changing the exception mapper to concatenate all the exception messages from the exception chain.
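The wrapping idea can be sketched as follows, assuming a mapper that reports only the top-level message. `ConfigTopicWriteException` is a hypothetical stand-in for `ConnectException`, and `restMessage` only imitates the behavior described for the exception mapper; neither is taken from the Kafka code base.

```java
// Hypothetical stand-in for ConnectException, so the sketch needs no Kafka dependency.
final class ConfigTopicWriteException extends RuntimeException {
    ConfigTopicWriteException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class WrapSketch {
    // Wrap the low-level producer error in a Connect-specific message,
    // keeping the original error as the cause so logs show the full chain.
    static RuntimeException wrap(Throwable producerError) {
        return new ConfigTopicWriteException("Failed to write to config topic", producerError);
    }

    // Imitates a mapper that surfaces only the top-level exception's message.
    static String restMessage(Throwable error) {
        return error.getMessage();
    }
}
```

The REST response then carries the generic message, while the cause (for example a `TopicAuthorizationException`) remains available to anything that logs the full stack trace.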
[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
yashmayya commented on code in PR #12984: URL: https://github.com/apache/kafka/pull/12984#discussion_r1064077085 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -712,8 +733,16 @@ KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final Wo } private void sendPrivileged(String key, byte[] value) { +sendPrivileged(key, value, null); +} + +private void sendPrivileged(String key, byte[] value, Callback callback) { if (!usesFencableWriter) { -configLog.send(key, value); Review Comment: Oh okay, I see what you're saying now. Sorry, I think I misunderstood you earlier. I agree with you, none of the `KafkaConfigBackingStore` use cases seem to necessarily require the performance benefit of using async send with callbacks. Although to make it synchronous, I think it might be better to avoid using the producer callback altogether and instead call `get()` on the returned future (which throws any exceptions that were encountered while sending the record), WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
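A small illustration of surfacing send failures by calling `get()` on the returned future instead of registering a callback. `SyncSendSketch`, `failingSend`, and `sendAndWait` are hypothetical, and a `CompletableFuture` stands in for the future a producer send would return.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SyncSendSketch {
    // Hypothetical stand-in for an asynchronous send that fails.
    static Future<Long> failingSend(String reason) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        result.completeExceptionally(new IllegalStateException(reason));
        return result;
    }

    // Block on the future with a bounded wait; a failure arrives as an
    // ExecutionException whose cause is the original error.
    static String sendAndWait(Future<Long> sendResult, long timeoutMs) {
        try {
            sendResult.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "ok";
        } catch (ExecutionException e) {
            return "send failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return "interrupted";
        }
    }
}
```

The bounded `get(timeout, unit)` form is used here because an unbounded `get()` can block the calling thread indefinitely if the send never completes.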
[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
yashmayya commented on code in PR #12984: URL: https://github.com/apache/kafka/pull/12984#discussion_r1060450083 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -723,7 +752,11 @@ private void sendPrivileged(String key, byte[] value) { try { fencableProducer.beginTransaction(); -fencableProducer.send(new ProducerRecord<>(topic, key, value)); +fencableProducer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> { Review Comment: I believe that `commitTransaction` ensures that all producer records are flushed and all pending callbacks invoked before the transaction is committed, so this - `where the callback error is propagated before calling the commitTransaction and getting the more generic error message.` should already be the case with the current changes? However, it doesn't look like using callbacks with the transactional producer offers much benefit - we could simply reword the existing exception message (`this may be due to a transient error and the request can be safely retried`) to indicate that it could potentially denote a non-transient error as well. Furthermore, the specific case that this PR attempted to fix (missing WRITE ACL on the config topic not being surfaced to users properly) is anyway highly unlikely to go unnoticed in an EOS enabled Connect cluster since the herder thread itself will repeatedly hit this condition [here](https://github.com/apache/kafka/blob/9d1f9f77642d7e95dec37647657478ed187bdeb7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L419) in the tick loop (the worker logs will reveal the underlying `TopicAuthorizationException`) and request processing won't happen at all (all external requests that are run on the herder's thread will timeout). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
yashmayya commented on code in PR #12984: URL: https://github.com/apache/kafka/pull/12984#discussion_r1060149266 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -712,8 +733,16 @@ KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final Wo } private void sendPrivileged(String key, byte[] value) { +sendPrivileged(key, value, null); +} + +private void sendPrivileged(String key, byte[] value, Callback callback) { if (!usesFencableWriter) { -configLog.send(key, value); Review Comment: I agree that terminating the producer callbacks in the `KafkaConfigBackingStore` itself would look cleaner. However, if we want to surface producer errors to users via the REST API responses (which is the goal for this PR), we would need to complete the request callbacks either directly or via an intermediary callback (also see [this comment](https://github.com/apache/kafka/pull/12984#discussion_r1046720576)) in the `KafkaConfigBackingStore` (both of which would involve changing the method signatures to accept callbacks) due to the way that the REST API responses are generated using `FutureCallback` in `ConnectorsResource`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
yashmayya commented on code in PR #12984: URL: https://github.com/apache/kafka/pull/12984#discussion_r1050765139 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -712,8 +733,16 @@ KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final Wo } private void sendPrivileged(String key, byte[] value) { +sendPrivileged(key, value, null); +} + +private void sendPrivileged(String key, byte[] value, Callback callback) { if (!usesFencableWriter) { -configLog.send(key, value); Review Comment: Thanks, that's a good observation. Currently, the pause / resume APIs (which call `putTargetState`) don't use callback mechanisms whatsoever and they also don't have any documented response bodies (they send back a `202` response with an empty body right now) and will hence need a bit more refactoring. I feel like that should be done in a follow up / separate PR, WDYT? Edit: Actually, on second thought it's a closely related issue and could be taken care of in this PR itself. I'll work on making the requisite changes. Edit 2: Although now with these changes, there's an inconsistency between different APIs since `deleteConnectorConfig`, `putTaskConfigs` and a couple of other methods are still using `null` callbacks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
yashmayya commented on code in PR #12984: URL: https://github.com/apache/kafka/pull/12984#discussion_r1050765139 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -712,8 +733,16 @@ KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final Wo } private void sendPrivileged(String key, byte[] value) { +sendPrivileged(key, value, null); +} + +private void sendPrivileged(String key, byte[] value, Callback callback) { if (!usesFencableWriter) { -configLog.send(key, value); Review Comment: Thanks, that's a good observation. Currently, the pause / resume APIs (which call `putTargetState`) don't use callback mechanisms whatsoever and they also don't have any documented response bodies (they send back a `202` response with an empty body right now) and will hence need a bit more refactoring. I feel like that should be done in a follow up / separate PR, WDYT? ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -723,7 +752,11 @@ private void sendPrivileged(String key, byte[] value) { try { fencableProducer.beginTransaction(); -fencableProducer.send(new ProducerRecord<>(topic, key, value)); +fencableProducer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> { Review Comment: > I believe here we're intentionally relying on this behavior of commitTransaction to propagate errors: Thanks for pointing this out! As per https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send- ``` When used as part of a transaction, it is not necessary to define a callback or check the result of the future in order to detect errors from send. If any of the send calls failed with an irrecoverable error, the final [commitTransaction()](https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#commitTransaction--) call will fail and throw the exception from the last failed send. 
When this happens, your application should call [abortTransaction()](https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#abortTransaction--) to reset the state and continue to send data. ``` So what you're saying definitely does make sense, and I tried this scenario out manually - for an EOS enabled worker, the response from the `POST /connectors` API when the worker's principal doesn't have a `WRITE` ACL on the config topic is (without the changes from this PR): ``` { "error_code": 500, "message": "Failed to write to config topic; this may be due to a transient error and the request can be safely retried" } ``` While the worker logs do have the exact root cause as well (the `TopicAuthorizationException`), I believe the REST API response in this case isn't all that helpful to the user. With the changes from this PR, the response in the same scenario looks like: ``` { "error_code": 500, "message": "Not authorized to access topics: [connect-configs]" } ``` > I'm not sure if we're in danger of double-completing the callback I don't believe we are in danger of double completing the callback, although you're right in that we are unnecessarily handling the error in two places in a way. We could avoid the use of callbacks in the producer send (although I couldn't find anything that explicitly warns against doing so) when using the transactional producer and instead refactor [this](https://github.com/apache/kafka/blob/e3585a4cd5ddb5b8475a49c38143d18e7c640bfe/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494) / [this](https://github.com/apache/kafka/blob/e3585a4cd5ddb5b8475a49c38143d18e7c640bfe/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L731) so that the final exception that is thrown is the producer exception itself rather than the (doubly) wrapped one. WDYT? -- This is an automated message from the Apache Git Service. 
[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
yashmayya commented on code in PR #12984: URL: https://github.com/apache/kafka/pull/12984#discussion_r1046722667 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java: ## @@ -51,8 +52,9 @@ public interface ConfigBackingStore { * Update the configuration for a connector. * @param connector name of the connector * @param properties the connector configuration + * @param callback the callback to be invoked after the put is complete; can be {@code null} if no callback is desired */ -void putConnectorConfig(String connector, Map properties); +void putConnectorConfig(String connector, Map properties, Callback callback); Review Comment: Could potentially add a new overloaded method in `KafkaConfigBackingStore` and avoid touching this interface method although that would require refactoring `AbstractHerder` to make it generic in order to allow `DistributedHerder` to access an instance of `KafkaConfigBackingStore` rather than `ConfigBackingStore`. However, that would be a much more noisy change IMO. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1058,14 +1058,20 @@ public void putConnectorConfig(final String connName, final Map } log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors()); -writeToConfigTopicAsLeader(() -> configBackingStore.putConnectorConfig(connName, config)); - -// Note that we use the updated connector config despite the fact that we don't have an updated -// snapshot yet. The existing task info should still be accurate. 
-ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName), -// validateConnectorConfig have checked the existence of CONNECTOR_CLASS_CONFIG -connectorType(config)); -callback.onCompletion(null, new Created<>(!exists, info)); +Callback cb = (err, result) -> { Review Comment: Another option could be to directly call `callback.onCompletion` in `KafkaConfigBackingStore::putConnectorConfig` itself - i.e. construct the `Created` object there using its in-memory maps directly rather than the snapshot; this would avoid this double callback kinda mechanism but would need some other refactoring. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org