k-apol commented on code in PR #19913:
URL: https://github.com/apache/kafka/pull/19913#discussion_r2146941926


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -74,11 +76,14 @@ public class InternalTopicManager {
 
     private final Time time;
     private final Admin adminClient;
+    private final boolean isManualInternalTopicConfig;
 
     private final short replicationFactor;
     private final long windowChangeLogAdditionalRetention;
     private final long retryBackOffMs;
     private final long retryTimeoutMs;
+    private Duration initTimeout;
+    private boolean isInitializing = false;

Review Comment:
   The problem I am trying to solve with the **initTimeout** variable (and most 
of the related code) is that the `InternalTopicManager`, where most of the work 
happens, has it's own timeout exception that it can throw. I wanted a way to 
differentiate between the existing case and the case where the user passes a 
timeout into one of the overloads. 
   
   Since the bulk of the work done by the init() method actually happens on the 
`InternalTopicManager`. It made sense to me, if the user wants to call one of 
the timeout overloads for the init constructor, to pass that timeout into the 
`InternalTopicManager.` `makeReady()` has it's own paths where a timeout 
exception can be thrown (the processing takes longer than the configured 
`MAX_POLL_INTERVAL_MS_CONFIG)`, I took this approach to be able to let the 
method throw a timeout exception in both cases. There may be a better way to do 
this, I also thought of doing something similar to Javascript's 
[promise.race](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/race)
 command. 
   
   The current approach adds a lot of other surrounding code, and does not 
accurately represent the timeout a user would pass anyway. It's not accounting 
for any time that the validation takes before `makeReady` might be called. How 
would you approach it @mjsax ?
   
   
   



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -74,11 +76,14 @@ public class InternalTopicManager {
 
     private final Time time;
     private final Admin adminClient;
+    private final boolean isManualInternalTopicConfig;
 
     private final short replicationFactor;
     private final long windowChangeLogAdditionalRetention;
     private final long retryBackOffMs;
     private final long retryTimeoutMs;
+    private Duration initTimeout;
+    private boolean isInitializing = false;

Review Comment:
   
   The problem I am trying to solve with the **isInitializing** flag is that, 
as I understand from the KIP, we want to prevent makeReady from being invoked 
by anything besides init when configured to do so. My idea was to set this flag 
is set to true when the init method is used to invoke `makeReady`, and 
`makeReady` turns it off at the end of it's work. This way, we can stop topics 
from being created if `makeReady` is called explicitly, instead of via `init` 
when applicable. Currently, it's tied to the timeout setting that also looks 
like it's going to be refactored somehow. There are other ways to stop topics 
from being created conditionally, and this was the simplest approach I could 
think of. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to