Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]

2024-04-16 Thread via GitHub


mimaison merged PR #15558:
URL: https://github.com/apache/kafka/pull/15558


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]

2024-04-10 Thread via GitHub


mimaison commented on code in PR #15558:
URL: https://github.com/apache/kafka/pull/15558#discussion_r1559584216


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##
@@ -320,4 +323,21 @@ static void createCompactedTopic(String topicName, short partitions, short repli
     static void createSinglePartitionCompactedTopic(String topicName, short replicationFactor, Admin admin) {
         createCompactedTopic(topicName, (short) 1, replicationFactor, admin);
     }
+
+    static <T> T adminCall(Callable<T> callable, Supplier<String> errMsg)
+            throws ExecutionException, InterruptedException {
+        try {
+            return callable.call();
+        } catch (ExecutionException | InterruptedException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof TopicAuthorizationException ||
+                    cause instanceof ClusterAuthorizationException ||
+                    cause instanceof GroupAuthorizationException) {
+                log.error("Authorization error occurred while trying to " + errMsg.get());

Review Comment:
   Could we do something like:
   ```
   log.error(cause.getClass().getSimpleName() + " occurred while trying to " + errMsg.get());
   ```
   so the exact exception is printed.
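A minimal sketch of how the helper might look with this suggestion applied. It is not the merged code: the three exception classes are local stand-ins for the Kafka classes of the same name, `ERRORS` is a hypothetical stand-in for `log.error(...)`, and the rethrow and the final `catch` are assumptions, since the quoted hunk ends at the log call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

final class AdminCallSketch {
    // Stand-ins for the Kafka exception classes named in the diff, so the
    // sketch compiles without Kafka on the classpath.
    static class TopicAuthorizationException extends RuntimeException { }
    static class ClusterAuthorizationException extends RuntimeException { }
    static class GroupAuthorizationException extends RuntimeException { }

    // Hypothetical stand-in for log.error(...): collects the messages.
    static final List<String> ERRORS = new ArrayList<>();

    static <T> T adminCall(Callable<T> callable, Supplier<String> errMsg)
            throws ExecutionException, InterruptedException {
        try {
            return callable.call();
        } catch (ExecutionException | InterruptedException e) {
            Throwable cause = e.getCause();
            if (cause instanceof TopicAuthorizationException
                    || cause instanceof ClusterAuthorizationException
                    || cause instanceof GroupAuthorizationException) {
                // Use the simple class name so the exact exception type is reported.
                ERRORS.add(cause.getClass().getSimpleName()
                        + " occurred while trying to " + errMsg.get());
            }
            // Assumption: the original exception is propagated unchanged.
            throw e;
        } catch (Exception e) {
            // Assumption: any other checked exception from call() is wrapped.
            throw new RuntimeException(e);
        }
    }
}
```

The message is passed as a `Supplier`, so the string is only built when an authorization error is actually logged.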






Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]

2024-03-29 Thread via GitHub


wernerdv commented on code in PR #15558:
URL: https://github.com/apache/kafka/pull/15558#discussion_r1544403086


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -216,7 +217,10 @@ Set<String> findConsumerGroups()
 
     Collection<ConsumerGroupListing> listConsumerGroups()
             throws InterruptedException, ExecutionException {
-        return sourceAdminClient.listConsumerGroups().valid().get();
+        return adminCall(
+                () -> sourceAdminClient.listConsumerGroups().valid().get(),
+                () -> "list consumer groups on cluster " + config.sourceClusterAlias()

Review Comment:
   Yes, it's ready for review now.






Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]

2024-03-29 Thread via GitHub


mimaison commented on code in PR #15558:
URL: https://github.com/apache/kafka/pull/15558#discussion_r1544383751


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -216,7 +217,10 @@ Set<String> findConsumerGroups()
 
     Collection<ConsumerGroupListing> listConsumerGroups()
             throws InterruptedException, ExecutionException {
-        return sourceAdminClient.listConsumerGroups().valid().get();
+        return adminCall(
+                () -> sourceAdminClient.listConsumerGroups().valid().get(),
+                () -> "list consumer groups on cluster " + config.sourceClusterAlias()

Review Comment:
   I don't see the change. Did you forget to push?






Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]

2024-03-29 Thread via GitHub


wernerdv commented on code in PR #15558:
URL: https://github.com/apache/kafka/pull/15558#discussion_r1544364748


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##
@@ -320,4 +322,19 @@ static void createCompactedTopic(String topicName, short partitions, short repli
     static void createSinglePartitionCompactedTopic(String topicName, short replicationFactor, Admin admin) {
         createCompactedTopic(topicName, (short) 1, replicationFactor, admin);
     }
+
+    static <T> T adminCall(Callable<T> callable, Supplier<String> errMsg)
+            throws ExecutionException, InterruptedException {
+        try {
+            return callable.call();
+        } catch (ExecutionException | InterruptedException e) {
+            if (e.getCause() instanceof TopicAuthorizationException ||

Review Comment:
   Done.






Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]

2024-03-29 Thread via GitHub


wernerdv commented on code in PR #15558:
URL: https://github.com/apache/kafka/pull/15558#discussion_r1544364893


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -216,7 +217,10 @@ Set<String> findConsumerGroups()
 
     Collection<ConsumerGroupListing> listConsumerGroups()
             throws InterruptedException, ExecutionException {
-        return sourceAdminClient.listConsumerGroups().valid().get();
+        return adminCall(
+                () -> sourceAdminClient.listConsumerGroups().valid().get(),
+                () -> "list consumer groups on cluster " + config.sourceClusterAlias()

Review Comment:
   Done.






Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]

2024-03-27 Thread via GitHub


mimaison commented on code in PR #15558:
URL: https://github.com/apache/kafka/pull/15558#discussion_r1540866456


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##
@@ -320,4 +322,19 @@ static void createCompactedTopic(String topicName, short partitions, short repli
     static void createSinglePartitionCompactedTopic(String topicName, short replicationFactor, Admin admin) {
         createCompactedTopic(topicName, (short) 1, replicationFactor, admin);
     }
+
+    static <T> T adminCall(Callable<T> callable, Supplier<String> errMsg)
+            throws ExecutionException, InterruptedException {
+        try {
+            return callable.call();
+        } catch (ExecutionException | InterruptedException e) {
+            if (e.getCause() instanceof TopicAuthorizationException ||

Review Comment:
   I think we also need to handle `GroupAuthorizationException`, since it can be thrown by `listConsumerGroupOffsets()`.



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -216,7 +217,10 @@ Set<String> findConsumerGroups()
 
     Collection<ConsumerGroupListing> listConsumerGroups()
             throws InterruptedException, ExecutionException {
-        return sourceAdminClient.listConsumerGroups().valid().get();
+        return adminCall(
+                () -> sourceAdminClient.listConsumerGroups().valid().get(),
+                () -> "list consumer groups on cluster " + config.sourceClusterAlias()

Review Comment:
   Should we put the alias before `cluster`, so we get a message like `list consumer groups on source cluster`, for example? I think it reads better than `list consumer groups on cluster source`.
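As a small illustration of the wording suggested here, the message supplier could place the alias before the word `cluster`. The class and method names below are hypothetical, and the alias is passed in as a plain string rather than read from `config.sourceClusterAlias()`.

```java
import java.util.function.Supplier;

final class ErrMsgSketch {
    // Builds the lazily evaluated description of the admin operation,
    // with the cluster alias placed before the word "cluster".
    static Supplier<String> listGroupsMsg(String clusterAlias) {
        return () -> "list consumer groups on " + clusterAlias + " cluster";
    }
}
```

With an alias of `source`, the supplier yields `list consumer groups on source cluster`.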






Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]

2024-03-21 Thread via GitHub


wernerdv commented on code in PR #15558:
URL: https://github.com/apache/kafka/pull/15558#discussion_r1534499279


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -630,8 +631,16 @@ Map<String, Config> describeTopicConfigs(Set<String> topics)
         Set<ConfigResource> resources = topics.stream()
             .map(x -> new ConfigResource(ConfigResource.Type.TOPIC, x))
             .collect(Collectors.toSet());
-        return sourceAdminClient.describeConfigs(resources).all().get().entrySet().stream()
-            .collect(Collectors.toMap(x -> x.getKey().name(), Entry::getValue));
+        try {

Review Comment:
   I added the method `MirrorUtils#adminCall`.
   Is this what is required, or does it need improvement?






Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]

2024-03-21 Thread via GitHub


wernerdv commented on code in PR #15558:
URL: https://github.com/apache/kafka/pull/15558#discussion_r1534494595


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##
@@ -224,6 +229,39 @@ public void testNoBrokerAclAuthorizer() throws Exception {
         verifyNoInteractions(targetAdmin);
     }
 
+    @Test
+    public void testMissingDescribeConfigsAcl() throws Exception {
+        Admin sourceAdmin = mock(Admin.class);
+        Admin targetAdmin = mock(Admin.class);
+        MirrorSourceConnector connector = new MirrorSourceConnector(sourceAdmin, targetAdmin);
+        Field configField = connector.getClass().getDeclaredField("config");

Review Comment:
   Added config to the arguments and used the constructor for testing.






Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]

2024-03-20 Thread via GitHub


mimaison commented on code in PR #15558:
URL: https://github.com/apache/kafka/pull/15558#discussion_r1532418821


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -630,8 +631,16 @@ Map<String, Config> describeTopicConfigs(Set<String> topics)
         Set<ConfigResource> resources = topics.stream()
             .map(x -> new ConfigResource(ConfigResource.Type.TOPIC, x))
             .collect(Collectors.toSet());
-        return sourceAdminClient.describeConfigs(resources).all().get().entrySet().stream()
-            .collect(Collectors.toMap(x -> x.getKey().name(), Entry::getValue));
+        try {

Review Comment:
   This only improves this single call; there are a bunch of other admin calls run by the Schedulers that should also be updated. Considering the number of calls, it might make sense to have a helper method to wrap them and avoid too much duplication.



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##
@@ -224,6 +229,39 @@ public void testNoBrokerAclAuthorizer() throws Exception {
         verifyNoInteractions(targetAdmin);
     }
 
+    @Test
+    public void testMissingDescribeConfigsAcl() throws Exception {
+        Admin sourceAdmin = mock(Admin.class);
+        Admin targetAdmin = mock(Admin.class);
+        MirrorSourceConnector connector = new MirrorSourceConnector(sourceAdmin, targetAdmin);
+        Field configField = connector.getClass().getDeclaredField("config");

Review Comment:
   Instead of using reflection, I wonder if it would be better to adjust one of the existing constructors used for testing.
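A rough sketch of the constructor-based alternative, under stated assumptions: `ConnectorSketch`, its nested `Admin` interface and its `Config` class are all hypothetical stand-ins, not the real `MirrorSourceConnector`, Kafka `Admin`, or connector config types.

```java
final class ConnectorSketch {
    // Stand-in types so the sketch compiles on its own.
    interface Admin { }

    static final class Config {
        final String sourceClusterAlias;

        Config(String sourceClusterAlias) {
            this.sourceClusterAlias = sourceClusterAlias;
        }
    }

    private final Admin sourceAdmin;
    private final Admin targetAdmin;
    private final Config config;

    // Package-private constructor intended for tests: the config is passed in
    // directly, so a test does not need getDeclaredField("config") to set it.
    ConnectorSketch(Admin sourceAdmin, Admin targetAdmin, Config config) {
        this.sourceAdmin = sourceAdmin;
        this.targetAdmin = targetAdmin;
        this.config = config;
    }

    String sourceClusterAlias() {
        return config.sourceClusterAlias;
    }
}
```

A test in the same package can then construct the object with mocks and a prepared config in one call, without touching private fields.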






Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]

2024-03-19 Thread via GitHub


wernerdv commented on PR #15558:
URL: https://github.com/apache/kafka/pull/15558#issuecomment-2007211486

   Hello @mimaison,
   Please take a look.





[PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]

2024-03-18 Thread via GitHub


wernerdv opened a new pull request, #15558:
URL: https://github.com/apache/kafka/pull/15558

   Log the list of topics for which an authorization error was received when trying to describe configs, along with the cluster alias.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

