[ https://issues.apache.org/jira/browse/KAFKA-10555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210606#comment-17210606 ]
Sophie Blee-Goldman commented on KAFKA-10555:
---------------------------------------------

> This, it won't be possible to add new thread when in NOT_RUNNING state
> following the current proposal of the KIP.

Ah, right. Alright, I just went back and re-read the final versions of KIP-671 and KIP-663, and here's my summary of the overall current proposal across the relevant scenarios we need to consider:

# User initiates client shutdown via KafkaStreams.close() --> current behavior is to transition to NOT_RUNNING (even if an error occurs during shutdown)
# User initiates application shutdown via the SHUTDOWN_KAFKA_STREAMS_APPLICATION enum --> current proposal is to transition to ERROR
# User removes the last thread via KafkaStreams.removeStreamThread() --> current proposal is to transition to ERROR
# User initiates client shutdown via the SHUTDOWN_KAFKA_STREAMS_CLIENT enum --> current proposal is to transition to ERROR
# Last thread is allowed to die via the SHUTDOWN_STREAM_THREAD enum --> current proposal is to transition to ERROR
# New thread is started via KafkaStreams#addStreamThread --> current proposal is that this is only possible in REBALANCING or RUNNING

Just to make sure we're all on the same page, note that under the current semantics the ERROR state is technically _not_ a terminal state. We enter ERROR when the last thread dies, at which point _it's the responsibility of the user to shut down the app_. So the only transition out of ERROR is to NOT_RUNNING (which is, currently, terminal). But if a user doesn't manually invoke this, the client will just hang out in ERROR forever, which means the cleanup thread, metrics thread, etc. keep running. I personally did not realize that until recently, and I think it's one of the top things we should take this opportunity to reconsider. For one thing, if the only thing a user can possibly do once in the ERROR state is call close() and transition out of it, then why not just do it for them?
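The summary of the current proposal (scenarios #1 to #6) can be sketched as a small lookup table, which makes the "weird position" from the issue description easy to see: none of the shutdown scenarios ends in a state where a new thread may be added. This is a simplified model, not the actual KafkaStreams state machine. The `Trigger` names are hypothetical labels for the scenarios, and intermediate states such as PENDING_SHUTDOWN are not modeled.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class CurrentProposal {
    // Simplified client states; transitions through PENDING_SHUTDOWN are not modeled.
    enum ClientState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

    // Hypothetical labels for scenarios #1-#5 of the summary (not real Kafka enums).
    enum Trigger { CLOSE_CALLED, SHUTDOWN_APPLICATION, LAST_THREAD_REMOVED, SHUTDOWN_CLIENT, LAST_THREAD_DIED }

    private static final Map<Trigger, ClientState> END_STATE = new EnumMap<>(Trigger.class);

    static {
        END_STATE.put(Trigger.CLOSE_CALLED, ClientState.NOT_RUNNING);    // #1 KafkaStreams.close()
        END_STATE.put(Trigger.SHUTDOWN_APPLICATION, ClientState.ERROR);  // #2 SHUTDOWN_KAFKA_STREAMS_APPLICATION
        END_STATE.put(Trigger.LAST_THREAD_REMOVED, ClientState.ERROR);   // #3 removeStreamThread() on the last thread
        END_STATE.put(Trigger.SHUTDOWN_CLIENT, ClientState.ERROR);       // #4 SHUTDOWN_KAFKA_STREAMS_CLIENT
        END_STATE.put(Trigger.LAST_THREAD_DIED, ClientState.ERROR);      // #5 SHUTDOWN_STREAM_THREAD on the last thread
    }

    // #6: addStreamThread() is only permitted in these states.
    private static final Set<ClientState> CAN_ADD_THREAD =
            EnumSet.of(ClientState.REBALANCING, ClientState.RUNNING);

    static ClientState endState(Trigger trigger) {
        return END_STATE.get(trigger);
    }

    static boolean canAddThread(ClientState state) {
        return CAN_ADD_THREAD.contains(state);
    }

    public static void main(String[] args) {
        // Prints each scenario's end state and whether a new thread could be added from it.
        for (Trigger t : Trigger.values()) {
            ClientState s = endState(t);
            System.out.printf("%-20s -> %-11s addStreamThread allowed: %b%n", t, s, canAddThread(s));
        }
    }
}
```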
I'd rather not leave the door open to partially-closed zombie Streams applications. For the rest of this doc, I'm assuming that we reimagine the ERROR state to run parallel to the NOT_RUNNING state rather than upstream of it, and that an application entering ERROR is always, and automatically, shut down.

Taking a step back: in the current, pre-KIP-663/671 world, Streams application operators face a tough choice about what to do if the instance goes into ERROR. If availability is most important, you probably just restart it automatically, e.g. via `new KafkaStreams(...).start()` or by killing the app to restart the pod. But if the error was truly fatal, or potentially corrupting, restarting it would be anywhere from useless to disastrous (consider repeated overcounting). So if you're a #safetyfirst kind of operator, you'd want to shut everything down and inspect/resolve the error before restarting things. Unfortunately, right now there's no way for an operator to know when it's safe to restart automatically and when manual intervention is merited. So they have to choose whether to restart automatically or not, and whichever approach they choose is guaranteed to be wrong some of the time.

It seems like the ERROR vs. NOT_RUNNING states are actually a natural solution to this problem: we say that NOT_RUNNING is recoverable and safe to restart however you like, including by adding new threads. We then reserve the ERROR state for truly fatal errors that should be investigated and resolved before continuing. So ERROR basically means "restart at your own risk". In a post-KIP-663/671 world, users can listen to the State to determine whether they should restart automatically. This brings up the obvious question: which errors are which?

I know I've said a lot without even touching on the actual question at hand, namely when to transition to what. Some of these cases feel pretty straightforward to me, some not so much.
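The operator-side decision described above (NOT_RUNNING means safe to restart, ERROR means restart at your own risk) might look roughly like the sketch below, assuming the proposed semantics. To keep it runnable without Kafka on the classpath, `State` is a stand-in for `KafkaStreams.State`, and `Action` and the `closedByUs` flag are hypothetical. In a real application, `decide` would be called from a listener registered via `KafkaStreams#setStateListener`.

```java
public class RestartPolicy {
    // Stand-in for KafkaStreams.State so the sketch compiles without Kafka.
    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

    // Hypothetical actions an operator's tooling might take.
    enum Action { NONE, AUTO_RESTART, ALERT_OPERATOR }

    /**
     * Decides what to do when the client reaches newState, assuming the proposed
     * semantics: NOT_RUNNING is recoverable, ERROR is "restart at your own risk".
     *
     * @param closedByUs true if our own code called close(), so no restart is wanted
     */
    static Action decide(State newState, boolean closedByUs) {
        switch (newState) {
            case NOT_RUNNING:
                return closedByUs ? Action.NONE : Action.AUTO_RESTART;
            case ERROR:
                // Possibly fatal or corrupting (e.g. repeated overcounting): let a human look first.
                return Action.ALERT_OPERATOR;
            default:
                return Action.NONE;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(State.NOT_RUNNING, false));
        System.out.println(decide(State.ERROR, false));
    }
}
```

The `closedByUs` flag matters because a deliberate `close()` (Case #1) also ends in NOT_RUNNING. Without the flag, the policy would restart an application the operator just chose to stop.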
Case #1: The current behavior of KafkaStreams.close() transitioning to NOT_RUNNING feels natural to me.

Case #2: The only reasonable way to interpret the SHUTDOWN_APPLICATION directive is to transition all clients to ERROR. I can't think of any reason you would want to shut down every instance in the application and then just restart it. If the triggering exception is bad enough to necessitate the nuclear option, well, that's probably not something you can just shrug off and continue from.

Case #3: Removing the last stream thread should result in NOT_RUNNING, or possibly the client could just remain in RUNNING. I am a bit concerned that this might go unnoticed, though, and then we'll get (another) escalation on Saturday night about consumer lag growing indefinitely.

Case #4: This seems like the trickiest scenario to me. I honestly don't think we can make this call for the user. I know we've been tossing around the idea of separate TO_ERROR and TO_NOT_RUNNING enums, and so far have elected to defer this and pick just one for the initial implementation. But I'm increasingly realizing that, for this case in particular, there is no reasonable way to guess at what the user intended. The fact that they chose this specific option means the exception was bad enough to require closing the entire client instead of just the thread, but not so bad as to permanently shut down the entire application. If we can't make the call of whether to shut down a thread, client, or application for them, then how can we make the call of which state to end up in for this intermediate case?

Case #5: To me, NOT_RUNNING seems like the obvious choice if the last thread dies. Note that this can mean one of two things. The user may have specifically selected the SHUTDOWN_THREAD enum _over_ the SHUTDOWN_CLIENT enum (which should probably have an ERROR option), implying that they probably did not want to permanently shut down to ERROR.
But it's also possible they just didn't realize that it was the last thread, or were using the default handler. In both of those cases it should be sufficient to transition to NOT_RUNNING to alert them to this last thread death. This would be recoverable, so users can start up new threads or take whatever action they prefer.

Case #6: Given the above, we would allow starting up new threads in REBALANCING, RUNNING, or NOT_RUNNING.

...WDYT?

> Improve client state machine
> ----------------------------
>
>                 Key: KAFKA-10555
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10555
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Major
>              Labels: needs-kip
>
> The KafkaStreams client exposes its state to the user for monitoring purposes
> (i.e., RUNNING, REBALANCING etc). The state of the client depends on the
> state(s) of the internal StreamThreads that have their own states.
> Furthermore, the client state has impact on what the user can do with the
> client. For example, active tasks can only be queried in RUNNING state and
> similar.
> With KIP-671 and KIP-663 we improved error handling capabilities and allow users to
> add/remove stream threads dynamically. We allow adding/removing threads only
> in RUNNING and REBALANCING state. This puts us in a "weird" position, because
> if we enter ERROR state (i.e., if the last thread dies), we cannot add new
> threads any longer. However, if we have multiple threads and one dies, we
> don't enter ERROR state and do allow recovering the thread.
> Before the KIPs the definition of ERROR state was clear, however, with both
> KIPs it seems that we should revisit its semantics.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
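Cases #1 to #6 as proposed in this comment can be sketched as a small state table. This is a hypothetical model, not the KafkaStreams implementation. Splitting SHUTDOWN_CLIENT into `SHUTDOWN_CLIENT_TO_ERROR` and `SHUTDOWN_CLIENT_TO_NOT_RUNNING` follows the TO_ERROR / TO_NOT_RUNNING idea floated under Case #4 and is not an existing Kafka API. Case #3 is mapped to NOT_RUNNING, although the comment also considers leaving the client in RUNNING.

```java
import java.util.EnumSet;
import java.util.Set;

public class ProposedStateMachine {
    enum ClientState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

    // Hypothetical trigger labels; the SHUTDOWN_CLIENT split is an assumption (see Case #4).
    enum Trigger {
        CLOSE_CALLED,                    // Case #1: KafkaStreams.close()
        SHUTDOWN_APPLICATION,            // Case #2
        LAST_THREAD_REMOVED,             // Case #3: removeStreamThread() on the last thread
        SHUTDOWN_CLIENT_TO_ERROR,        // Case #4, user asked for ERROR
        SHUTDOWN_CLIENT_TO_NOT_RUNNING,  // Case #4, user asked for NOT_RUNNING
        LAST_THREAD_DIED                 // Case #5: SHUTDOWN_THREAD on the last thread
    }

    // Case #6: new threads may be started in these states.
    private static final Set<ClientState> CAN_ADD_THREAD =
            EnumSet.of(ClientState.REBALANCING, ClientState.RUNNING, ClientState.NOT_RUNNING);

    static ClientState endState(Trigger trigger) {
        switch (trigger) {
            case SHUTDOWN_APPLICATION:
            case SHUTDOWN_CLIENT_TO_ERROR:
                // Fatal: ERROR runs parallel to NOT_RUNNING, is terminal, and shuts the client down.
                return ClientState.ERROR;
            case CLOSE_CALLED:
            case LAST_THREAD_REMOVED:
            case SHUTDOWN_CLIENT_TO_NOT_RUNNING:
            case LAST_THREAD_DIED:
                // Recoverable: safe to restart or to add new threads.
                return ClientState.NOT_RUNNING;
            default:
                throw new IllegalArgumentException("Unknown trigger: " + trigger);
        }
    }

    static boolean canAddThread(ClientState state) {
        return CAN_ADD_THREAD.contains(state);
    }

    public static void main(String[] args) {
        // Prints each case's end state and whether a new thread could be added from it.
        for (Trigger t : Trigger.values()) {
            ClientState s = endState(t);
            System.out.printf("%-30s -> %-11s addStreamThread allowed: %b%n", t, s, canAddThread(s));
        }
    }
}
```

Compared with the current proposal, the main difference is that a client whose last thread died lands in a state from which `addStreamThread` is allowed again. ERROR is reserved for the cases where the user explicitly asked for it.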