Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-05-09 Thread via GitHub


chickenchickenlove commented on PR #15573:
URL: https://github.com/apache/kafka/pull/15573#issuecomment-2103761393

   Thanks for your guidance.
   It was very helpful to me.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-05-09 Thread via GitHub


mjsax commented on PR #15573:
URL: https://github.com/apache/kafka/pull/15573#issuecomment-2103753531

   Thanks for the PR @chickenchickenlove! Merged to `trunk`.





Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-05-09 Thread via GitHub


mjsax merged PR #15573:
URL: https://github.com/apache/kafka/pull/15573





Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-05-02 Thread via GitHub


chickenchickenlove commented on PR #15573:
URL: https://github.com/apache/kafka/pull/15573#issuecomment-2091999016

   Hi @mjsax!
   Sorry, I forgot to run `checkStyle`. Thanks for fixing it on my behalf.





Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-05-02 Thread via GitHub


mjsax commented on PR #15573:
URL: https://github.com/apache/kafka/pull/15573#issuecomment-2091973130

   I took the liberty to commit a fix to retrigger the build.





Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-05-02 Thread via GitHub


mjsax commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1588561925


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java:
##
@@ -110,6 +110,13 @@ public Set topologiesWithMissingInputTopics() {
 .collect(Collectors.toSet());
 }
 
+public Set missingSourceTopics(){

Review Comment:
   ```suggestion
   public Set missingSourceTopics() {
   ```






Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-05-02 Thread via GitHub


mjsax commented on PR #15573:
URL: https://github.com/apache/kafka/pull/15573#issuecomment-2091972588

   There is a checkstyle error:
   ```
   [2024-05-02T11:03:05.489Z] > Task :streams:checkstyleMain
   
   [2024-05-02T11:03:05.489Z] [ant:checkstyle] [ERROR] /home/jenkins/workspace/Kafka_kafka-pr_PR-15573/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java:113:45: '{' is not preceded with whitespace. [WhitespaceAround]
   ```





Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-05-02 Thread via GitHub


chickenchickenlove commented on PR #15573:
URL: https://github.com/apache/kafka/pull/15573#issuecomment-2090210638

   @mjsax Thanks for your suggestions; they fit well.
   I committed them. When you have free time, please take a look.





Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-05-01 Thread via GitHub


mjsax commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1587018485


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java:
##
@@ -19,20 +19,31 @@
 public enum AssignorError {
 // Note: this error code should be reserved for fatal errors, as the receiving clients are future-proofed
 // to throw an exception upon an unrecognized error code.
-NONE(0),
-INCOMPLETE_SOURCE_TOPIC_METADATA(1),
-VERSION_PROBING(2), // not actually used anymore, but we may hit it during a rolling upgrade from earlier versions
-ASSIGNMENT_ERROR(3),
-SHUTDOWN_REQUESTED(4);
+NONE(0, "NONE", "NONE"),
+INCOMPLETE_SOURCE_TOPIC_METADATA(1, "INCOMPLETE_SOURCE_TOPIC_METADATA","Missing source topics are existed. To check which topics are missing, please look into the logs of the consumer group leader. Only the leaders knows and logs the name of the missing topics."),
+VERSION_PROBING(2, "VERSION_PROBING", "VERSION_PROBING"), // not actually used anymore, but we may hit it during a rolling upgrade from earlier versions
+ASSIGNMENT_ERROR(3, "ASSIGNMENT_ERROR", "Hit an unexpected exception during task assignment phase of rebalance."),
+SHUTDOWN_REQUESTED(4, "SHUTDOWN_REQUESTED","Encountered fatal error, and should send shutdown request for the entire application.");

Review Comment:
   ```suggestion
   SHUTDOWN_REQUESTED(4, "SHUTDOWN_REQUESTED", "A KafkaStreams instance encountered a fatal error and requested a shutdown for the entire application.");
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java:
##
@@ -19,20 +19,31 @@
 public enum AssignorError {
 // Note: this error code should be reserved for fatal errors, as the receiving clients are future-proofed
 // to throw an exception upon an unrecognized error code.
-NONE(0),
-INCOMPLETE_SOURCE_TOPIC_METADATA(1),
-VERSION_PROBING(2), // not actually used anymore, but we may hit it during a rolling upgrade from earlier versions
-ASSIGNMENT_ERROR(3),
-SHUTDOWN_REQUESTED(4);
+NONE(0, "NONE", "NONE"),
+INCOMPLETE_SOURCE_TOPIC_METADATA(1, "INCOMPLETE_SOURCE_TOPIC_METADATA","Missing source topics are existed. To check which topics are missing, please look into the logs of the consumer group leader. Only the leaders knows and logs the name of the missing topics."),
+VERSION_PROBING(2, "VERSION_PROBING", "VERSION_PROBING"), // not actually used anymore, but we may hit it during a rolling upgrade from earlier versions
+ASSIGNMENT_ERROR(3, "ASSIGNMENT_ERROR", "Hit an unexpected exception during task assignment phase of rebalance."),

Review Comment:
   ```suggestion
   ASSIGNMENT_ERROR(3, "ASSIGNMENT_ERROR", "Internal task assignment error. Check the group leader logs for details."),
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java:
##
@@ -19,20 +19,31 @@
 public enum AssignorError {
 // Note: this error code should be reserved for fatal errors, as the receiving clients are future-proofed
 // to throw an exception upon an unrecognized error code.
-NONE(0),
-INCOMPLETE_SOURCE_TOPIC_METADATA(1),
-VERSION_PROBING(2), // not actually used anymore, but we may hit it during a rolling upgrade from earlier versions
-ASSIGNMENT_ERROR(3),
-SHUTDOWN_REQUESTED(4);
+NONE(0, "NONE", "NONE"),
+INCOMPLETE_SOURCE_TOPIC_METADATA(1, "INCOMPLETE_SOURCE_TOPIC_METADATA","Missing source topics are existed. To check which topics are missing, please look into the logs of the consumer group leader. Only the leaders knows and logs the name of the missing topics."),

Review Comment:
   ```suggestion
   INCOMPLETE_SOURCE_TOPIC_METADATA(1, "INCOMPLETE_SOURCE_TOPIC_METADATA", "Missing metadata for source topics. Check the group leader logs for details."),
   ```
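
The suggestions above all pair each error code with a readable description. A minimal Java sketch of that shape follows, assuming the description strings from the three suggestions. The type name `AssignorErrorSketch` and the `describe` helper are hypothetical and not part of Kafka, and the sketch relies on the enum's built-in `name()` rather than a separate name argument.

```java
// Sketch only: an error enum that carries a description next to its wire code.
// AssignorErrorSketch is a hypothetical name, not the Kafka class.
enum AssignorErrorSketch {
    NONE(0, "NONE"),
    INCOMPLETE_SOURCE_TOPIC_METADATA(1,
        "Missing metadata for source topics. Check the group leader logs for details."),
    VERSION_PROBING(2, "VERSION_PROBING"),
    ASSIGNMENT_ERROR(3,
        "Internal task assignment error. Check the group leader logs for details."),
    SHUTDOWN_REQUESTED(4,
        "A KafkaStreams instance encountered a fatal error and requested a shutdown for the entire application.");

    private final int code;
    private final String description;

    AssignorErrorSketch(final int code, final String description) {
        this.code = code;
        this.description = description;
    }

    int code() {
        return code;
    }

    String description() {
        return description;
    }

    // Hypothetical helper: turn a received numeric code into a readable log text.
    static String describe(final int code) {
        for (final AssignorErrorSketch error : values()) {
            if (error.code == code) {
                return error.name() + ": " + error.description;
            }
        }
        // Unknown codes are reported rather than silently mapped to NONE.
        return "Unknown error code " + code;
    }
}
```

With a lookup of this kind, a log line could print the description rather than a bare numeric code.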



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java:
##
@@ -19,20 +19,31 @@
 public enum AssignorError {
 // Note: this error code should be reserved for fatal errors, as the receiving clients are future-proofed
 // to throw an exception upon an unrecognized error code.
-NONE(0),
-INCOMPLETE_SOURCE_TOPIC_METADATA(1),
-VERSION_PROBING(2), // not actually used anymore, but we may hit it during a rolling upgrade from earlier versions
-ASSIGNMENT_ERROR(3),
-SHUTDOWN_REQUESTED(4);
+NONE(0, "NONE", "NONE"),
+INCOMPLETE_SOURCE_TOPIC_METADATA(1, "INCOMPLETE_SOURCE_TOPIC_METADATA","Missing source topics are existed. To check which topics are missing, please look into the logs of the consumer group leader. Only the leaders knows and logs the name of the missing topics."),
+VERSION_PROBING(2, "VERSION_PROBING", "VERSION_PROBING"), // not actually used anymore, 

Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-04-10 Thread via GitHub


chickenchickenlove commented on PR #15573:
URL: https://github.com/apache/kafka/pull/15573#issuecomment-2046916043

   Gentle ping, @mjsax.
   Would you please take a look when you have some free time?





Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-28 Thread via GitHub


chickenchickenlove commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1543099933


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
 final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();
 if (isMissingInputTopics) {
 if (!taskManager.topologyMetadata().hasNamedTopologies()) {
-throw new MissingSourceTopicException("Missing source topics.");

Review Comment:
   @mjsax Thanks for your kind explanation. It was very helpful.
   
   > 1. This current code is not producing a log message:
   
   I misunderstood; thanks for your time!
   I added `log.error()` in this [commit](https://github.com/apache/kafka/pull/15573/commits/fb957dbfc757d62fc5fe4ede4921ec011be96fa2).
   
   > I would propose to actually change AssignorError to contain a proper String.
   
   I agree with you.
   Here is how I implemented it based on your suggestion ([commit](https://github.com/apache/kafka/pull/15573/commits/94b9764efaf54ba2efadb16caccc2e705f553985)).
   I don't know the exact meaning of every `AssignorError` code, so I wrote the messages based on the existing error log messages associated with each code.
   
   What do you think? Am I heading in the right direction?
   When you have some free time, please take a look.






Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-28 Thread via GitHub


mjsax commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1543047190


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
 final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();
 if (isMissingInputTopics) {
 if (!taskManager.topologyMetadata().hasNamedTopologies()) {
-throw new MissingSourceTopicException("Missing source topics.");

Review Comment:
   It seems there is some misunderstanding. Sorry for causing confusion.
   
   1) This current code is not producing a log message:
   ```
   final String errorMsg = String.format("Missing source topics. %s", repartitionTopics.missingSourceTopics());
   throw new MissingSourceTopicException(errorMsg);
   ```
   The code should be something like:
   ```
   final String errorMsg = String.format("Missing source topics. %s", repartitionTopics.missingSourceTopics());
   log.error(errorMsg);
   throw new MissingSourceTopicException(errorMsg);
   ```
   
   2) I did not propose to include the topic names... As Bruno already pointed out, it would require a protocol change, which seems to be overkill. In `StreamsRebalanceListener`, we currently log
   ```
   Received error code 1
   ```
   Cf. https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java#L56
   
   I would propose to actually change `AssignorError` to contain a proper String. Error code `1` does not mean anything to users. Additionally, it might be good to just change the error message of the log line and the exception to say something like: "To check which topics are missing, please look into the logs of the consumer group leader. Only the leaders knows and logs the name of the missing topics."
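
Point 1 above (build the message once, log it, then throw) can be sketched in isolation as follows. This is a simplified stand-in, not the PR's code: `MissingSourceTopicSketchException` and `LogThenThrowSketch` are hypothetical names, and `java.util.logging` replaces the SLF4J logger used in Kafka Streams so that the example has no dependencies.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.Logger;

// Hypothetical stand-in for Kafka's MissingSourceTopicException, defined here
// only so that the sketch compiles on its own.
class MissingSourceTopicSketchException extends RuntimeException {
    MissingSourceTopicSketchException(final String message) {
        super(message);
    }
}

final class LogThenThrowSketch {
    // Assumption: java.util.logging is used instead of SLF4J to stay dependency-free.
    private static final Logger LOG = Logger.getLogger(LogThenThrowSketch.class.getName());

    private LogThenThrowSketch() { }

    // Builds the message once so the log line and the exception cannot drift apart.
    static String buildMessage(final Set<String> missingSourceTopics) {
        // A TreeSet gives a stable, sorted rendering of the topic names.
        final Set<String> sorted = new TreeSet<>(missingSourceTopics);
        return String.format("Missing source topics. %s", sorted);
    }

    // Logs the error first and then throws, so the topic names reach the logs
    // even if a caller swallows or rewraps the exception.
    static void failOnMissingTopics(final Set<String> missingSourceTopics) {
        if (missingSourceTopics.isEmpty()) {
            return;
        }
        final String errorMsg = buildMessage(missingSourceTopics);
        LOG.severe(errorMsg);
        throw new MissingSourceTopicSketchException(errorMsg);
    }
}
```

Sharing one `errorMsg` between the log call and the exception keeps the two texts identical, which makes it easier to match a stack trace to its log line.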






Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-28 Thread via GitHub


chickenchickenlove commented on PR #15573:
URL: https://github.com/apache/kafka/pull/15573#issuecomment-2025128974

   > Thanks for the PR. Made a pass.
   
   Aye, I hope so.





Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-28 Thread via GitHub


chickenchickenlove commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1542822737


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
 final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();
 if (isMissingInputTopics) {
 if (!taskManager.topologyMetadata().hasNamedTopologies()) {
-throw new MissingSourceTopicException("Missing source topics.");

Review Comment:
   ![image](https://github.com/apache/kafka/assets/90125071/c00ef63e-c20b-4edf-9795-7b8b2b314975)
   I drew the image above to describe the idea for improvement.
   If you also want to read the missing source topics in `StreamsRebalanceListener`, this workaround could be a way to handle it.
   
   1. As you know, `ThreadLocal` provides storage specific to each thread.
   2. `StreamThread`, `StreamsRebalanceListener`, and `StreamsPartitionAssignor` are all in an internal package, which means they are not public API.
   3. `StreamsRebalanceListener` already holds a reference to `StreamThread`.
   
   https://github.com/apache/kafka/blob/4ccbf1634afb063615616b5995ef279a063fbeab/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L622
   From this, I believe each `StreamThread` has its own `StreamsRebalanceListener` instance. Thus, I believe `ThreadLocal` is a suitable workaround in this case.
   
   So we need to do three things:
   1. Add a `ThreadLocal` field to `StreamThread`.
   2. Add a method that puts the missing source topics into the `ThreadLocal` before throwing `MissingSourceTopicException`.
   3. Add code to `StreamsRebalanceListener` that reads the missing source topics from the `ThreadLocal`.
   
   Does it make sense to you?
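
The three steps above can be sketched as follows. This only illustrates the `ThreadLocal` handoff, under the assumption that the code recording the topics and the code reading them run on the same thread. `MissingTopicsHandoffSketch`, `record`, and `consume` are hypothetical names, not Kafka APIs. A `ThreadLocal` cannot carry data to other threads or to other application instances.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

// Sketch only: hypothetical holder, not part of Kafka Streams.
final class MissingTopicsHandoffSketch {
    // Step 1: a per-thread slot; each thread sees only the value it stored itself.
    private static final ThreadLocal<Set<String>> MISSING =
        ThreadLocal.withInitial(() -> Collections.<String>emptySet());

    private MissingTopicsHandoffSketch() { }

    // Step 2: the throwing side records the topic names before it throws.
    static void record(final Set<String> missingSourceTopics) {
        MISSING.set(Collections.unmodifiableSet(new TreeSet<>(missingSourceTopics)));
    }

    // Step 3: the reading side fetches the names and clears the slot so that
    // stale names do not leak into a later rebalance on the same thread.
    static Set<String> consume() {
        final Set<String> topics = MISSING.get();
        MISSING.remove();
        return topics;
    }
}
```

Clearing the slot in `consume` is a design choice of this sketch; a value left in place would still be visible the next time the same thread reads it.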
   
   
   
   






Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-28 Thread via GitHub


chickenchickenlove commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1542775368


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
 final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();
 if (isMissingInputTopics) {
 if (!taskManager.topologyMetadata().hasNamedTopologies()) {
-throw new MissingSourceTopicException("Missing source topics.");

Review Comment:
   > Adding to a previous comment from Bruno, I am wondering if we should also change the error log in StreamsRebalanceListener to point out that the missing source topic names might be logged on a different instance?
   > 
   > The StreamsRebalanceListener is executed on every instance, but StreamsPartitionAssignor only on the group leader.
   
   @mjsax, IMHO, with this PR alone it is not possible to access the missing topics in `StreamsRebalanceListener`.
   
   To make the missing topics available in `StreamsRebalanceListener` as well, I wrote up a suggestion. Please refer to `Idea for improving more` in this PR's description. I think it can be done without modifying the public API.
   
   I will also create some images with more detail. One moment, please.






Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-28 Thread via GitHub


chickenchickenlove commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1542765807


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
 final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();
 if (isMissingInputTopics) {
 if (!taskManager.topologyMetadata().hasNamedTopologies()) {
-throw new MissingSourceTopicException("Missing source topics.");

Review Comment:
   @mjsax Thanks for your comments!
   > What Bruno meant was that we need to add log.error(...) to log the error message before throwing the exception. Seems you did not add this yet?
   
   No, I did add it in new commits. Please refer to the images below.
   In this image, Bruno said: `In addition to throwing the exception you would also log the error.`
   
   ![image](https://github.com/apache/kafka/assets/90125071/dc652778-ec5d-4d2b-90d4-950839adf2aa)
   
   I then made these changes to reflect that comment while maintaining readability.
   You can see them in this [commit](https://github.com/apache/kafka/commit/3f16d28fd07bec76a980b184d7ba708d7d7c9b0c#diff-571894e60232d30f3754d03d122d653e5f2cff4fa646a59ba25688f9aea77318R526-R527).
   
   ![image](https://github.com/apache/kafka/assets/90125071/f2551117-a9a0-4b75-8886-063aa0dc6b34)





Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-28 Thread via GitHub


chickenchickenlove commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1542765807


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final 
Cluster metadata) {
 final boolean isMissingInputTopics = 
!repartitionTopics.missingSourceTopicExceptions().isEmpty();
 if (isMissingInputTopics) {
 if (!taskManager.topologyMetadata().hasNamedTopologies()) {
-throw new MissingSourceTopicException("Missing source 
topics.");

Review Comment:
   @mjsax thanks for your comments! 
   > Was Bruno meant was, that we need to add log.error(...) to log the error 
message before throwing the exception. Seems you did not add this yet?
   
   No, i created new commits. Please refer to images below ‍♂️ 
   In this Image, Bruno say `In addition to throwing the exception you would 
also log the error.`
   https://github.com/apache/kafka/assets/90125071/dc652778-ec5d-4d2b-90d4-950839adf2aa;>
   
   and then, I have made these changes to reflect that comment to maintain 
readability.
   you can see this 
[commit](https://github.com/apache/kafka/commit/3f16d28fd07bec76a980b184d7ba708d7d7c9b0c#diff-571894e60232d30f3754d03d122d653e5f2cff4fa646a59ba25688f9aea77318R526-R527).
 
   https://github.com/apache/kafka/assets/90125071/f2551117-a9a0-4b75-8886-063aa0dc6b34;>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-28 Thread via GitHub


chickenchickenlove commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1542765807


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final 
Cluster metadata) {
 final boolean isMissingInputTopics = 
!repartitionTopics.missingSourceTopicExceptions().isEmpty();
 if (isMissingInputTopics) {
 if (!taskManager.topologyMetadata().hasNamedTopologies()) {
-throw new MissingSourceTopicException("Missing source 
topics.");

Review Comment:
   @mjsax thanks for your comments! 
   > What Bruno meant was that we need to add `log.error(...)` to log the error 
message before throwing the exception. It seems you did not add this yet?
   
   No, I have already added it in new commits. You can see the screenshot below.
   In this screenshot, Bruno says `In addition to throwing the exception you would 
also log the error.`
   https://github.com/apache/kafka/assets/90125071/dc652778-ec5d-4d2b-90d4-950839adf2aa
   
   I then made these changes to reflect that comment while maintaining 
readability.
   You can see them in this 
[commit](https://github.com/apache/kafka/commit/3f16d28fd07bec76a980b184d7ba708d7d7c9b0c#diff-571894e60232d30f3754d03d122d653e5f2cff4fa646a59ba25688f9aea77318R526-R527).
 
   https://github.com/apache/kafka/assets/90125071/f2551117-a9a0-4b75-8886-063aa0dc6b34






Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-28 Thread via GitHub


mjsax commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1542738987


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
         final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();
         if (isMissingInputTopics) {
             if (!taskManager.topologyMetadata().hasNamedTopologies()) {
-                throw new MissingSourceTopicException("Missing source topics.");

Review Comment:
   What Bruno meant was that we need to add `log.error(...)` to log the error 
message before throwing the exception. It seems you did not add this yet?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
         final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();
         if (isMissingInputTopics) {
             if (!taskManager.topologyMetadata().hasNamedTopologies()) {
-                throw new MissingSourceTopicException("Missing source topics.");

Review Comment:
   Adding to a previous comment from Bruno, I am wondering if we should also 
change the error log in `StreamsRebalanceListener` to point out that the 
missing source topic names might be logged on a different instance?
   
   The `StreamsRebalanceListener` is executed on every instance, but 
`StreamsPartitionAssignor` only on the group leader.






Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-26 Thread via GitHub


chickenchickenlove commented on PR #15573:
URL: https://github.com/apache/kafka/pull/15573#issuecomment-2020225193

   Gentle ping, @cadonna!
   I made a new commit to address your comments.
   When you have some free time, please take a look.





Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-26 Thread via GitHub


chickenchickenlove commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1539079968


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java:
##
@@ -119,7 +119,8 @@ public Queue<StreamsException> missingSourceTopicExceptions() {
             return new StreamsException(
                 new MissingSourceTopicException(String.format(
                     "Missing source topics %s for subtopology %d of topology %s",
-                    missingSourceTopics, subtopologyId, topologyName)),
+                    missingSourceTopics, subtopologyId, topologyName),
+                    missingSourceTopics),

Review Comment:
   I really appreciate your kind explanation.  
   I've understood it clearly, and thanks to you, I've established the correct 
direction for revision.






Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-26 Thread via GitHub


chickenchickenlove commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1539075629


##
streams/src/main/java/org/apache/kafka/streams/errors/MissingSourceTopicException.java:
##
@@ -16,11 +16,25 @@
  */
 package org.apache.kafka.streams.errors;
 
+import java.util.HashSet;
+import java.util.Set;
+
 public class MissingSourceTopicException extends StreamsException {
 
     private final static long serialVersionUID = 1L;
+    private final Set<String> missingTopics;
 
     public MissingSourceTopicException(final String message) {
         super(message);
+        this.missingTopics = new HashSet<>();
+    }
+
+    public MissingSourceTopicException(final String message, final Set<String> missingTopics) {
+        super(message);
+        this.missingTopics = missingTopics;
+    }
+
+    public Set<String> getMissingTopics() {
+        return this.missingTopics;

Review Comment:
   @cadonna thanks for your comment, really. 
   Your comment really encourages me.






Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-26 Thread via GitHub


cadonna commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1538822089


##
streams/src/main/java/org/apache/kafka/streams/errors/MissingSourceTopicException.java:
##
@@ -16,11 +16,25 @@
  */
 package org.apache.kafka.streams.errors;
 
+import java.util.HashSet;
+import java.util.Set;
+
 public class MissingSourceTopicException extends StreamsException {
 
     private final static long serialVersionUID = 1L;
+    private final Set<String> missingTopics;
 
     public MissingSourceTopicException(final String message) {
         super(message);
+        this.missingTopics = new HashSet<>();
+    }
+
+    public MissingSourceTopicException(final String message, final Set<String> missingTopics) {
+        super(message);
+        this.missingTopics = missingTopics;
+    }
+
+    public Set<String> getMissingTopics() {
+        return this.missingTopics;

Review Comment:
   Don't worry! We are also here to guide new contributors.






Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-26 Thread via GitHub


cadonna commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1538819784


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java:
##
@@ -119,7 +119,8 @@ public Queue<StreamsException> missingSourceTopicExceptions() {
             return new StreamsException(
                 new MissingSourceTopicException(String.format(
                     "Missing source topics %s for subtopology %d of topology %s",
-                    missingSourceTopics, subtopologyId, topologyName)),
+                    missingSourceTopics, subtopologyId, topologyName),
+                    missingSourceTopics),

Review Comment:
   The public interface is defined as everything that shows up in the 
[javadocs](https://kafka.apache.org/37/javadoc/). Classes in a package whose 
name contains `internals` do not show up in the javadocs. Class 
`RepartitionTopics` is in package 
`org.apache.kafka.streams.processor.internals`. Thus, `RepartitionTopics` is 
not part of the public interface.
   Regarding field `missingTopics`, there is already 
`missingInputTopicsBySubtopology` that includes all missing topics. 
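   
   A minimal sketch of how a per-subtopology map like the one mentioned above 
could be flattened into a single set of topic names. Only the name 
`missingInputTopicsBySubtopology` comes from this thread; the class name, the 
`Integer` key type, and both method names are assumptions made for 
illustration, not the actual `RepartitionTopics` code.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;
   import java.util.TreeSet;

   // Hypothetical stand-in for RepartitionTopics (not the real class).
   final class MissingTopicsSketch {
       // Assumption: keyed by a plain subtopology id instead of the real key type.
       private final Map<Integer, Set<String>> missingInputTopicsBySubtopology = new HashMap<>();

       // Hypothetical helper that records missing topics for one subtopology.
       void recordMissing(final int subtopologyId, final Set<String> topics) {
           missingInputTopicsBySubtopology
               .computeIfAbsent(subtopologyId, id -> new TreeSet<>())
               .addAll(topics);
       }

       // Package-private on purpose: merges all per-subtopology sets into one sorted set.
       Set<String> missingSourceTopics() {
           final Set<String> all = new TreeSet<>();
           missingInputTopicsBySubtopology.values().forEach(all::addAll);
           return all;
       }

       public static void main(final String[] args) {
           final MissingTopicsSketch sketch = new MissingTopicsSketch();
           sketch.recordMissing(0, Set.of("orders"));
           sketch.recordMissing(1, Set.of("orders", "payments"));
           System.out.println(sketch.missingSourceTopics());
       }
   }
   ```
   
   Because the class lives in an `internals` package and the method is 
package-private, a helper of this kind would stay outside the public interface 
as defined above.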









Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-25 Thread via GitHub


chickenchickenlove commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1537690828


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java:
##
@@ -119,7 +119,8 @@ public Queue<StreamsException> missingSourceTopicExceptions() {
             return new StreamsException(
                 new MissingSourceTopicException(String.format(
                     "Missing source topics %s for subtopology %d of topology %s",
-                    missingSourceTopics, subtopologyId, topologyName)),
+                    missingSourceTopics, subtopologyId, topologyName),
+                    missingSourceTopics),

Review Comment:
   FYI, I wrote some skeleton code below! 
   Does it make sense to you as well?
   ```java
   public class RepartitionTopics {
       ...
       // Add new field (private)
       private final Set<String> missingTopics = new HashSet<>();

       ...
       public Set<String> topologiesWithMissingInputTopics() { ... }
       public Queue<StreamsException> missingSourceTopicExceptions() { ... }

       // Add new method (package-private)
       Set<String> getMissingTopics() {
           return this.missingTopics;
       }
       ...
   }
   ```









Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-25 Thread via GitHub


chickenchickenlove commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1537649345


##
streams/src/main/java/org/apache/kafka/streams/errors/MissingSourceTopicException.java:
##
@@ -16,11 +16,25 @@
  */
 package org.apache.kafka.streams.errors;
 
+import java.util.HashSet;
+import java.util.Set;
+
 public class MissingSourceTopicException extends StreamsException {
 
     private final static long serialVersionUID = 1L;
+    private final Set<String> missingTopics;
 
     public MissingSourceTopicException(final String message) {
         super(message);
+        this.missingTopics = new HashSet<>();
+    }
+
+    public MissingSourceTopicException(final String message, final Set<String> missingTopics) {
+        super(message);
+        this.missingTopics = missingTopics;
+    }
+
+    public Set<String> getMissingTopics() {
+        return this.missingTopics;

Review Comment:
   Hi, @cadonna. Thanks for your comment and your time. 
   It is very helpful for me, and I feel quite sorry for making you spend so 
much time on me.
   
   After reading the KIP document, I can now tell that public interfaces should 
be introduced carefully! 
   I have left a comment about a new commit to address your feedback. 
   
   When you have some free time, could you take a look? It would be very helpful 
for me.






Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-25 Thread via GitHub


chickenchickenlove commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1537642457


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java:
##
@@ -119,7 +119,8 @@ public Queue<StreamsException> missingSourceTopicExceptions() {
             return new StreamsException(
                 new MissingSourceTopicException(String.format(
                     "Missing source topics %s for subtopology %d of topology %s",
-                    missingSourceTopics, subtopologyId, topologyName)),
+                    missingSourceTopics, subtopologyId, topologyName),
+                    missingSourceTopics),

Review Comment:
   @cadonna, I have a question. ✋ 
   May I use the package-private access modifier for this? 
   I think package-private members are not considered part of the 
`public interface`, right? 
(https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals)
   
   If so, I think the solution will be quite simple. 






Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-25 Thread via GitHub


cadonna commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1537338065


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java:
##
@@ -119,7 +119,8 @@ public Queue<StreamsException> missingSourceTopicExceptions() {
             return new StreamsException(
                 new MissingSourceTopicException(String.format(
                     "Missing source topics %s for subtopology %d of topology %s",
-                    missingSourceTopics, subtopologyId, topologyName)),
+                    missingSourceTopics, subtopologyId, topologyName),
+                    missingSourceTopics),

Review Comment:
   Would it be possible to return a set of missing source topics from 
`RepartitionTopics`?



##
streams/src/main/java/org/apache/kafka/streams/errors/MissingSourceTopicException.java:
##
@@ -16,11 +16,25 @@
  */
 package org.apache.kafka.streams.errors;
 
+import java.util.HashSet;
+import java.util.Set;
+
 public class MissingSourceTopicException extends StreamsException {
 
     private final static long serialVersionUID = 1L;
+    private final Set<String> missingTopics;
 
     public MissingSourceTopicException(final String message) {
         super(message);
+        this.missingTopics = new HashSet<>();
+    }
+
+    public MissingSourceTopicException(final String message, final Set<String> missingTopics) {
+        super(message);
+        this.missingTopics = missingTopics;
+    }
+
+    public Set<String> getMissingTopics() {
+        return this.missingTopics;

Review Comment:
   This class is part of the public API. That means we cannot change it 
without a Kafka Improvement Proposal (KIP) [1]. 
   I am not sure that adding the missing source topics to the exception makes 
much sense, because the exception is caught in the `StreamsPartitionAssignor` 
[2] and transformed into a group assignment error 
(`INCOMPLETE_SOURCE_TOPIC_METADATA`). The missing source topics are not 
propagated to the point where the actual `MissingSourceTopicException` is 
thrown to the users, which is in the `StreamsRebalanceListener` [3]. Achieving 
this would require a protocol change, which I think is not worth it.
   What we could do instead is log an error message with the missing source 
topics in `RepartitionTopics` or `StreamsPartitionAssignor`. I slightly prefer 
the latter.
   
   [1] 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
   [2] 
https://github.com/apache/kafka/blob/d8dd068a626dcab538c2b234ffd8799a94b2f0ed/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java#L449
   [3] 
https://github.com/apache/kafka/blob/4fe4cdc4a61cbac8e070a8b5514403235194015b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java#L58
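   
   The flow described above can be illustrated with a small, self-contained 
sketch: the leader-side step knows the topic names but hands on only an 
integer error code, so the step that finally throws to the user has no names 
left to report. Everything here is hypothetical except the name 
`INCOMPLETE_SOURCE_TOPIC_METADATA`; the numeric values, class, methods, and 
messages are assumptions, not the Kafka Streams implementation.
   
   ```java
   import java.util.Set;

   // Hypothetical model of the assignor -> error code -> listener flow.
   final class ErrorCodePropagationSketch {
       // Name taken from the thread; the numeric values are assumptions.
       static final int NONE = 0;
       static final int INCOMPLETE_SOURCE_TOPIC_METADATA = 1;

       // Stand-in for MissingSourceTopicException.
       static final class MissingTopicsException extends RuntimeException {
           MissingTopicsException(final String message) {
               super(message);
           }
       }

       // Leader side: the names are known here, but only an int leaves this method.
       static int assign(final Set<String> missingTopics) {
           try {
               if (!missingTopics.isEmpty()) {
                   throw new MissingTopicsException("Missing source topics " + missingTopics);
               }
               return NONE;
           } catch (final MissingTopicsException e) {
               return INCOMPLETE_SOURCE_TOPIC_METADATA;
           }
       }

       // Every member: only the code arrives, so the message cannot name the topics.
       static void onAssignment(final int errorCode) {
           if (errorCode == INCOMPLETE_SOURCE_TOPIC_METADATA) {
               throw new MissingTopicsException("One or more source topics are missing");
           }
       }

       public static void main(final String[] args) {
           final int code = assign(Set.of("orders"));
           try {
               onAssignment(code);
           } catch (final MissingTopicsException e) {
               System.out.println(e.getMessage());
           }
       }
   }
   ```
   
   In this model, carrying the names across would mean widening what travels 
between `assign` and `onAssignment`, which corresponds to the protocol change 
mentioned above.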



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
         final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();

Review Comment:
   Here you could then use the method on `RepartitionTopics` I proposed in my 
other comment. 



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
         final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();
         if (isMissingInputTopics) {
             if (!taskManager.topologyMetadata().hasNamedTopologies()) {
-                throw new MissingSourceTopicException("Missing source topics.");

Review Comment:
   In addition to throwing the exception you would also log the error. 
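   
   A minimal sketch of the log-then-throw pattern suggested here, assuming a 
set of missing topic names is already available. `java.util.logging` and 
`IllegalStateException` are stand-ins chosen to keep the example 
self-contained; the class and method names are hypothetical and this is not 
the code from the PR.
   
   ```java
   import java.util.Set;
   import java.util.TreeSet;
   import java.util.logging.Logger;

   // Hypothetical helper; java.util.logging stands in for the project's logger.
   final class LogThenThrowSketch {
       private static final Logger LOG = Logger.getLogger(LogThenThrowSketch.class.getName());

       // Builds the detailed message; sorting keeps the log line stable.
       static String describe(final Set<String> missingTopics) {
           return "Missing source topics " + new TreeSet<>(missingTopics) + ".";
       }

       static void failIfMissing(final Set<String> missingTopics) {
           if (missingTopics.isEmpty()) {
               return;
           }
           // Log the names first; the exception message itself stays unchanged.
           LOG.severe(describe(missingTopics));
           throw new IllegalStateException("Missing source topics.");
       }

       public static void main(final String[] args) {
           try {
               failIfMissing(Set.of("orders", "payments"));
           } catch (final IllegalStateException e) {
               System.out.println("caught: " + e.getMessage());
           }
       }
   }
   ```
   
   Keeping the exception message as it was and putting the detail only in the 
log means the exception type and its constructors do not have to change.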






Re: [PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-22 Thread via GitHub


chickenchickenlove closed pull request #15573: KAFKA-15951: 
MissingSourceTopicException should include topic names
URL: https://github.com/apache/kafka/pull/15573





[PR] KAFKA-15951: MissingSourceTopicException should include topic names [kafka]

2024-03-21 Thread via GitHub


chickenchickenlove opened a new pull request, #15573:
URL: https://github.com/apache/kafka/pull/15573

   This is a minor change!
   
   - Jira: https://issues.apache.org/jira/browse/KAFKA-15951
   - `MissingSourceTopicException` now has a field `missingTopics` to store the 
missing topics. 
   - The `StreamsPartitionAssignor` throws a `MissingSourceTopicException` 
depending on the result of 
`repartitionTopics.missingSourceTopicExceptions().isEmpty()`. Thus, whenever 
the exception is thrown, `missingSourceTopicExceptions()` contains 
`MissingSourceTopicException`s that can be iterated over, which allows the 
missing source topics to be aggregated and included in the thrown exception.
   - Idea for further improvement
     - `StreamsRebalanceListener` can throw `MissingSourceTopicException` as 
well. However, `StreamsRebalanceListener` cannot get the list of missing 
topics, because it depends only on the error code 
`AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()`.
     - If `StreamsRebalanceListener` should also include the list of missing 
topics when it throws `MissingSourceTopicException`, we can consider using a 
`ThreadLocal` to achieve that. 
     - Skeleton
       - Add a `ThreadLocal` to `StreamThread` as a member field.
       - Put the missing topics into the `ThreadLocal`. 
       - `StreamsRebalanceListener` can then get the missing topics through the 
`ThreadLocal`.
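   
   The skeleton above could look roughly like the following sketch. It only 
illustrates the `ThreadLocal` idea; the holder class and its methods are 
hypothetical names, not part of `StreamThread` or 
`StreamsRebalanceListener`.
   
   ```java
   import java.util.Collections;
   import java.util.Set;
   import java.util.TreeSet;

   // Hypothetical holder illustrating the ThreadLocal idea from the skeleton.
   final class MissingTopicsHolderSketch {
       private static final ThreadLocal<Set<String>> MISSING =
           ThreadLocal.withInitial(() -> new TreeSet<>());

       // Would be called where the missing topics are detected.
       static void record(final Set<String> topics) {
           MISSING.get().addAll(topics);
       }

       // Would be called where the exception is thrown; clears the slot afterwards.
       static Set<String> drain() {
           final Set<String> copy = Collections.unmodifiableSet(new TreeSet<>(MISSING.get()));
           MISSING.remove();
           return copy;
       }

       public static void main(final String[] args) {
           record(Set.of("orders"));
           System.out.println(drain());
       }
   }
   ```
   
   A `ThreadLocal` value is visible only to the thread that set it, so this 
approach can help only when detection and reporting happen on the same thread 
of the same instance.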
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

