[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2023-02-01 Thread via GitHub


yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1093620557


##
clients/src/main/java/org/apache/kafka/common/utils/Timer.java:
##
@@ -161,7 +161,7 @@ public long remainingMs() {
     /**
      * Get the current time in milliseconds. This will return the same cached value until the timer
      * has been updated using one of the {@link #update()} methods or {@link #sleep(long)} is used.
-     *
+     * 

Review Comment:
   Makes sense, but wouldn't it also be slightly strange to create a dedicated 
PR for such a trivial change?  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2023-02-01 Thread via GitHub


yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1093606834


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -711,9 +768,35 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }
 
-    private void sendPrivileged(String key, byte[] value) {
+    /**
+     * Send a single record to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param key the record key
+     * @param value the record value
+     * @param timer Timer bounding how long this method can block. The timer is updated before the method returns.
+     */
+    private void sendPrivileged(String key, byte[] value, Timer timer) throws ExecutionException, InterruptedException, TimeoutException {
+        sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, value)), timer);
+    }
+
+    /**
+     * Send one or more records to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param keyValues the list of producer record key/value pairs
+     * @param timer Timer bounding how long this method can block. The timer is updated before the method returns.
+     */
+    private void sendPrivileged(List<ProducerKeyValue> keyValues, Timer timer) throws ExecutionException, InterruptedException, TimeoutException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
+            keyValues.forEach(
+                    keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value))
+            );
+
+            for (Future<RecordMetadata> future : producerFutures) {

Review Comment:
   Done, but out of curiosity, when might the producer send block? The docs say that it essentially just adds the record to the record buffer and returns.
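
   For context on the question above: the `KafkaProducer` Javadoc documents that `send()` itself can block, bounded by `max.block.ms`, while it waits for topic metadata or for space in the record buffer. The loop in the diff then waits on each returned future. Below is a minimal sketch, assuming plain `java.util.concurrent` futures in place of those returned by `configLog.send(...)`, of bounding the total wait for a batch with one shared deadline instead of a per-`get()` timeout. `BoundedWait` and `awaitAll` are hypothetical names; the deadline is a caller-supplied `System.nanoTime()` value so that the clock can be started before the (possibly blocking) sends are issued.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: every wait in the batch is charged against one shared
// deadline (a System.nanoTime() value), so the total time spent is bounded.
final class BoundedWait {
    private BoundedWait() { }

    static <T> void awaitAll(List<? extends Future<T>> futures, long deadlineNanos)
            throws ExecutionException, InterruptedException, TimeoutException {
        for (Future<T> future : futures) {
            // Whatever is left of the shared budget; zero once it is used up.
            long remainingNanos = Math.max(0L, deadlineNanos - System.nanoTime());
            // Rethrows a failed send as ExecutionException, or throws
            // TimeoutException if this future is not done in the remaining time.
            future.get(remainingNanos, TimeUnit.NANOSECONDS);
        }
    }
}
```

   Once the shared deadline has passed, each remaining `get()` is called with a zero timeout, so with standard implementations such as `CompletableFuture` it succeeds only for futures that are already complete and otherwise throws `TimeoutException`.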






[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2023-02-01 Thread via GitHub


yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1093090686


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -711,9 +752,32 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }
 
-    private void sendPrivileged(String key, byte[] value) {
+    /**
+     * Send a single record to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param key the record key
+     * @param value the record value
+     */
+    private void sendPrivileged(String key, byte[] value) throws ExecutionException, InterruptedException, TimeoutException {
+        sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, value)));
+    }
+
+    /**
+     * Send one or more records to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param keyValues the list of producer record key/value pairs
+     */
+    private void sendPrivileged(List<ProducerKeyValue> keyValues) throws ExecutionException, InterruptedException, TimeoutException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
+            keyValues.forEach(
+                    keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value))
+            );
+
+            for (Future<RecordMetadata> future : producerFutures) {
+                future.get(READ_WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS);

Review Comment:
   > Considering this is all taking place on the herder's tick thread, we 
should probably care about the difference.
   
   Makes sense.
   
   > We might be able to use the [Timer 
class](https://github.com/apache/kafka/blob/eb7f490159c924ca0f21394d58366c257998f52e/clients/src/main/java/org/apache/kafka/common/utils/Timer.java)
 to simplify some of this logic.
   
   Thanks, that's a great suggestion and it looks like a perfect fit for the use case here. One thing I'd like to call out: the existing `putTaskConfigs` implementation performed three reads to the end of the log, each with its own 30-second timeout, so it could block for up to 90 seconds in total. With the latest changes pushed to this PR, there is now a single 30-second timeout across all the reads and writes done by `putTaskConfigs`. While I do believe that this is the right thing to do, I just wanted to call it out explicitly to make sure that we're on the same page!
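
   The difference called out above is plain arithmetic: three reads, each with its own 30-second timeout, can block for up to 3 × 30 s = 90 s, whereas one 30-second budget shared by every read and write caps the total at 30 s. A minimal sketch of such a shared budget follows. `Budget` is a hypothetical, simplified stand-in for Kafka's `Timer` that mimics only the cached-time behaviour described in the `Timer` Javadoc at the top of this thread (the current time changes only when `update()` is called); the clock is injected so the behaviour can be checked without sleeping.

```java
import java.util.function.LongSupplier;

// Hypothetical, simplified stand-in for a Timer-style budget: one fixed
// deadline that every read and write in a multi-step operation draws from.
final class Budget {
    private final LongSupplier clockMs;
    private final long deadlineMs;
    private long currentMs;

    Budget(LongSupplier clockMs, long timeoutMs) {
        this.clockMs = clockMs;
        this.currentMs = clockMs.getAsLong();
        this.deadlineMs = currentMs + timeoutMs;
    }

    // Refresh the cached time; until this is called, remainingMs() is unchanged.
    void update() {
        currentMs = clockMs.getAsLong();
    }

    // Time left as of the last update, never negative.
    long remainingMs() {
        return Math.max(0L, deadlineMs - currentMs);
    }

    boolean isExpired() {
        return currentMs >= deadlineMs;
    }
}
```

   In a multi-step operation, each step would wait for at most `budget.remainingMs()` (for example via `future.get(budget.remainingMs(), TimeUnit.MILLISECONDS)`) and then call `budget.update()`, so later steps automatically get whatever time the earlier steps left over.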






[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2023-01-24 Thread via GitHub


yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1084960389


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -712,8 +733,16 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
     }
 
     private void sendPrivileged(String key, byte[] value) {
+        sendPrivileged(key, value, null);
+    }
+
+    private void sendPrivileged(String key, byte[] value, Callback callback) {
         if (!usesFencableWriter) {
-            configLog.send(key, value);

Review Comment:
   Yeah, that's right, the API response will only include the generic top-level message. 
   
   > I think it'd be nice to include more detail on the cause of the failure
   
   I strongly agree, and this was discussed in some more detail on the [other 
thread](https://github.com/apache/kafka/pull/12984#discussion_r1064077119).
   
   > We wouldn't be making it more vague. The message would state that the 
write to the config topic failed which is the cause for failure. Since the 
exception mapper used by Connect's REST server only writes the [top level 
exception's 
message](https://github.com/apache/kafka/blob/d798ec779c25dba31fa5ee9384d159ed54c6e07b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java#L72)
 to the response (i.e. nested exceptions aren't surfaced via the REST API 
response), I think it makes sense to keep the top level exception's message 
generic and allow users to debug further via the worker logs (where the entire 
exception chain's stack trace will be visible).
   > ...
   > The reasoning here is that since a Connect user may not even know that Connect uses a producer under the hood to write certain requests to the config topic for asynchronous processing, it would make more sense to have an informative Connect specific exception message rather than directly throwing the producer exception which may or may not contain enough details to be relevant to a Connect user.
   
   > Another option for the above issue could be changing the exception mapper 
to concatenate all the exception messages from the exception chain.
   
   > Yet another option for this could be to simply append a "Check the worker 
logs for more details on the error" to the top level exception's message in the 
REST API response (the worker logs will have the entire exception chain). 
Thoughts?
   
   What do you think about modifying the exception mapper to be more 
informative (either in this PR or a separate one)?
   



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -711,9 +742,9 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }
 
-    private void sendPrivileged(String key, byte[] value) {
+    private void sendPrivileged(String key, byte[] value) throws ExecutionException, InterruptedException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            configLog.send(key, value).get();

Review Comment:
   Thanks Chris, both great points. The `get` without a timeout here was definitely a miss on my part. I've addressed both of the concerns you raised in the latest patch (including batching multiple sends in a single transaction for the EOS-enabled case).






[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2023-01-19 Thread GitBox


yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1081145849


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -712,8 +733,16 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
     }
 
     private void sendPrivileged(String key, byte[] value) {
+        sendPrivileged(key, value, null);
+    }
+
+    private void sendPrivileged(String key, byte[] value, Callback callback) {
         if (!usesFencableWriter) {
-            configLog.send(key, value);

Review Comment:
   I've gone ahead and made the changes to make the `KafkaConfigBackingStore` APIs synchronous even when EOS is disabled (bringing the behavior in line with the EOS case). This simplifies things significantly and gives all the `KafkaConfigBackingStore` APIs consistent error handling without making invasive changes.



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -723,7 +752,11 @@ private void sendPrivileged(String key, byte[] value) {
 
         try {
             fencableProducer.beginTransaction();
-            fencableProducer.send(new ProducerRecord<>(topic, key, value));
+            fencableProducer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {

Review Comment:
   I've removed the producer callback here, since we're moving to synchronous use of producer send in the non-EOS case as well (aside from the earlier point that it doesn't really make sense to handle errors via both a callback and `commitTransaction`). Exceptions are now surfaced synchronously in both cases: in the non-EOS case by calling `get()` on the future returned by `Producer::send`, and in the EOS case through `Producer::commitTransaction`.
   
   > Another option for the above issue could be changing the exception mapper 
to concatenate all the exception messages from the exception chain.
   
   Yet another option could be to simply append "Check the worker logs for more details on the error" to the top-level exception's message in the REST API response (the worker logs will have the entire exception chain). Thoughts?






[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2023-01-08 Thread GitBox


yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1064077119


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -723,7 +752,11 @@ private void sendPrivileged(String key, byte[] value) {
 
         try {
             fencableProducer.beginTransaction();
-            fencableProducer.send(new ProducerRecord<>(topic, key, value));
+            fencableProducer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {

Review Comment:
   I'm not sure I follow what benefit we'd get here from handling both the producer callback error and the one thrown by `commitTransaction`. The control flow would be more straightforward if we removed the producer callback and just relied on `commitTransaction` to throw exceptions, if any. The producer's Javadoc itself also suggests that callbacks need not be defined when using the transactional producer, since `commitTransaction` will throw the error from the last failed send in a transaction.
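
   The callback-free transactional flow described above can be sketched as follows. This is a minimal illustration, not the store's actual code: `TxnProducer` is a hypothetical, pared-down interface standing in for Kafka's transactional `Producer`, and `KeyValue` mirrors the `ProducerKeyValue` pair from the diff. All records are written in one transaction, and a failed send is detected only through `commitTransaction()`.

```java
import java.util.List;

// Hypothetical key/value pair, analogous to ProducerKeyValue in the diff.
final class KeyValue {
    final String key;
    final byte[] value;

    KeyValue(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical, pared-down view of a transactional producer.
interface TxnProducer {
    void beginTransaction();
    void send(String key, byte[] value);  // no callback: failures surface at commit
    void commitTransaction();             // throws if a send in the transaction failed
    void abortTransaction();
}

final class TransactionalWriter {
    private TransactionalWriter() { }

    // Write all records in one transaction; a failed send is reported by commitTransaction().
    static void writeAll(TxnProducer producer, List<KeyValue> records) {
        producer.beginTransaction();
        try {
            for (KeyValue record : records) {
                producer.send(record.key, record.value);
            }
            producer.commitTransaction();
        } catch (RuntimeException e) {
            // Simplified: with a real KafkaProducer, fatal errors such as
            // ProducerFencedException require closing the producer rather than aborting.
            try {
                producer.abortTransaction();
            } catch (RuntimeException abortError) {
                e.addSuppressed(abortError);
            }
            throw e;
        }
    }
}
```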
   
   > making them more vague to compensate
   
   We wouldn't be making it more vague. The message would state that the write to the config topic failed, which is the cause of the failure. Since the exception mapper used by Connect's REST server only writes the [top-level exception's message](https://github.com/apache/kafka/blob/d798ec779c25dba31fa5ee9384d159ed54c6e07b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java#L72) to the response (i.e. nested exceptions aren't surfaced via the REST API response), I think it makes sense to keep the top-level exception's message generic and allow users to debug further via the worker logs (where the entire exception chain's stack trace will be visible). Note that I'm suggesting a similar change for the non-EOS case as well, i.e. not using the producer error directly [here](https://github.com/apache/kafka/pull/12984/files#diff-c346b7b90fe30ac08b4211375fe36208139a90e40838dd3a7996021a8c4c5b13R1064) but instead wrapping it in a `ConnectException` which says that the write to the config topic failed. The reasoning here is that since a Connect user may not even know that Connect uses a producer under the hood to write certain requests to the config topic for asynchronous processing, it would make more sense to have an informative, Connect-specific exception message rather than directly throwing the producer exception, which may or may not contain enough detail to be relevant to a Connect user.
   
   >  If we're hiding the result from the REST calls, are we not also hiding 
the error from the herder tick thread?
   
   Hm, no: the hiding issue only affected workers without EOS enabled. As I pointed out above, for workers that have EOS enabled, the REST API does return a `500` response.
   
   Edit: Another option for the above issue could be changing the exception 
mapper to concatenate all the exception messages from the exception chain.
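
   The option in the edit above, having the exception mapper concatenate every message in the exception chain, could look roughly like the helper below. It is a hypothetical sketch rather than `ConnectExceptionMapper`'s actual code; the colon separator and the class-name fallback are illustrative choices.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical helper: joins the messages of an exception and all of its
// causes, outermost first, into one string suitable for a REST response.
final class ExceptionMessages {
    private ExceptionMessages() { }

    static String describeChain(Throwable top) {
        StringBuilder sb = new StringBuilder();
        // Identity set guards against (rare) cycles in the cause chain.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable t = top; t != null && seen.add(t); t = t.getCause()) {
            String msg = t.getMessage();
            if (msg == null || msg.isEmpty()) {
                continue;
            }
            if (sb.length() > 0) {
                sb.append(": ");
            }
            sb.append(msg);
        }
        // Fall back to the exception type if nothing in the chain had a message.
        return sb.length() > 0 ? sb.toString() : top.getClass().getName();
    }
}
```

   One caveat: a wrapper created with `new RuntimeException(cause)` gets `cause.toString()` as its own message, so a naive join like this can repeat text; a real implementation would probably want to skip such messages.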







[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2023-01-07 Thread GitBox


yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1064077085


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -712,8 +733,16 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
     }
 
     private void sendPrivileged(String key, byte[] value) {
+        sendPrivileged(key, value, null);
+    }
+
+    private void sendPrivileged(String key, byte[] value, Callback callback) {
         if (!usesFencableWriter) {
-            configLog.send(key, value);

Review Comment:
   Oh okay, I see what you're saying now. Sorry, I think I misunderstood you earlier. I agree with you: none of the `KafkaConfigBackingStore` use cases seem to require the performance benefit of using async send with callbacks. However, to make it synchronous, I think it might be better to avoid the producer callback altogether and instead call `get()` on the returned future (which throws any exception encountered while sending the record). WDYT?
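
   A minimal sketch of that synchronous pattern, assuming any `java.util.concurrent.Future` in place of the one returned by the producer: block on the future with a timeout, and translate failures into an exception whose top-level message is generic while the producer error is preserved as the cause. `ConfigWriteException` and `SyncWrite` are hypothetical (the former stands in for Connect's `ConnectException`), and the timeout and interrupt messages are made up for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for Connect's ConnectException.
class ConfigWriteException extends RuntimeException {
    ConfigWriteException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class SyncWrite {
    private SyncWrite() { }

    // Block on the future returned by an asynchronous send. get() rethrows any
    // send failure as an ExecutionException, so no producer callback is needed.
    static <T> T await(Future<T> sendFuture, long timeoutMs) {
        try {
            return sendFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // Generic, Connect-level message on top; the producer error stays as the cause.
            throw new ConfigWriteException("Failed to write to config topic", e.getCause());
        } catch (TimeoutException e) {
            throw new ConfigWriteException("Timed out while writing to config topic", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt status
            throw new ConfigWriteException("Interrupted while writing to config topic", e);
        }
    }
}
```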






[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2023-01-03 Thread GitBox


yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1060450083


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -723,7 +752,11 @@ private void sendPrivileged(String key, byte[] value) {
 
         try {
             fencableProducer.beginTransaction();
-            fencableProducer.send(new ProducerRecord<>(topic, key, value));
+            fencableProducer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {

Review Comment:
   I believe that `commitTransaction` ensures that all producer records are flushed and all pending callbacks are invoked before the transaction is committed, so this (`where the callback error is propagated before calling the commitTransaction and getting the more generic error message.`) should already be the case with the current changes?
   
   However, it doesn't look like using callbacks with the transactional producer offers much benefit; we could simply reword the existing exception message (`this may be due to a transient error and the request can be safely retried`) to indicate that it could potentially denote a non-transient error as well. Furthermore, the specific case that this PR attempted to fix (a missing WRITE ACL on the config topic not being surfaced properly to users) is highly unlikely to go unnoticed in an EOS-enabled Connect cluster anyway, since the herder thread itself will repeatedly hit this condition [here](https://github.com/apache/kafka/blob/9d1f9f77642d7e95dec37647657478ed187bdeb7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L419) in the tick loop (the worker logs will reveal the underlying `TopicAuthorizationException`) and request processing won't happen at all (all external requests that are run on the herder's thread will time out).






[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2023-01-02 Thread GitBox


yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1060149266


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -712,8 +733,16 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
     }
 
     private void sendPrivileged(String key, byte[] value) {
+        sendPrivileged(key, value, null);
+    }
+
+    private void sendPrivileged(String key, byte[] value, Callback callback) {
         if (!usesFencableWriter) {
-            configLog.send(key, value);

Review Comment:
   I agree that terminating the producer callbacks in the `KafkaConfigBackingStore` itself would look cleaner. However, if we want to surface producer errors to users via the REST API responses (which is the goal of this PR), the `KafkaConfigBackingStore` would need to complete the request callbacks, either directly or via an intermediary callback (also see [this comment](https://github.com/apache/kafka/pull/12984#discussion_r1046720576)). Both options involve changing the method signatures to accept callbacks, because the REST API responses are generated using `FutureCallback` in `ConnectorsResource`.
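
   To make the constraint concrete: the REST layer can only report an error that somebody passes to the request callback. Below is a hypothetical sketch of a callback that doubles as a blocking handle, in the spirit of `FutureCallback` (the real class in Connect differs in detail). The herder or config store completes it via `onCompletion(error, result)` and the REST resource blocks in `get()`, so an error that never reaches `onCompletion` never reaches the HTTP response.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical callback shape: (error, result), with exactly one of them set.
interface RequestCallback<V> {
    void onCompletion(Throwable error, V result);
}

// Hypothetical analogue of a FutureCallback: one side completes it,
// the REST side blocks on it to build the response.
final class BlockingCallback<V> implements RequestCallback<V> {
    private final CompletableFuture<V> future = new CompletableFuture<>();

    @Override
    public void onCompletion(Throwable error, V result) {
        if (error != null) {
            future.completeExceptionally(error);
        } else {
            future.complete(result);
        }
    }

    // Returns the result, or throws ExecutionException wrapping the reported error.
    V get(long timeoutMs) throws InterruptedException, ExecutionException, TimeoutException {
        return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

   Because it is backed by a `CompletableFuture`, a second `onCompletion` call has no effect, which is one simple way to make an accidental double completion harmless.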






[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2022-12-27 Thread GitBox


yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1050765139


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -712,8 +733,16 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
     }
 
     private void sendPrivileged(String key, byte[] value) {
+        sendPrivileged(key, value, null);
+    }
+
+    private void sendPrivileged(String key, byte[] value, Callback callback) {
         if (!usesFencableWriter) {
-            configLog.send(key, value);

Review Comment:
   Thanks, that's a good observation. Currently, the pause/resume APIs (which call `putTargetState`) don't use callbacks at all, and they don't have any documented response bodies either (they currently send back a `202` response with an empty body), so they'll need a bit more refactoring. I feel like that should be done in a separate follow-up PR, WDYT?
   
   Edit: Actually, on second thought, it's a closely related issue and could be taken care of in this PR itself. I'll work on making the requisite changes.
   
   Edit 2: With these changes, though, there's now an inconsistency between different APIs, since `deleteConnectorConfig`, `putTaskConfigs`, and a couple of other methods are still using `null` callbacks. 








[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2022-12-16 Thread GitBox


yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1050765139



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -723,7 +752,11 @@ private void sendPrivileged(String key, byte[] value) {
 
         try {
             fencableProducer.beginTransaction();
-            fencableProducer.send(new ProducerRecord<>(topic, key, value));
+            fencableProducer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {

Review Comment:
   > I believe here we're intentionally relying on this behavior of 
commitTransaction to propagate errors:
   
   
   Thanks for pointing this out!
   
   As per 
https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-
   
   > When used as part of a transaction, it is not necessary to define a callback or check the result of the future in order to detect errors from send. If any of the send calls failed with an irrecoverable error, the final [commitTransaction()](https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#commitTransaction--) call will fail and throw the exception from the last failed send. When this happens, your application should call [abortTransaction()](https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#abortTransaction--) to reset the state and continue to send data.
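To make the documented behavior concrete, here is a minimal sketch that models it in plain Java. `ToyTransactionalProducer`, its `authorized` flag and the `TransactionalWriteSketch.write` helper are hypothetical illustrations, not the Kafka client API: sends carry no callback, a failed send only surfaces when `commitTransaction()` throws, and the caller then aborts the transaction.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a transactional producer; NOT the Kafka client API.
// It models only the documented behavior: a failed send inside a transaction
// surfaces later, when commitTransaction() is called.
class ToyTransactionalProducer {
    private final List<String> committed = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();
    private final boolean authorized;
    private RuntimeException lastSendError;

    ToyTransactionalProducer(boolean authorized) {
        this.authorized = authorized;
    }

    void beginTransaction() {
        pending.clear();
        lastSendError = null;
    }

    // Fire-and-forget send with no callback: a failure is only recorded.
    void send(String key, String value) {
        if (!authorized) {
            lastSendError = new RuntimeException("Not authorized to access topics: [connect-configs]");
        } else {
            pending.add(key + "=" + value);
        }
    }

    // Throws the exception from the last failed send, mirroring the Javadoc.
    void commitTransaction() {
        if (lastSendError != null) {
            throw lastSendError;
        }
        committed.addAll(pending);
        pending.clear();
    }

    // Discards the pending records and the recorded error.
    void abortTransaction() {
        pending.clear();
        lastSendError = null;
    }

    List<String> committed() {
        return committed;
    }
}

class TransactionalWriteSketch {
    // Returns null on success, or the message of the exception surfaced by the commit.
    static String write(ToyTransactionalProducer producer, String key, String value) {
        producer.beginTransaction();
        try {
            producer.send(key, value);
            producer.commitTransaction();
            return null;
        } catch (RuntimeException e) {
            // Reset the transaction state, as the Javadoc recommends.
            producer.abortTransaction();
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(write(new ToyTransactionalProducer(true), "connector-foo", "{}"));
        System.out.println(write(new ToyTransactionalProducer(false), "connector-foo", "{}"));
    }
}
```

In this model the only place the error becomes visible is the commit, which is why a caller that relies solely on `commitTransaction()` still sees the underlying failure.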
   
   So what you're saying definitely makes sense. I tried this scenario out manually: for an EOS-enabled worker whose principal doesn't have a `WRITE` ACL on the config topic, the response from the `POST /connectors` API (without the changes from this PR) is:
   
   ```
   {
     "error_code": 500,
     "message": "Failed to write to config topic; this may be due to a transient error and the request can be safely retried"
   }
   ```
   
   While the worker logs do contain the exact root cause (the `TopicAuthorizationException`), I believe the REST API response in this case isn't very helpful to the user. With the changes from this PR, the response in the same scenario is:
   
   ```
   {
     "error_code": 500,
     "message": "Not authorized to access topics: [connect-configs]"
   }
   ```
   
   > I'm not sure if we're in danger of double-completing the callback
   
   I don't believe we're in danger of double-completing the callback, although you're right that, in a way, we're unnecessarily handling the error in two places.
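If a hard guarantee against double completion were wanted while the error is handled in both the send callback and the commit path, one common Java idiom is an `AtomicBoolean` guard. This is a hypothetical sketch: `CompleteOnceCallback` is not a Kafka Connect class, and `BiConsumer` stands in for Connect's `Callback`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

// Hypothetical wrapper (not a Kafka Connect class): lets at most one completion
// reach the delegate, even if the same error is reported from two code paths.
class CompleteOnceCallback<T> {
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final BiConsumer<Throwable, T> delegate;

    CompleteOnceCallback(BiConsumer<Throwable, T> delegate) {
        this.delegate = delegate;
    }

    // Returns true only for the call that actually invoked the delegate.
    boolean onCompletion(Throwable error, T result) {
        if (completed.compareAndSet(false, true)) {
            delegate.accept(error, result);
            return true;
        }
        return false;
    }
}

class CompleteOnceDemo {
    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        CompleteOnceCallback<Void> cb = new CompleteOnceCallback<>(
                (err, ignored) -> seen.add(err == null ? "ok" : err.getMessage()));

        RuntimeException sendError = new RuntimeException("send failed");
        cb.onCompletion(sendError, null); // e.g. from the producer send callback
        cb.onCompletion(sendError, null); // e.g. from the commitTransaction() failure path

        System.out.println(seen); // prints [send failed]
    }
}
```

`compareAndSet` makes the first caller win even if the two paths run on different threads; later calls are dropped silently.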
   
   When using the transactional producer, we could avoid passing a callback to the producer `send` (although I couldn't find anything that explicitly warns against doing so) and instead refactor [this](https://github.com/apache/kafka/blob/e3585a4cd5ddb5b8475a49c38143d18e7c640bfe/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494) / [this](https://github.com/apache/kafka/blob/e3585a4cd5ddb5b8475a49c38143d18e7c640bfe/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L731) so that the exception that is ultimately thrown is the producer exception itself rather than the (doubly) wrapped one. WDYT?
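A different, purely illustrative technique from the refactor proposed above is to walk the cause chain wherever the error is reported. The sketch uses only JDK types: `rootCause` is a hypothetical helper, plain `RuntimeException`s stand in for Connect's wrapping exception and the client's `TopicAuthorizationException`, and the 32-hop cap is an arbitrary guard against cyclic cause chains.

```java
import java.util.concurrent.ExecutionException;

class ExceptionUnwrapSketch {
    // Hypothetical helper: follows getCause() to the innermost exception.
    // The hop limit is an arbitrary guard against cyclic cause chains.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        for (int hops = 0; hops < 32 && current.getCause() != null; hops++) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-in for the client's TopicAuthorizationException.
        RuntimeException producerError =
                new RuntimeException("Not authorized to access topics: [connect-configs]");
        // Doubly wrapped: a generic message around an ExecutionException around the real cause.
        RuntimeException wrapped = new RuntimeException(
                "Failed to write to config topic; this may be due to a transient error and the request can be safely retried",
                new ExecutionException(producerError));

        System.out.println(wrapped.getMessage());            // the generic message
        System.out.println(rootCause(wrapped).getMessage()); // Not authorized to access topics: [connect-configs]
    }
}
```

This keeps the existing wrapping and only changes which message gets reported, whereas the refactor suggested in the comment would avoid the extra wrapping in the first place.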






[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2022-12-12 Thread GitBox


yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1046722667


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java:
##
@@ -51,8 +52,9 @@ public interface ConfigBackingStore {
  * Update the configuration for a connector.
  * @param connector name of the connector
  * @param properties the connector configuration
+ * @param callback the callback to be invoked after the put is complete; can be {@code null} if no callback is desired
  */
-void putConnectorConfig(String connector, Map properties);
+void putConnectorConfig(String connector, Map properties, Callback callback);

Review Comment:
   We could potentially add a new overloaded method in `KafkaConfigBackingStore` and avoid touching this interface method, although that would require making `AbstractHerder` generic so that `DistributedHerder` can access an instance of `KafkaConfigBackingStore` rather than `ConfigBackingStore`. However, that would be a much noisier change IMO.
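A rough sketch of that alternative, using hypothetical, heavily simplified stand-ins (`ConfigStoreSketch`, `KafkaConfigStoreSketch`, `AbstractHerderSketch`, `DistributedHerderSketch`, and `BiConsumer` in place of Connect's `Callback`): the interface keeps its two-argument method, the concrete store gains an overload, and the abstract herder takes a type parameter so the distributed herder can call that overload without a cast.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical, heavily simplified stand-ins for ConfigBackingStore,
// KafkaConfigBackingStore, AbstractHerder and DistributedHerder.
interface ConfigStoreSketch {
    // The interface method keeps its original two-argument shape.
    void putConnectorConfig(String connector, Map<String, String> properties);
}

class KafkaConfigStoreSketch implements ConfigStoreSketch {
    final Map<String, Map<String, String>> configs = new HashMap<>();

    @Override
    public void putConnectorConfig(String connector, Map<String, String> properties) {
        putConnectorConfig(connector, properties, null);
    }

    // Overload that exists only on the concrete store; callback may be null.
    void putConnectorConfig(String connector, Map<String, String> properties,
                            BiConsumer<Throwable, Void> callback) {
        configs.put(connector, new HashMap<>(properties));
        if (callback != null) {
            callback.accept(null, null);
        }
    }
}

// The type parameter lets subclasses see the concrete store type.
abstract class AbstractHerderSketch<S extends ConfigStoreSketch> {
    protected final S configBackingStore;

    AbstractHerderSketch(S configBackingStore) {
        this.configBackingStore = configBackingStore;
    }
}

class DistributedHerderSketch extends AbstractHerderSketch<KafkaConfigStoreSketch> {
    DistributedHerderSketch(KafkaConfigStoreSketch store) {
        super(store);
    }

    // No cast needed: configBackingStore is statically a KafkaConfigStoreSketch.
    void putConnectorConfig(String name, Map<String, String> config,
                            BiConsumer<Throwable, Void> callback) {
        configBackingStore.putConnectorConfig(name, config, callback);
    }

    public static void main(String[] args) {
        DistributedHerderSketch herder = new DistributedHerderSketch(new KafkaConfigStoreSketch());
        herder.putConnectorConfig("my-connector", Map.of("tasks.max", "1"),
                (err, ignored) -> System.out.println(err == null ? "written" : err.getMessage()));
    }
}
```

Every other herder subclass would then also need to pick a type argument for the store, which is likely part of the noise the comment refers to.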



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1058,14 +1058,20 @@ public void putConnectorConfig(final String connName, final Map
 }
 
 log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors());
-writeToConfigTopicAsLeader(() -> configBackingStore.putConnectorConfig(connName, config));
-
-// Note that we use the updated connector config despite the fact that we don't have an updated
-// snapshot yet. The existing task info should still be accurate.
-ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName),
-// validateConnectorConfig have checked the existence of CONNECTOR_CLASS_CONFIG
-connectorType(config));
-callback.onCompletion(null, new Created<>(!exists, info));
+Callback cb = (err, result) -> {

Review Comment:
   Another option could be to call `callback.onCompletion` directly in `KafkaConfigBackingStore::putConnectorConfig` itself, i.e. construct the `Created` object there from the store's in-memory maps rather than from the snapshot. This would avoid the double-callback mechanism but would need some other refactoring.
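A minimal sketch of that option, under loudly stated assumptions: all types here (`InMemoryConfigStoreSketch`, `ConnectorInfoSketch`, `CreatedSketch`) are hypothetical simplifications, `BiConsumer` stands in for Connect's `Callback`, and the private `write` method updates the in-memory map directly, whereas the real store would update its maps when it reads the record back from the config topic.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical stand-ins; the real ConnectorInfo and Created types differ.
final class ConnectorInfoSketch {
    final String name;
    final Map<String, String> config;
    final int taskCount;

    ConnectorInfoSketch(String name, Map<String, String> config, int taskCount) {
        this.name = name;
        this.config = config;
        this.taskCount = taskCount;
    }
}

final class CreatedSketch<T> {
    final boolean created;
    final T result;

    CreatedSketch(boolean created, T result) {
        this.created = created;
        this.result = result;
    }
}

class InMemoryConfigStoreSketch {
    private final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
    // Never populated in this sketch; kept to show where task info would come from.
    private final Map<String, List<String>> connectorTasks = new HashMap<>();

    // Completes the caller's callback itself, building the response from the
    // store's in-memory maps rather than from a herder-side snapshot.
    void putConnectorConfig(String connector, Map<String, String> properties,
                            BiConsumer<Throwable, CreatedSketch<ConnectorInfoSketch>> callback) {
        boolean exists = connectorConfigs.containsKey(connector);
        try {
            write(connector, properties);
        } catch (RuntimeException e) {
            callback.accept(e, null);
            return;
        }
        int taskCount = connectorTasks.getOrDefault(connector, List.of()).size();
        ConnectorInfoSketch info =
                new ConnectorInfoSketch(connector, connectorConfigs.get(connector), taskCount);
        callback.accept(null, new CreatedSketch<>(!exists, info));
    }

    // Placeholder for the config-topic write: this sketch just updates the map,
    // whereas the real store updates its maps when it reads the record back.
    private void write(String connector, Map<String, String> properties) {
        connectorConfigs.put(connector, new HashMap<>(properties));
    }

    public static void main(String[] args) {
        InMemoryConfigStoreSketch store = new InMemoryConfigStoreSketch();
        store.putConnectorConfig("my-connector", Map.of("tasks.max", "1"),
                (err, res) -> System.out.println(err != null ? err.getMessage() : "created=" + res.created));
    }
}
```

Here the herder would pass its REST callback straight through, so there is only one completion point; the trade-off is that the store now has to know about response types like `Created`.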


