[ 
https://issues.apache.org/jira/browse/KAFKA-10555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210606#comment-17210606
 ] 

Sophie Blee-Goldman commented on KAFKA-10555:
---------------------------------------------

> This, it won't be possible to add new thread when in NOT_RUNNING state 
>following the current proposal of the KIP.

Ah, right. Alright I just went back and re-read the final version of KIP-671 
and KIP-663 , and here's my summary of the overall current proposal across the 
relevant scenarios we need to consider:
 # User initiates client shutdown via KafkaStreams.close() --> current behavior 
is to transition to NOT_RUNNING (even if an error occurs during shutdown)
 # User initiates application shutdown via SHUTDOWN_KAFKA_STREAMS_APPLICATION 
enum -->  current proposal is transition to ERROR
 # User removes the last thread via KafkaStreams.removeStreamThread() --> 
current proposal is transition to ERROR
 # User initiates client shutdown via SHUTDOWN_KAFKA_STREAMS_CLIENT enum --> 
current proposal is to transition to ERROR
 # Last thread is allowed to die via SHUTDOWN_STREAM_THREAD enum --> current 
proposal is transition to ERROR
 # New thread is started via KafkaStreams#addStreamThread --> current proposal 
is this is only possible in REBALANCING or RUNNING

Just to make sure we're all on the same page, note that the current semantics 
of the ERROR state is technically _not_ a terminal state. We enter ERROR when 
the last thread dies, at which point _it's the responsibility of the user to 
shutdown the app_. So, the only transition out of ERROR is not NOT_RUNNING 
(which is, currently, terminal), but if a user doesn't manually invoke this it 
'll just hang out in ERROR forever: which means the cleanup thread, metrics 
thread, etc continue on.

Personally, I did not realize that until recently, and think it's one of the 
top things to take this opportunity to reconsider. For one thing, if the only 
thing a user can possibly do once in the ERROR state is call close() and 
transition out of it, then why not just do it for them? I'd rather not leave 
the door open to partially-closed zombie Streams applications. For the rest of 
this doc I'm assuming that we reimagine the ERROR state to run parallel to the 
NOT_RUNNING state rather than being upstream of it, and an application entering 
ERROR is always. and automatically shut down.

 

Taking a step back, in the current, pre-KIP-663/671 world, it seems like 
Streams application operators face a tough choice: what to do if the instance 
goes into ERROR? If availability is most important you probably just 
automatically restart it, eg via `new KafkaStreams...start()` or by killing the 
app to restart the pod, etc. But if the error was truly fatal, or potentially 
corrupting, restarting it would be anywhere from useless to disastrous 
(consider repeated overcounting). So if you're a #safetyfirst kind of operator, 
you'd want to shut everything down and inspect/resolve the error before 
restarting things.

Unfortunately right now there's no way for an operator to know when it's safe 
to automatically restart, and when manual intervention is merited. So they have 
to make a choice whether to automatically restart or not, and whichever 
approach they choose is guaranteed to be wrong some of the time.

It seems like the ERROR vs NOT_RUNNING states are actually a natural solution 
to this problem: we say that NOT_RUNNING is recoverable and safe to restart 
however you like, including adding new threads. We then reserve the ERROR state 
for truly fatal errors that should be investigated and resolved before 
continuing. So ERROR basically means "restart at your own risk". In a 
post-KIP-663/671 world, users can listen in on the State to determine whether 
they should automatically restart.

This brings up the obvious question: which errors are which? I know I've said a 
lot without even touching on the actual question at hand, when to transition to 
what. Some of these cases feel pretty straightforward to me, some not so much.

Case #1: The current behavior of KafkaStreams.close() transitioning to 
NOT_RUNNING feels natural to me

Case #2: The only reasonable way to interpret the SHUTDOWN_APPLICATION 
directive is to transition all clients to ERROR. I can't think of any reason 
you would want to shut down every instance in the application and then just 
restart it. If the triggering exception is bad enough to necessitate the 
nuclear option, well, that's probably not something you can just shrug off and 
continue.

Case #3: Removing the last stream thread should result in NOT_RUNNING, or 
possibly it could just remain in RUNNING. I am a bit concerned that this might 
go unnoticed, though. And then we'll get (another) an escalation on Saturday 
night about consumer lag growing indefinitely. 

Case #4: This seems like the trickiest scenario to me. I honestly don't think 
we can make this call for the user. I know we've been tossing around the idea 
of a separate TO_ERROR and TO_NOT_RUNNING enum, and so far have elected to 
defer this and pick just one for the initial implementation. But I'm 
increasingly realizing that, for this case in particular, there is no 
reasonable way to guess at what the user intended. The fact that they chose 
this specific option is that the exception was bad enough to require closing 
the entire client instead of just the thread, but not so bad as to permanently 
shut down the entire application. If we can't make the call of whether to shut 
down a thread, client, or application for them, then how can we make the call 
of which state to end up in in this intermediate case?

Case #5: To me, NOT_RUNNING seems like the obvious choice if the last thread 
dies. Note that this can mean one of two things: the user may have specifically 
selected the SHUTDOWN_THREAD enum _over_ the SHUTDOWN_CLIENT enum (which should 
probably have an ERROR option), implying that they probably did not want to 
permanently shut down to ERROR. But it's also possible they just didn't realize 
that it was the last thread, or were using the default handler. In both of 
those cases it should be sufficient to transit to NOT_RUNNING to alert them of 
this last thread death. This would be recoverable, so users can start up new 
threads or take whatever action they prefer.

Case #6: Given the above, we would allow starting up new threads in 
REBALANCING, RUNNING, or NOT_RUNNING

...WDYT?

 

> Improve client state machine
> ----------------------------
>
>                 Key: KAFKA-10555
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10555
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Major
>              Labels: needs-kip
>
> The KafkaStreams client exposes its state to the user for monitoring purpose 
> (ie, RUNNING, REBALANCING etc). The state of the client depends on the 
> state(s) of the internal StreamThreads that have their own states.
> Furthermore, the client state has impact on what the user can do with the 
> client. For example, active task can only be queried in RUNNING state and 
> similar.
> With KIP-671 and KIP-663 we improved error handling capabilities and allow to 
> add/remove stream thread dynamically. We allow adding/removing threads only 
> in RUNNING and REBALANCING state. This puts us in a "weird" position, because 
> if we enter ERROR state (ie, if the last thread dies), we cannot add new 
> threads and longer. However, if we have multiple threads and one dies, we 
> don't enter ERROR state and do allow to recover the thread.
> Before the KIPs the definition of ERROR state was clear, however, with both 
> KIPs it seem that we should revisit its semantics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to